diff options
Diffstat (limited to 'examples/redis-unstable/src/t_stream.c')
| -rw-r--r-- | examples/redis-unstable/src/t_stream.c | 5755 |
1 files changed, 0 insertions, 5755 deletions
diff --git a/examples/redis-unstable/src/t_stream.c b/examples/redis-unstable/src/t_stream.c deleted file mode 100644 index 497c714..0000000 --- a/examples/redis-unstable/src/t_stream.c +++ /dev/null @@ -1,5755 +0,0 @@ -/* - * Copyright (c) 2017-Present, Redis Ltd. - * All rights reserved. - * - * Licensed under your choice of (a) the Redis Source Available License 2.0 - * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the - * GNU Affero General Public License v3 (AGPLv3). - */ - -#include "server.h" -#include "endianconv.h" -#include "stream.h" -#include "xxhash.h" -#include <string.h> - -/* Every stream item inside the listpack, has a flags field that is used to - * mark the entry as deleted, or having the same field as the "master" - * entry at the start of the listpack> */ -#define STREAM_ITEM_FLAG_NONE 0 /* No special flags. */ -#define STREAM_ITEM_FLAG_DELETED (1<<0) /* Entry is deleted. Skip it. */ -#define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */ - -/* For stream commands that require multiple IDs - * when the number of IDs is less than 'STREAMID_STATIC_VECTOR_LEN', - * avoid malloc allocation.*/ -#define STREAMID_STATIC_VECTOR_LEN 8 - -/* Max pre-allocation for listpack. This is done to avoid abuse of a user - * setting stream_node_max_bytes to a huge number. */ -#define STREAM_LISTPACK_MAX_PRE_ALLOCATE 4096 - -/* Don't let listpacks grow too big, even if the user config allows it. - * doing so can lead to an overflow (trying to store more than 32bit length - * into the listpack header), or actually an assertion since lpInsert - * will return NULL. 
*/ -#define STREAM_LISTPACK_MAX_SIZE (1<<30) - -void streamFreeCGGeneric(void *cg, void *s); -void streamFreeNACK(stream *s, streamNACK *na); -size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamCG *group, streamConsumer *consumer); -int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int *seq_given); -int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq); - -int streamEntryIsReferenced(stream *s, streamID *id); -void streamCleanupEntryCGroupRefs(stream *s, streamID *id); -void streamUpdateCGroupLastId(stream *s, streamCG *cg, streamID *id); -void trackStreamClaimTimeouts(client *c, robj **keys, int numkeys, uint64_t expire_time); - -/* Forward declarations for IDMP functions (defined at end of file) */ -static void trackStreamIdmpEntries(client *c, robj *key); -static void streamClearIdmpEntries(stream *s); -static void idmpInsertEntry(stream *s, idmpProducer *producer, idmpEntry *entry, const streamID *id); -static int idmpLookupAndReply(stream *s, idmpProducer *producer, idmpEntry *entry, client *c); -static idmpProducer *idmpGetOrCreateProducer(stream *s, const char *pid, size_t pid_len); -static int createIdempotencyHash(robj **argv, int64_t numfields, XXH128_hash_t *out_hash); -static void idmpEvictOldestEntry(stream *s, idmpProducer *producer); - -/* ----------------------------------------------------------------------- - * Low level stream encoding: a radix tree of listpacks. - * ----------------------------------------------------------------------- */ - -/* Create a new stream data structure. 
*/ -stream *streamNew(void) { - size_t usable; - stream *s = zmalloc_usable(sizeof(*s), &usable); - s->alloc_size = usable; - s->rax = raxNewWithMetadata(0, &s->alloc_size); - s->length = 0; - s->first_id.ms = 0; - s->first_id.seq = 0; - s->last_id.ms = 0; - s->last_id.seq = 0; - s->max_deleted_entry_id.seq = 0; - s->max_deleted_entry_id.ms = 0; - s->entries_added = 0; - s->cgroups = NULL; /* Created on demand to save memory when not used. */ - s->cgroups_ref = NULL; - s->min_cgroup_last_id.ms = UINT64_MAX; - s->min_cgroup_last_id.seq = UINT64_MAX; - s->min_cgroup_last_id_valid = 0; - s->idmp_duration = server.stream_idmp_duration; /* Default from server config */ - s->idmp_max_entries = server.stream_idmp_maxsize; /* Default from server config */ - s->idmp_producers = NULL; /* Created on demand to save memory when not used. */ - s->iids_added = 0; - s->iids_duplicates = 0; - return s; -} - -static void streamLpFreeGeneric(void *lp, void *strm) { - stream *s = strm; - s->alloc_size -= lpBytes(lp); - lpFree(lp); -} - -void streamFreeIdmpProducerGeneric(void *producer, void *strm) { - stream *s = strm; - idmpProducerFree((idmpProducer *)producer, &s->alloc_size); -} - -/* Free a stream, including the listpacks stored inside the radix tree. */ -void freeStream(stream *s) { - raxFreeWithCbAndContext(s->rax, streamLpFreeGeneric, s); - if (s->cgroups) - raxFreeWithCbAndContext(s->cgroups, streamFreeCGGeneric, s); - if (s->cgroups_ref) - raxFreeWithCallback(s->cgroups_ref, listReleaseGeneric); - /* Free IDMP producers rax tree */ - if (s->idmp_producers) - raxFreeWithCbAndContext(s->idmp_producers, streamFreeIdmpProducerGeneric, s); - debugServerAssert(s->alloc_size == zmalloc_usable_size(s)); - zfree(s); -} - -/* Return the length of a stream. */ -unsigned long streamLength(const robj *subject) { - stream *s = subject->ptr; - return s->length; -} - -/* Set 'id' to be its successor stream ID. 
- * If 'id' is the maximal possible id, it is wrapped around to 0-0 and a - * C_ERR is returned. */ -int streamIncrID(streamID *id) { - int ret = C_OK; - if (id->seq == UINT64_MAX) { - if (id->ms == UINT64_MAX) { - /* Special case where 'id' is the last possible streamID... */ - id->ms = id->seq = 0; - ret = C_ERR; - } else { - id->ms++; - id->seq = 0; - } - } else { - id->seq++; - } - return ret; -} - -/* Set 'id' to be its predecessor stream ID. - * If 'id' is the minimal possible id, it remains 0-0 and a C_ERR is - * returned. */ -int streamDecrID(streamID *id) { - int ret = C_OK; - if (id->seq == 0) { - if (id->ms == 0) { - /* Special case where 'id' is the first possible streamID... */ - id->ms = id->seq = UINT64_MAX; - ret = C_ERR; - } else { - id->ms--; - id->seq = UINT64_MAX; - } - } else { - id->seq--; - } - return ret; -} - -/* Generate the next stream item ID given the previous one. If the current - * milliseconds Unix time is greater than the previous one, just use this - * as time part and start with sequence part of zero. Otherwise we use the - * previous time (and never go backward) and increment the sequence. */ -void streamNextID(streamID *last_id, streamID *new_id) { - uint64_t ms = commandTimeSnapshot(); - if (ms > last_id->ms) { - new_id->ms = ms; - new_id->seq = 0; - } else { - *new_id = *last_id; - streamIncrID(new_id); - } -} - -/* This is a helper function for the COPY command. - * Duplicate a Stream object, with the guarantee that the returned object - * has the same encoding as the original one. 
 *
 * The resulting object always has refcount set to 1 */
/* What gets copied, in order: every listpack of the entries radix tree
 * (byte-for-byte), the length / first / last / max-deleted IDs and the
 * entries_added counter, then - only if the source has consumer groups -
 * each group with its PEL, its time-ordered PEL index and its consumers.
 * NOTE(review): the idmp_* fields and min_cgroup_last_id* are not assigned
 * in this function; presumably they keep whatever createStreamObject() and
 * streamCreateCG() set - confirm. */
robj *streamDup(robj *o) {
    robj *sobj;

    serverAssert(o->type == OBJ_STREAM);

    switch (o->encoding) {
        case OBJ_ENCODING_STREAM:
            sobj = createStreamObject();
            break;
        default:
            serverPanic("Wrong encoding.");
            break;
    }

    stream *s;
    stream *new_s;
    s = o->ptr;
    new_s = sobj->ptr;

    /* Walk the source entries tree from the first node and clone each
     * listpack under the same key in the new tree. */
    raxIterator ri;
    raxStart(&ri, s->rax);
    raxSeek(&ri, "^", NULL, 0);
    size_t lp_bytes = 0;      /* Total bytes in the listpack. */
    unsigned char *lp = NULL; /* listpack pointer. */
    /* Get a reference to the listpack node. */
    while (raxNext(&ri)) {
        serverAssert(ri.key_len == sizeof(streamID));
        lp = ri.data;
        lp_bytes = lpBytes(lp);
        unsigned char *new_lp = zmalloc(lp_bytes);
        /* Account for the copy in the new stream's memory counter. */
        new_s->alloc_size += lp_bytes;
        memcpy(new_lp, lp, lp_bytes);
        raxInsert(new_s->rax, ri.key, ri.key_len,
                  new_lp, NULL);
    }
    new_s->length = s->length;
    new_s->first_id = s->first_id;
    new_s->last_id = s->last_id;
    new_s->max_deleted_entry_id = s->max_deleted_entry_id;
    new_s->entries_added = s->entries_added;
    raxStop(&ri);

    /* Nothing else to copy when the source never had consumer groups. */
    if (s->cgroups == NULL) return sobj;

    /* Consumer Groups */
    raxIterator ri_cgroups;
    raxStart(&ri_cgroups, s->cgroups);
    raxSeek(&ri_cgroups, "^", NULL, 0);
    while (raxNext(&ri_cgroups)) {
        streamCG *cg = ri_cgroups.data;
        streamCG *new_cg = streamCreateCG(new_s, (char *)ri_cgroups.key,
                                          ri_cgroups.key_len, &cg->last_id,
                                          cg->entries_read);

        serverAssert(new_cg != NULL);

        /* Consumer Group PEL */
        /* Each NACK is first created without an owning consumer (NULL); the
         * owner is filled in below while the consumers' PELs are copied. */
        raxIterator ri_cg_pel;
        raxStart(&ri_cg_pel,cg->pel);
        raxSeek(&ri_cg_pel,"^",NULL,0);
        while(raxNext(&ri_cg_pel)){
            streamNACK *nack = ri_cg_pel.data;
            streamNACK *new_nack = streamCreateNACK(new_s, NULL);
            new_nack->delivery_time = nack->delivery_time;
            new_nack->delivery_count = nack->delivery_count;
            new_nack->cgroup_ref_node = streamLinkCGroupToEntry(new_s, new_cg, ri_cg_pel.key);
            raxInsert(new_cg->pel, ri_cg_pel.key, sizeof(streamID), new_nack, NULL);

            /* Mirror the entry in the delivery-time ordered index too. */
            streamID id;
            streamDecodeID(ri_cg_pel.key, &id);
            raxInsertPelByTime(new_cg->pel_by_time, new_nack->delivery_time, &id);
        }
        raxStop(&ri_cg_pel);

        /* Consumers */
        raxIterator ri_consumers;
        raxStart(&ri_consumers, cg->consumers);
        raxSeek(&ri_consumers, "^", NULL, 0);
        while (raxNext(&ri_consumers)) {
            streamConsumer *consumer = ri_consumers.data;
            streamConsumer *new_consumer;
            size_t usable;
            /* The consumer is built by hand here; its struct, its name and
             * its PEL rax are all added to the new stream's accounting. */
            new_consumer = zmalloc_usable(sizeof(*new_consumer), &usable);
            new_s->alloc_size += usable;
            new_consumer->name = sdsdup(consumer->name);
            new_s->alloc_size += sdsAllocSize(new_consumer->name);
            new_consumer->pel = raxNewWithMetadata(0, &new_s->alloc_size);
            raxInsert(new_cg->consumers,(unsigned char *)new_consumer->name,
                        sdslen(new_consumer->name), new_consumer, NULL);
            new_consumer->seen_time = consumer->seen_time;
            new_consumer->active_time = consumer->active_time;

            /* Consumer PEL */
            /* The consumer PEL shares NACK objects with the group PEL: look
             * up the NACK already cloned above (it must exist), set its
             * owner and reference it from the consumer's own PEL. */
            raxIterator ri_cpel;
            raxStart(&ri_cpel, consumer->pel);
            raxSeek(&ri_cpel, "^", NULL, 0);
            while (raxNext(&ri_cpel)) {
                void *result;
                int found = raxFind(new_cg->pel,ri_cpel.key,sizeof(streamID),&result);

                serverAssert(found);

                streamNACK *new_nack = result;
                new_nack->consumer = new_consumer;
                raxInsert(new_consumer->pel,ri_cpel.key,sizeof(streamID),new_nack,NULL);
            }
            raxStop(&ri_cpel);
        }
        raxStop(&ri_consumers);
    }
    raxStop(&ri_cgroups);
    return sobj;
}

/* This is a wrapper function for lpGet() to directly get an integer value
 * from the listpack (that may store numbers as a string), converting
 * the string if needed.
 * The 'valid' argument is an optional output parameter to get an indication
 * if the record was valid, when this parameter is NULL, the function will
 * fail with an assertion.
*/ -static inline int64_t lpGetIntegerIfValid(unsigned char *ele, int *valid) { - int64_t v; - unsigned char *e = lpGet(ele,&v,NULL); - if (e == NULL) { - if (valid) - *valid = 1; - return v; - } - /* The following code path should never be used for how listpacks work: - * they should always be able to store an int64_t value in integer - * encoded form. However the implementation may change. */ - long long ll = 0; - int ret = string2ll((char*)e,v,&ll); - if (valid) - *valid = ret; - else - serverAssert(ret != 0); - v = ll; - return v; -} - -#define lpGetInteger(ele) lpGetIntegerIfValid(ele, NULL) - -/* Get an edge streamID of a given listpack. - * 'master_id' is an input param, used to build the 'edge_id' output param */ -int lpGetEdgeStreamID(unsigned char *lp, int first, streamID *master_id, streamID *edge_id) -{ - if (lp == NULL) - return 0; - - unsigned char *lp_ele; - - /* We need to seek either the first or the last entry depending - * on the direction of the iteration. */ - if (first) { - /* Get the master fields count. */ - lp_ele = lpFirst(lp); /* Seek items count */ - lp_ele = lpNext(lp, lp_ele); /* Seek deleted count. */ - lp_ele = lpNext(lp, lp_ele); /* Seek num fields. */ - int64_t master_fields_count = lpGetInteger(lp_ele); - lp_ele = lpNext(lp, lp_ele); /* Seek first field. */ - - /* If we are iterating in normal order, skip the master fields - * to seek the first actual entry. */ - for (int64_t i = 0; i < master_fields_count; i++) - lp_ele = lpNext(lp, lp_ele); - - /* If we are going forward, skip the previous entry's - * lp-count field (or in case of the master entry, the zero - * term field) */ - lp_ele = lpNext(lp, lp_ele); - if (lp_ele == NULL) - return 0; - } else { - /* If we are iterating in reverse direction, just seek the - * last part of the last entry in the listpack (that is, the - * fields count). 
*/ - lp_ele = lpLast(lp); - - /* If we are going backward, read the number of elements this - * entry is composed of, and jump backward N times to seek - * its start. */ - int64_t lp_count = lpGetInteger(lp_ele); - if (lp_count == 0) /* We reached the master entry. */ - return 0; - - while (lp_count--) - lp_ele = lpPrev(lp, lp_ele); - } - - lp_ele = lpNext(lp, lp_ele); /* Seek ID (lp_ele currently points to 'flags'). */ - - /* Get the ID: it is encoded as difference between the master - * ID and this entry ID. */ - streamID id = *master_id; - id.ms += lpGetInteger(lp_ele); - lp_ele = lpNext(lp, lp_ele); - id.seq += lpGetInteger(lp_ele); - *edge_id = id; - return 1; -} - -/* Debugging function to log the full content of a listpack. Useful - * for development and debugging. */ -void streamLogListpackContent(unsigned char *lp) { - unsigned char *p = lpFirst(lp); - while(p) { - unsigned char buf[LP_INTBUF_SIZE]; - int64_t v; - unsigned char *ele = lpGet(p,&v,buf); - serverLog(LL_WARNING,"- [%d] '%.*s'", (int)v, (int)v, ele); - p = lpNext(lp,p); - } -} - -/* Convert the specified stream entry ID as a 128 bit big endian number, so - * that the IDs can be sorted lexicographically. */ -void streamEncodeID(void *buf, streamID *id) { - uint64_t e[2]; - e[0] = htonu64(id->ms); - e[1] = htonu64(id->seq); - memcpy(buf,e,sizeof(e)); -} - -/* This is the reverse of streamEncodeID(): the decoded ID will be stored - * in the 'id' structure passed by reference. The buffer 'buf' must point - * to a 128 bit big-endian encoded ID. */ -void streamDecodeID(void *buf, streamID *id) { - uint64_t e[2]; - memcpy(e,buf,sizeof(e)); - id->ms = ntohu64(e[0]); - id->seq = ntohu64(e[1]); -} - -/* Compare two stream IDs. Return -1 if a < b, 0 if a == b, 1 if a > b. */ -int streamCompareID(streamID *a, streamID *b) { - if (a->ms > b->ms) return 1; - else if (a->ms < b->ms) return -1; - /* The ms part is the same. Check the sequence part. 
*/ - else if (a->seq > b->seq) return 1; - else if (a->seq < b->seq) return -1; - /* Everything is the same: IDs are equal. */ - return 0; -} - -/* Retrieves the ID of the stream edge entry. An edge is either the first or - * the last ID in the stream, and may be a tombstone. To filter out tombstones, - * set the'skip_tombstones' argument to 1. */ -void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id) -{ - streamIterator si; - int64_t numfields; - streamIteratorStart(&si,s,NULL,NULL,!first); - si.skip_tombstones = skip_tombstones; - int found = streamIteratorGetID(&si,edge_id,&numfields); - if (!found) { - streamID min_id = {0, 0}, max_id = {UINT64_MAX, UINT64_MAX}; - *edge_id = first ? max_id : min_id; - } - streamIteratorStop(&si); -} - -/* Adds a new item into the stream 's' having the specified number of - * field-value pairs as specified in 'numfields' and stored into 'argv'. - * Returns the new entry ID populating the 'added_id' structure. - * - * If 'use_id' is not NULL, the ID is not auto-generated by the function, - * but instead the passed ID is used to add the new entry. In this case - * adding the entry may fail as specified later in this comment. - * - * When 'use_id' is used alongside with a zero 'seq-given', the sequence - * part of the passed ID is ignored and the function will attempt to use an - * auto-generated sequence. - * - * The function returns C_OK if the item was added, this is always true - * if the ID was generated by the function. However the function may return - * C_ERR in several cases: - * 1. If an ID was given via 'use_id', but adding it failed since the - * current top ID is greater or equal. errno will be set to EDOM. - * 2. If a size of a single element or the sum of the elements is too big to - * be stored into the stream. errno will be set to ERANGE. 
*/
int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id, int seq_given) {

    /* Generate the new entry ID. */
    streamID id;
    if (use_id) {
        if (seq_given) {
            id = *use_id;
        } else {
            /* The automatically generated sequence can be either zero (new
             * timestamps) or the incremented sequence of the last ID. In the
             * latter case, we need to prevent an overflow/advancing forward
             * in time. */
            if (s->last_id.ms == use_id->ms) {
                if (s->last_id.seq == UINT64_MAX) {
                    errno = EDOM;
                    return C_ERR;
                }
                id = s->last_id;
                id.seq++;
            } else {
                id = *use_id;
            }
        }
    } else {
        streamNextID(&s->last_id,&id);
    }

    /* Check that the new ID is greater than the last entry ID
     * or return an error. Automatically generated IDs might
     * overflow (and wrap-around) when incrementing the sequence
     * part. */
    if (streamCompareID(&id,&s->last_id) <= 0) {
        errno = EDOM;
        return C_ERR;
    }

    /* Avoid overflow when trying to add an element to the stream (listpack
     * can only host up to 32bit length strings, and also a total listpack size
     * can't be bigger than 32bit length. */
    /* 'argv' holds numfields field/value pairs, hence numfields*2 strings.
     * Only the payload lengths are summed here, not the listpack encoding
     * overhead. */
    size_t totelelen = 0;
    for (int64_t i = 0; i < numfields*2; i++) {
        sds ele = argv[i]->ptr;
        totelelen += sdslen(ele);
    }
    if (totelelen > STREAM_LISTPACK_MAX_SIZE) {
        errno = ERANGE;
        return C_ERR;
    }

    /* Add the new entry. */
    raxIterator ri;
    raxStart(&ri,s->rax);
    raxSeek(&ri,"$",NULL,0);

    size_t lp_bytes = 0;      /* Total bytes in the tail listpack. */
    unsigned char *lp = NULL; /* Tail listpack pointer. */

    if (!raxEOF(&ri)) {
        /* Get a reference to the tail node listpack. */
        lp = ri.data;
        lp_bytes = lpBytes(lp);
    }
    /* NOTE(review): ri.data, ri.node, ri.key and ri.key_len are still read
     * further down, after this raxStop(); presumably raxStop() leaves those
     * iterator fields usable as long as the tree is not restructured in
     * between - confirm against rax.c. */
    raxStop(&ri);

    /* We have to add the key into the radix tree in lexicographic order,
     * to do so we consider the ID as a single 128 bit number written in
     * big endian, so that the most significant bytes are the first ones. */
    uint64_t rax_key[2];    /* Key in the radix tree containing the listpack.*/
    streamID master_id;     /* ID of the master entry in the listpack. */

    /* Create a new listpack and radix tree node if needed. Note that when
     * a new listpack is created, we populate it with a "master entry". This
     * is just a set of fields that is taken as references in order to compress
     * the stream entries that we'll add inside the listpack.
     *
     * Note that while we use the first added entry fields to create
     * the master entry, the first added entry is NOT represented in the master
     * entry, which is a stand alone object. But of course, the first entry
     * will compress well because it's used as reference.
     *
     * The master entry is composed like in the following example:
     *
     * +-------+---------+------------+---------+--/--+---------+---------+-+
     * | count | deleted | num-fields | field_1 | field_2 | ... | field_N |0|
     * +-------+---------+------------+---------+--/--+---------+---------+-+
     *
     * count and deleted just represent respectively the total number of
     * entries inside the listpack that are valid, and marked as deleted
     * (deleted flag in the entry flags set). So the total number of items
     * actually inside the listpack (both deleted and not) is count+deleted.
     *
     * The real entries will be encoded with an ID that is just the
     * millisecond and sequence difference compared to the key stored at
     * the radix tree node containing the listpack (delta encoding), and
     * if the fields of the entry are the same as the master entry fields, the
     * entry flags will specify this fact and the entry fields and number
     * of fields will be omitted (see later in the code of this function).
     *
     * The "0" entry at the end is the same as the 'lp-count' entry in the
     * regular stream entries (see below), and marks the fact that there are
     * no more entries, when we scan the stream from right to left. */

    /* First of all, check if we can append to the current macro node or
     * if we need to switch to the next one. 'lp' will be set to NULL if
     * the current node is full. */
    if (lp != NULL) {
        int new_node = 0;
        /* A zero (unlimited) or oversized configured limit is clamped to the
         * hard listpack cap. */
        size_t node_max_bytes = server.stream_node_max_bytes;
        if (node_max_bytes == 0 || node_max_bytes > STREAM_LISTPACK_MAX_SIZE)
            node_max_bytes = STREAM_LISTPACK_MAX_SIZE;
        if (lp_bytes + totelelen >= node_max_bytes) {
            new_node = 1;
        } else if (server.stream_node_max_entries) {
            unsigned char *lp_ele = lpFirst(lp);
            /* Count both live entries and deleted ones. */
            int64_t count = lpGetInteger(lp_ele) + lpGetInteger(lpNext(lp,lp_ele));
            if (count >= server.stream_node_max_entries) new_node = 1;
        }

        if (new_node) {
            /* Shrink extra pre-allocated memory */
            lp = lpShrinkToFit(lp);
            s->alloc_size -= lp_bytes;
            s->alloc_size += lpBytes(lp);
            /* The shrink may have moved the listpack: refresh the pointer
             * stored in the (now closed) tail node. */
            if (ri.data != lp)
                raxSetData(ri.node, lp);
            lp = NULL;
        }
    }

    int flags = STREAM_ITEM_FLAG_NONE;
    if (lp == NULL) {
        /* New node: the entry being added provides both the node key and
         * the master ID, so its own ID deltas will be 0/0. */
        master_id = id;
        streamEncodeID(rax_key,&id);
        /* Create the listpack having the master entry ID and fields.
         * Pre-allocate some bytes when creating listpack to avoid realloc on
         * every XADD. Since listpack.c uses malloc_size, it'll grow in steps,
         * and won't realloc on every XADD.
         * When listpack reaches max number of entries, we'll shrink the
         * allocation to fit the data. */
        size_t prealloc = STREAM_LISTPACK_MAX_PRE_ALLOCATE;
        if (server.stream_node_max_bytes > 0 && server.stream_node_max_bytes < prealloc) {
            prealloc = server.stream_node_max_bytes;
        }
        lp = lpNew(prealloc);
        lp = lpAppendInteger(lp,1); /* One item, the one we are adding. */
        lp = lpAppendInteger(lp,0); /* Zero deleted so far. */
        lp = lpAppendInteger(lp,numfields);
        for (int64_t i = 0; i < numfields; i++) {
            sds field = argv[i*2]->ptr;
            lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
        }
        lp = lpAppendInteger(lp,0); /* Master entry zero terminator. */
        s->alloc_size += lpBytes(lp);
        raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
        /* The first entry we insert, has obviously the same fields of the
         * master entry. */
        flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
    } else {
        /* Appending to the existing tail node: reuse its key. */
        serverAssert(ri.key_len == sizeof(rax_key));
        memcpy(rax_key,ri.key,sizeof(rax_key));

        /* Read the master ID from the radix tree key. */
        streamDecodeID(rax_key,&master_id);
        unsigned char *lp_ele = lpFirst(lp);

        /* Update count and skip the deleted fields. */
        int64_t count = lpGetInteger(lp_ele);
        size_t oldsize = lpBytes(lp);
        lp = lpReplaceInteger(lp,&lp_ele,count+1);
        s->alloc_size -= oldsize;
        s->alloc_size += lpBytes(lp);
        lp_ele = lpNext(lp,lp_ele); /* seek deleted. */
        lp_ele = lpNext(lp,lp_ele); /* seek master entry num fields. */

        /* Check if the entry we are adding, have the same fields
         * as the master entry. */
        int64_t master_fields_count = lpGetInteger(lp_ele);
        lp_ele = lpNext(lp,lp_ele);
        if (numfields == master_fields_count) {
            int64_t i;
            for (i = 0; i < master_fields_count; i++) {
                sds field = argv[i*2]->ptr;
                int64_t e_len;
                unsigned char buf[LP_INTBUF_SIZE];
                unsigned char *e = lpGet(lp_ele,&e_len,buf);
                /* Stop if there is a mismatch. */
                if (sdslen(field) != (size_t)e_len ||
                    memcmp(e,field,e_len) != 0) break;
                lp_ele = lpNext(lp,lp_ele);
            }
            /* All fields are the same! We can compress the field names
             * setting a single bit in the flags. */
            if (i == master_fields_count) flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
        }
    }

    /* Populate the listpack with the new entry. We use the following
     * encoding:
     *
     * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+
     * |flags|entry-id|num-fields|field-1|value-1|...|field-N|value-N|lp-count|
     * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+
     *
     * However if the SAMEFIELD flag is set, we have just to populate
     * the entry with the values, so it becomes:
     *
     * +-----+--------+-------+-/-+-------+--------+
     * |flags|entry-id|value-1|...|value-N|lp-count|
     * +-----+--------+-------+-/-+-------+--------+
     *
     * The entry-id field is actually two separated fields: the ms
     * and seq difference compared to the master entry.
     *
     * The lp-count field is a number that states the number of listpack pieces
     * that compose the entry, so that it's possible to travel the entry
     * in reverse order: we can just start from the end of the listpack, read
     * the entry, and jump back N times to seek the "flags" field to read
     * the stream full entry. */
    size_t oldsize = lpBytes(lp);
    lp = lpAppendInteger(lp,flags);
    lp = lpAppendInteger(lp,id.ms - master_id.ms);
    lp = lpAppendInteger(lp,id.seq - master_id.seq);
    if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS))
        lp = lpAppendInteger(lp,numfields);
    for (int64_t i = 0; i < numfields; i++) {
        sds field = argv[i*2]->ptr, value = argv[i*2+1]->ptr;
        if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS))
            lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
        lp = lpAppend(lp,(unsigned char*)value,sdslen(value));
    }
    /* Compute and store the lp-count field. */
    int64_t lp_count = numfields;
    lp_count += 3; /* Add the 3 fixed fields flags + ms-diff + seq-diff. */
    if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) {
        /* If the item is not compressed, it also has the fields other than
         * the values, and an additional num-fields field. */
        lp_count += numfields+1;
    }
    lp = lpAppendInteger(lp,lp_count);
    s->alloc_size -= oldsize;
    s->alloc_size += lpBytes(lp);

    /* Insert back into the tree in order to update the listpack pointer. */
    if (ri.data != lp)
        raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
    s->length++;
    s->entries_added++;
    s->last_id = id;
    if (s->length == 1) s->first_id = id;
    if (added_id) *added_id = id;
    return C_OK;
}

/* Parsed options shared by XADD and XTRIM; streamTrim() reads the trimming
 * related members of this structure. */
typedef struct {
    /* XADD options */
    streamID id; /* User-provided ID, for XADD only. */
    int id_given; /* Was an ID different than "*" specified? for XADD only. */
    int seq_given; /* Was an ID different than "ms-*" specified? for XADD only. */
    int no_mkstream; /* if set to 1 do not create new stream */
    robj *idmp_pid; /* IDMP producer id parameter, for XADD only. */
    robj *idmp_iid; /* IDMP idempotent id parameter, for XADD only. */
    int idmp_auto; /* If set to 1, auto-generate IID from field-value pairs, for XADD only. */

    /* XADD + XTRIM common options */
    int trim_strategy; /* TRIM_STRATEGY_* */
    int trim_strategy_arg_idx; /* Index of the count in MAXLEN/MINID, for rewriting. */
    int delete_strategy; /* DELETE_STRATEGY_* */
    int approx_trim; /* If 1 only delete whole radix tree nodes, so
                      * the trim argument is not applied verbatim.
                      * Note: This flag is ignored when delete_strategy is non-KEEPREF.
                      * Individual entries may still be processed for consumer groups. */
    long long limit; /* Maximum amount of entries to trim. If 0, no limitation
                      * on the amount of trimming work is enforced. */
    /* TRIM_STRATEGY_MAXLEN options */
    long long maxlen; /* After trimming, leave stream at this length. */
    /* TRIM_STRATEGY_MINID options */
    streamID minid; /* Trim by ID (No stream entries with ID < 'minid' will remain) */
} streamAddTrimArgs;

#define TRIM_STRATEGY_NONE 0
#define TRIM_STRATEGY_MAXLEN 1
#define TRIM_STRATEGY_MINID 2

/* Describes a run of entry IDs inside a command's argv, together with the
 * delete strategy to apply to them. */
typedef struct {
    int startidx;        /* Starting index of IDs in argv */
    long numids;         /* Number of IDs to process */
    int delete_strategy; /* DELETE_STRATEGY_* */
} streamAckDelArgs;

#define DELETE_STRATEGY_NONE 0
#define DELETE_STRATEGY_KEEPREF 1 /* Delete and keep references */
#define DELETE_STRATEGY_DELREF 2 /* Delete from pending entries list */
#define DELETE_STRATEGY_ACKED 3 /* Only delete messages that are acknowledged */

/* Trim the stream 's' according to args->trim_strategy, and return the
 * number of elements removed from the stream. The 'approx' option
 * (args->approx_trim), if non-zero, specifies that the trimming must be
 * performed in an approximated way in order to maximize performances. This
 * means that the stream may contain entries with IDs < 'id' in case of MINID
 * (or more elements than 'maxlen' in case of MAXLEN), and elements are only
 * removed if we can remove a *whole* node of the radix tree. The elements
 * are removed from the head of the stream (older elements).
 *
 * The function may return zero if:
 *
 * 1) The minimal entry ID of the stream is already >= 'id' (MINID); or
 * 2) The stream is already shorter or equal to the specified max length (MAXLEN); or
 * 3) The 'approx' option is true and the head node did not have enough elements
 *    to be deleted.
 *
 * args->limit is the maximum number of entries to delete. The purpose is to
 * prevent this function from taking too long.
 * If 'limit' is 0 then we do not limit the number of deleted entries.
 * Much like the 'approx', if 'limit' is smaller than the number of entries
 * that should be trimmed, there is a chance we will still have entries with
 * IDs < 'id' (or number of elements >= maxlen in case of MAXLEN).
 */
int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
    size_t maxlen = args->maxlen;
    streamID *id = &args->minid;
    int approx = args->approx_trim;
    int64_t limit = args->limit;
    int trim_strategy = args->trim_strategy;
    int delete_strategy = args->delete_strategy;

    if (trim_strategy == TRIM_STRATEGY_NONE)
        return 0;

    /* Nodes are visited from the head (oldest entries) of the stream. */
    raxIterator ri;
    raxStart(&ri,s->rax);
    raxSeek(&ri,"^",NULL,0);

    int64_t deleted = 0;
    while (raxNext(&ri)) {
        if (trim_strategy == TRIM_STRATEGY_MAXLEN && s->length <= maxlen)
            break;

        /* The first listpack element is the count of valid (non deleted)
         * entries in this node. */
        unsigned char *lp = ri.data, *p = lpFirst(lp);
        int64_t entries = lpGetInteger(p);

        /* Check if we exceeded the amount of work we could do */
        if (limit && (deleted + entries) > limit)
            break;

        /* Check if we can remove the whole node */
        int remove_node = 0; /* Final decision flag for node removal */
        int node_eligible_for_remove = 0; /* Whether node meets the basic criteria for removal */
        streamID master_id = {0};
        /* Read the master ID from the radix tree key. */
        streamDecodeID(ri.key, &master_id);
        if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
            node_eligible_for_remove = s->length - entries >= maxlen;
        } else {
            /* Read last ID. */
            streamID last_id = {0,0};
            lpGetEdgeStreamID(lp, 0, &master_id, &last_id);

            /* We can remove the entire node if its last ID < 'id' */
            node_eligible_for_remove = streamCompareID(&last_id, id) < 0;
        }

        if (node_eligible_for_remove && delete_strategy == DELETE_STRATEGY_KEEPREF) {
            /* With KEEPREF strategy, we can remove the whole node directly since we don't need
             * to check or clean up consumer group references. */
            remove_node = 1;
        }

        if (remove_node) {
            s->alloc_size -= lpBytes(lp);
            lpFree(lp);
            raxRemove(s->rax,ri.key,ri.key_len,NULL);
            /* The tree changed under the iterator: re-seek from the key
             * just removed before continuing with the next node. */
            raxSeek(&ri,">=",ri.key,ri.key_len);
            s->length -= entries;
            deleted += entries;
            continue;
        }

        /* If we cannot remove a whole element, and approx is true,
         * stop here. However, for non-KEEPREF strategies, if the node was
         * eligible for removal but we couldn't remove it (because we need
         * to check consumer group references), we should continue to process
         * entries within this node. */
        if (approx && delete_strategy == DELETE_STRATEGY_KEEPREF) break;

        /* Now we have to trim entries from within 'lp' */
        size_t oldsize = lpBytes(lp);
        int64_t deleted_from_lp = 0;

        p = lpNext(lp, p); /* Skip deleted field. */
        p = lpNext(lp, p); /* Skip num-of-fields in the master entry. */

        /* Skip all the master fields. */
        int64_t master_fields_count = lpGetInteger(p);
        p = lpNext(lp,p); /* Skip the first field. */
        for (int64_t j = 0; j < master_fields_count; j++)
            p = lpNext(lp,p); /* Skip all master fields. */
        p = lpNext(lp,p); /* Skip the zero master entry terminator. */

        /* 'p' is now pointing to the first entry inside the listpack.
         * We have to run entry after entry, marking entries as deleted
         * if they are already not deleted. */
        while (p) {
            /* We keep a copy of p (which point to flags part) in order to
             * update it after (and if) we actually remove the entry */
            unsigned char *pcopy = p;

            int64_t flags = lpGetInteger(p);
            p = lpNext(lp, p); /* Skip flags. */
            int64_t to_skip;

            int64_t ms_delta = lpGetInteger(p);
            p = lpNext(lp, p); /* Skip ID ms delta */
            int64_t seq_delta = lpGetInteger(p);
            p = lpNext(lp, p); /* Skip ID seq delta */

            /* Rebuild the absolute entry ID from the delta encoding. */
            streamID currid = {0};
            currid.ms = master_id.ms + ms_delta;
            currid.seq = master_id.seq + seq_delta;

            int stop;
            if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
                stop = s->length <= maxlen;
            } else {
                /* Following IDs will definitely be greater because the rax
                 * tree is sorted, no point of continuing. */
                stop = streamCompareID(&currid, id) >= 0;
            }
            if (stop)
                break;

            if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
                to_skip = master_fields_count;
            } else {
                to_skip = lpGetInteger(p); /* Get num-fields. */
                p = lpNext(lp,p); /* Skip num-fields. */
                to_skip *= 2; /* Fields and values. */
            }

            while(to_skip--) p = lpNext(lp,p); /* Skip the whole entry. */
            p = lpNext(lp,p); /* Skip the final lp-count field. */

            /* Mark the entry as deleted if allowed. */
            if (!(flags & STREAM_ITEM_FLAG_DELETED)) {
                int can_delete = 1;
                if (delete_strategy == DELETE_STRATEGY_ACKED) {
                    /* Only delete entry that has been acknowledged by all consumer groups. */
                    can_delete = (streamEntryIsReferenced(s, &currid) == 0);
                } else if (delete_strategy == DELETE_STRATEGY_DELREF) {
                    /* Remove all consumer group references for this entry */
                    streamCleanupEntryCGroupRefs(s, &currid);
                }

                if (can_delete) {
                    /* Mark the entry as deleted. */
                    /* lpReplaceInteger() returns a (possibly different)
                     * listpack pointer, so 'p' is saved as an offset and
                     * re-based on the returned pointer afterwards. */
                    intptr_t delta = p ? (p - lp) : 0; /* p may be NULL if this was the last entry */
                    flags |= STREAM_ITEM_FLAG_DELETED;
                    lp = lpReplaceInteger(lp, &pcopy, flags);
                    deleted_from_lp++;
                    s->length--;
                    if (p) p = lp + delta;
                }
            }
        }
        deleted += deleted_from_lp;
        /* If this node was originally eligible for removal but we couldn't remove it upfront
         * due to delete strategy constraints, and now we've processed and deleted all entries
         * in the node, we can finally remove the entire node. */
        if (node_eligible_for_remove && deleted_from_lp == entries) {
            /* s->length was already decremented entry by entry above. */
            s->alloc_size -= oldsize;
            lpFree(lp);
            raxRemove(s->rax,ri.key,ri.key_len,NULL);
            raxSeek(&ri,">=",ri.key,ri.key_len);
            continue;
        }

        /* Now we update the entries/deleted counters. */
        p = lpFirst(lp);
        lp = lpReplaceInteger(lp,&p,entries-deleted_from_lp);
        p = lpNext(lp,p); /* Skip deleted field. */
        int64_t marked_deleted = lpGetInteger(p);
        lp = lpReplaceInteger(lp,&p,marked_deleted+deleted_from_lp);
        p = lpNext(lp,p); /* Skip num-of-fields in the master entry. */
        s->alloc_size -= oldsize;
        s->alloc_size += lpBytes(lp);

        /* Here we should perform garbage collection in case at this point
         * there are too many entries deleted inside the listpack. */
        entries -= deleted_from_lp;
        marked_deleted += deleted_from_lp;
        if (entries + marked_deleted > 10 && marked_deleted > entries/2) {
            /* TODO: perform a garbage collection. */
        }

        /* Update the node with the new pointer. */
        raxSetData(ri.node,lp);

        /* If the node is eligible for removal but we couldn't remove it due to delete strategy
         * constraints (we need to check each entry individually), continue to the next node
         * instead of stopping here. */
        if (node_eligible_for_remove)
            continue;

        break; /* If we are here, there was enough to delete in the current
                  node, so no need to go to the next node. */
    }
    raxStop(&ri);

    /* Update the stream's first ID after the trimming. */
    if (s->length == 0) {
        s->first_id.ms = 0;
        s->first_id.seq = 0;
    } else if (deleted) {
        /* Recompute the first ID, skipping tombstones. */
        streamGetEdgeID(s,1,1,&s->first_id);
    }

    return deleted;
}

/* Trims a stream by length. Returns the number of deleted items.
 * Always uses the KEEPREF delete strategy. When 'approx' is non-zero the
 * amount of work is capped at 100 * server.stream_node_max_entries entries,
 * otherwise no limit is applied. */
int64_t streamTrimByLength(stream *s, long long maxlen, int approx) {
    streamAddTrimArgs args = {
        .trim_strategy = TRIM_STRATEGY_MAXLEN,
        .approx_trim = approx,
        .limit = approx ? 100 * server.stream_node_max_entries : 0,
        .maxlen = maxlen,
        .delete_strategy = DELETE_STRATEGY_KEEPREF
    };
    return streamTrim(s, &args);
}

/* Trims a stream by minimum ID. Returns the number of deleted items.
 * Always uses the KEEPREF delete strategy. When 'approx' is non-zero the
 * amount of work is capped at 100 * server.stream_node_max_entries entries,
 * otherwise no limit is applied. */
int64_t streamTrimByID(stream *s, streamID minid, int approx) {
    streamAddTrimArgs args = {
        .trim_strategy = TRIM_STRATEGY_MINID,
        .approx_trim = approx,
        .limit = approx ? 100 * server.stream_node_max_entries : 0,
        .minid = minid,
        .delete_strategy = DELETE_STRATEGY_KEEPREF
    };
    return streamTrim(s, &args);
}

/* Parse the arguments of XADD/XTRIM.
 *
 * See streamAddTrimArgs for more details about the arguments handled.
 *
 * This function returns the position of the ID argument (relevant only to XADD).
 * On error -1 is returned and a reply is sent.
*/ -static int streamParseAddOrTrimArgsOrReply(client *c, streamAddTrimArgs *args, int xadd) { - /* Initialize arguments to defaults */ - memset(args, 0, sizeof(*args)); - args->delete_strategy = DELETE_STRATEGY_NONE; - - /* Parse options. */ - int i = 2; /* This is the first argument position where we could - find an option, or the ID. */ - int limit_given = 0; - for (; i < c->argc; i++) { - int moreargs = (c->argc-1) - i; /* Number of additional arguments. */ - char *opt = c->argv[i]->ptr; - if (xadd && opt[0] == '*' && opt[1] == '\0') { - /* This is just a fast path for the common case of auto-ID - * creation. */ - break; - } else if (!strcasecmp(opt,"maxlen") && moreargs) { - if (args->trim_strategy != TRIM_STRATEGY_NONE) { - addReplyError(c,"syntax error, MAXLEN and MINID options at the same time are not compatible"); - return -1; - } - args->approx_trim = 0; - char *next = c->argv[i+1]->ptr; - /* Check for the form MAXLEN ~ <count>. */ - if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') { - args->approx_trim = 1; - i++; - } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') { - i++; - } - if (getLongLongFromObjectOrReply(c,c->argv[i+1],&args->maxlen,NULL) - != C_OK) return -1; - - if (args->maxlen < 0) { - addReplyError(c,"The MAXLEN argument must be >= 0."); - return -1; - } - i++; - args->trim_strategy = TRIM_STRATEGY_MAXLEN; - args->trim_strategy_arg_idx = i; - } else if (!strcasecmp(opt,"minid") && moreargs) { - if (args->trim_strategy != TRIM_STRATEGY_NONE) { - addReplyError(c,"syntax error, MAXLEN and MINID options at the same time are not compatible"); - return -1; - } - args->approx_trim = 0; - char *next = c->argv[i+1]->ptr; - /* Check for the form MINID ~ <id> */ - if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') { - args->approx_trim = 1; - i++; - } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') { - i++; - } - - if (streamParseStrictIDOrReply(c,c->argv[i+1],&args->minid,0,NULL) != C_OK) - return -1; - - i++; - 
args->trim_strategy = TRIM_STRATEGY_MINID; - args->trim_strategy_arg_idx = i; - } else if (!strcasecmp(opt,"limit") && moreargs) { - /* Note about LIMIT: If it was not provided by the caller we set - * it to 100*server.stream_node_max_entries, and that's to prevent the - * trimming from taking too long, on the expense of not deleting entries - * that should be trimmed. - * If user wanted exact trimming (i.e. no '~') we never limit the number - * of trimmed entries */ - if (getLongLongFromObjectOrReply(c,c->argv[i+1],&args->limit,NULL) != C_OK) - return -1; - - if (args->limit < 0) { - addReplyError(c,"The LIMIT argument must be >= 0."); - return -1; - } - limit_given = 1; - i++; - } else if (!strcasecmp(opt,"keepref") && args->delete_strategy == DELETE_STRATEGY_NONE) { - args->delete_strategy = DELETE_STRATEGY_KEEPREF; - } else if (!strcasecmp(opt,"delref") && args->delete_strategy == DELETE_STRATEGY_NONE) { - args->delete_strategy = DELETE_STRATEGY_DELREF; - } else if (!strcasecmp(opt,"acked") && args->delete_strategy == DELETE_STRATEGY_NONE) { - args->delete_strategy = DELETE_STRATEGY_ACKED; - } else if (xadd && !strcasecmp(opt,"nomkstream")) { - args->no_mkstream = 1; - } else if (xadd && !strcasecmp(opt,"idmpauto") && moreargs) { - /* IDMPAUTO pid - auto-generate IID from field-value pairs */ - if (args->idmp_pid != NULL) { - addReplyError(c,"syntax error, IDMP/IDMPAUTO specified multiple times"); - return -1; - } - - size_t pid_len = sdslen((sds)c->argv[i+1]->ptr); - if (pid_len == 0) { - addReplyError(c, "syntax error, IDMPAUTO requires a non-empty producer ID"); - return -1; - } - - args->idmp_pid = c->argv[i+1]; - args->idmp_auto = 1; - i++; - } else if (xadd && !strcasecmp(opt,"idmp") && moreargs >= 2) { - /* IDMP pid iid - explicit producer ID and idempotent ID */ - if (args->idmp_pid != NULL) { - addReplyError(c,"syntax error, IDMP/IDMPAUTO specified multiple times"); - return -1; - } - - size_t pid_len = sdslen((sds)c->argv[i+1]->ptr); - if (pid_len == 
0) { - addReplyError(c, "syntax error, IDMP requires a non-empty producer ID"); - return -1; - } - - size_t iid_len = sdslen((sds)c->argv[i+2]->ptr); - if (iid_len == 0) { - addReplyError(c, "syntax error, IDMP requires a non-empty idempotent ID"); - return -1; - } - - args->idmp_pid = c->argv[i+1]; - args->idmp_iid = c->argv[i+2]; - i += 2; - } else if (xadd) { - /* If we are here it is either a syntax error or a valid ID. */ - if (streamParseStrictIDOrReply(c,c->argv[i],&args->id,0,&args->seq_given) != C_OK) - return -1; - - /* mustObeyClient is needed because IDMP can only be used with * (auto-generated IDs), - * but when we replicate the message we replace the * with the actual StreamID. */ - if (args->idmp_pid && opt[0] != '*' && !mustObeyClient(c)) { - addReplyError(c,"syntax error, IDMP/IDMPAUTO can be used only with auto-generated IDs"); - return -1; - } - args->id_given = 1; - break; - } else { - addReplyErrorObject(c,shared.syntaxerr); - return -1; - } - } - - /* NOTE(review): this tests the parsed LIMIT value rather than limit_given, so an explicit LIMIT 0 is not caught by this particular check -- confirm this is intended. */ if (args->limit && args->trim_strategy == TRIM_STRATEGY_NONE) { - addReplyError(c,"syntax error, LIMIT cannot be used without specifying a trimming strategy"); - return -1; - } - - if (!xadd && args->trim_strategy == TRIM_STRATEGY_NONE) { - addReplyError(c,"syntax error, XTRIM must be called with a trimming strategy"); - return -1; - } - - if (mustObeyClient(c)) { - /* If command came from master or from AOF we must not enforce maxnodes - * (The maxlen/minid argument was re-written to make sure there's no - * inconsistency). */ - args->limit = 0; - } else { - /* We need to set the limit (only if we got '~') */ - if (limit_given) { - if (!args->approx_trim) { - /* LIMIT was provided without ~ */ - addReplyError(c,"syntax error, LIMIT cannot be used without the special ~ option"); - return -1; - } - } else { - /* User didn't provide LIMIT, we must set it. */ - if (args->approx_trim) { - /* In order to prevent from trimming to do too much work and - * cause latency spikes we limit the amount of work it can do.
- * We have to cap args->limit from both sides in case - * stream_node_max_entries is 0 or too big (could cause overflow) - */ - args->limit = 100 * server.stream_node_max_entries; /* Maximum 100 rax nodes. */ - if (args->limit <= 0) args->limit = 10000; - if (args->limit > 1000000) args->limit = 1000000; - } else { - /* No LIMIT for exact trimming */ - args->limit = 0; - } - } - } - - /* Set default consumer group reference handling to KEEPREF if none was specified */ - if (args->delete_strategy == DELETE_STRATEGY_NONE) - args->delete_strategy = DELETE_STRATEGY_KEEPREF; - - return i; -} - -static int streamParseAckDelArgsOrReply(client *c, int start_pos, streamAckDelArgs *args) { - /* Initialize arguments to defaults */ - memset(args, 0, sizeof(*args)); - args->startidx = -1; - args->delete_strategy = DELETE_STRATEGY_NONE; - - /* Parse command options */ - int j = start_pos; - while (j < c->argc) { - char *opt = c->argv[j]->ptr; - if (!strcasecmp(opt, "KEEPREF") && args->delete_strategy == DELETE_STRATEGY_NONE) { - args->delete_strategy = DELETE_STRATEGY_KEEPREF; - j++; - } else if (!strcasecmp(opt, "DELREF") && args->delete_strategy == DELETE_STRATEGY_NONE) { - args->delete_strategy = DELETE_STRATEGY_DELREF; - j++; - } else if (!strcasecmp(opt, "ACKED") && args->delete_strategy == DELETE_STRATEGY_NONE) { - args->delete_strategy = DELETE_STRATEGY_ACKED; - j++; - } else if (!strcasecmp(opt, "IDS") && j+1 < c->argc) { - /* Parse the number of IDs */ - if (getRangeLongFromObjectOrReply(c, c->argv[j+1], 1, LONG_MAX, - &args->numids, "Number of IDs must be a positive integer") != C_OK) - { - return 0; - } - - /* Verify that the specified number of IDs matches the actual arguments */ - if (args->numids > (c->argc - j - 2)) { - addReplyError(c, "The `numids` parameter must match the number of arguments"); - return 0; - } - - args->startidx = j + 2; /* Skip "IDS" and numids */ - j = args->startidx + args->numids; - } else { - addReplyErrorObject(c,shared.syntaxerr); - 
return 0; - } - } - - if (args->startidx == -1) { - addReplyError(c, "IDS option is required"); - return 0; - } - - /* Set default consumer group reference handling to KEEPREF if none was specified */ - if (args->delete_strategy == DELETE_STRATEGY_NONE) - args->delete_strategy = DELETE_STRATEGY_KEEPREF; - - return 1; -} - -/* Initialize the stream iterator, so that we can call iterating functions - * to get the next items. This requires a corresponding streamIteratorStop() - * at the end. The 'rev' parameter controls the direction. If it's zero the - * iteration is from the start to the end element (inclusive), otherwise - * if rev is non-zero, the iteration is reversed. - * - * A NULL 'start' means iterating from ID 0-0, and a NULL 'end' means - * iterating up to the maximum possible ID (UINT64_MAX-UINT64_MAX). - * - * Once the iterator is initialized, we iterate like this: - * - * streamIterator myiterator; - * streamIteratorStart(&myiterator,...); - * int64_t numfields; - * while(streamIteratorGetID(&myiterator,&ID,&numfields)) { - * while(numfields--) { - * unsigned char *key, *value; - * int64_t key_len, value_len; - * streamIteratorGetField(&myiterator,&key,&value,&key_len,&value_len); - * - * ... do what you want with key and value ... - * } - * } - * streamIteratorStop(&myiterator); */ -void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev) { - /* Initialize the iterator and translate the iteration start/stop - * elements into a 128 bit big-endian number. */ - if (start) { - streamEncodeID(si->start_key,start); - } else { - si->start_key[0] = 0; - si->start_key[1] = 0; - } - - if (end) { - streamEncodeID(si->end_key,end); - } else { - si->end_key[0] = UINT64_MAX; - si->end_key[1] = UINT64_MAX; - } - - /* Decode the big-endian keys into native 64-bit integers - * for faster comparisons during iteration. */ - si->start_ms = htonu64(si->start_key[0]); - si->start_seq = htonu64(si->start_key[1]); - si->end_ms = htonu64(si->end_key[0]); - si->end_seq = htonu64(si->end_key[1]); - - /* Seek the correct node in the radix tree.
*/ - raxStart(&si->ri,s->rax); - if (!rev) { - if (start && (start->ms || start->seq)) { - raxSeek(&si->ri,"<=",(unsigned char*)si->start_key, - sizeof(si->start_key)); - if (raxEOF(&si->ri)) raxSeek(&si->ri,"^",NULL,0); - } else { - raxSeek(&si->ri,"^",NULL,0); - } - } else { - if (end && (end->ms || end->seq)) { - raxSeek(&si->ri,"<=",(unsigned char*)si->end_key, - sizeof(si->end_key)); - if (raxEOF(&si->ri)) raxSeek(&si->ri,"$",NULL,0); - } else { - raxSeek(&si->ri,"$",NULL,0); - } - } - si->stream = s; - si->lp = NULL; /* There is no current listpack right now. */ - si->lp_last_ele = NULL; - si->lp_ele = NULL; /* Current listpack cursor. */ - si->rev = rev; /* Direction, if non-zero reversed, from end to start. */ - si->skip_tombstones = 1; /* By default tombstones aren't emitted. */ -} - -/* Return 1 and store the current item ID at 'id' if there are still - * elements within the iteration range, otherwise return 0 in order to - * signal the iteration terminated. */ -int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) { - while(1) { /* Will stop when element > stop_key or end of radix tree. */ - /* Record the previous lp_ele position to detect data corruption - * that might cause the iterator to move backwards unexpectedly. */ - if (si->lp_ele && si->lp_last_ele) - serverAssert(si->rev ? si->lp_ele < si->lp_last_ele : si->lp_ele > si->lp_last_ele); - si->lp_last_ele = si->lp_ele; - - /* If the current listpack is set to NULL, this is the start of the - * iteration or the previous listpack was completely iterated. - * Go to the next node. */ - if (si->lp == NULL || si->lp_ele == NULL) { - if (!si->rev && !raxNext(&si->ri)) return 0; - else if (si->rev && !raxPrev(&si->ri)) return 0; - serverAssert(si->ri.key_len == sizeof(streamID)); - /* Get the master ID. */ - streamDecodeID(si->ri.key,&si->master_id); - /* Get the master fields count. 
*/ - si->lp = si->ri.data; - si->lp_ele = lpFirst(si->lp); /* Seek items count */ - si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek deleted count. */ - si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek num fields. */ - si->master_fields_count = lpGetInteger(si->lp_ele); - si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek first field. */ - si->master_fields_start = si->lp_ele; - /* We are now pointing to the first field of the master entry. - * We need to seek either the first or the last entry depending - * on the direction of the iteration. */ - if (!si->rev) { - /* If we are iterating in normal order, skip the master fields - * to seek the first actual entry. */ - for (uint64_t i = 0; i < si->master_fields_count; i++) - si->lp_ele = lpNext(si->lp,si->lp_ele); - } else { - /* If we are iterating in reverse direction, just seek the - * last part of the last entry in the listpack (that is, the - * fields count). */ - si->lp_ele = lpLast(si->lp); - } - } else if (si->rev) { - /* If we are iterating in the reverse order, and this is not - * the first entry emitted for this listpack, then we already - * emitted the current entry, and have to go back to the previous - * one. */ - int64_t lp_count = lpGetInteger(si->lp_ele); - while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele); - /* Seek lp-count of prev entry. */ - si->lp_ele = lpPrev(si->lp,si->lp_ele); - } - - /* For every radix tree node, iterate the corresponding listpack, - * returning elements when they are within range. */ - while(1) { - if (!si->rev) { - /* If we are going forward, skip the previous entry - * lp-count field (or in case of the master entry, the zero - * term field) */ - si->lp_ele = lpNext(si->lp,si->lp_ele); - if (si->lp_ele == NULL) break; - } else { - /* If we are going backward, read the number of elements this - * entry is composed of, and jump backward N times to seek - * its start. */ - int64_t lp_count = lpGetInteger(si->lp_ele); - if (lp_count == 0) { /* We reached the master entry. 
*/ - si->lp = NULL; - si->lp_ele = NULL; - break; - } - while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele); - } - - /* Get the flags entry. */ - si->lp_flags = si->lp_ele; - int64_t flags = lpGetInteger(si->lp_ele); - si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek ID. */ - - /* Get the ID: it is encoded as difference between the master - * ID and this entry ID. */ - *id = si->master_id; - id->ms += lpGetInteger(si->lp_ele); - si->lp_ele = lpNext(si->lp,si->lp_ele); - id->seq += lpGetInteger(si->lp_ele); - si->lp_ele = lpNext(si->lp,si->lp_ele); - - /* The number of fields is stored here or not depending on the - * flags: with SAMEFIELDS it is taken from the master entry. */ - if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) { - *numfields = si->master_fields_count; - } else { - *numfields = lpGetInteger(si->lp_ele); - si->lp_ele = lpNext(si->lp,si->lp_ele); - } - serverAssert(*numfields>=0); - - /* If current >= start (or, when iterating in reverse, current <= end), - * and the entry is not marked as deleted or tombstones are included, - * emit it. */ - if (!si->rev) { - if ((id->ms > si->start_ms || - (id->ms == si->start_ms && id->seq >= si->start_seq)) && - (!si->skip_tombstones || !(flags & STREAM_ITEM_FLAG_DELETED))) - { - if (id->ms > si->end_ms || - (id->ms == si->end_ms && id->seq > si->end_seq)) - return 0; /* We are already out of range. */ - si->entry_flags = flags; - if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) - si->master_fields_ptr = si->master_fields_start; - return 1; /* Valid item returned. */ - } - } else { - if ((id->ms < si->end_ms || - (id->ms == si->end_ms && id->seq <= si->end_seq)) && - (!si->skip_tombstones || !(flags & STREAM_ITEM_FLAG_DELETED))) - { - if (id->ms < si->start_ms || - (id->ms == si->start_ms && id->seq < si->start_seq)) - return 0; /* We are already out of range. */ - si->entry_flags = flags; - if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) - si->master_fields_ptr = si->master_fields_start; - return 1; /* Valid item returned.
*/ - } - } - - /* If we do not emit, we have to discard if we are going - * forward, or seek the previous entry if we are going - * backward. */ - if (!si->rev) { - int64_t to_discard = (flags & STREAM_ITEM_FLAG_SAMEFIELDS) ? - *numfields : *numfields*2; - for (int64_t i = 0; i < to_discard; i++) - si->lp_ele = lpNext(si->lp,si->lp_ele); - } else { - int64_t prev_times = 4; /* flag + id ms + id seq + one more to - go back to the previous entry "count" - field. */ - /* If the entry was not flagged SAMEFIELD we also read the - * number of fields, so go back one more. */ - if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) prev_times++; - while(prev_times--) si->lp_ele = lpPrev(si->lp,si->lp_ele); - } - } - - /* End of listpack reached. Try the next/prev radix tree node. */ - } -} - -/* Get the field and value of the current item we are iterating. This should - * be called immediately after streamIteratorGetID(), and for each field - * according to the number of fields returned by streamIteratorGetID(). - * The function populates the field and value pointers and the corresponding - * lengths by reference, that are valid until the next iterator call, assuming - * no one touches the stream meanwhile. */ -void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen) { - if (si->entry_flags & STREAM_ITEM_FLAG_SAMEFIELDS) { - *fieldptr = lpGet(si->master_fields_ptr,fieldlen,si->field_buf); - si->master_fields_ptr = lpNext(si->lp,si->master_fields_ptr); - } else { - *fieldptr = lpGet(si->lp_ele,fieldlen,si->field_buf); - si->lp_ele = lpNext(si->lp,si->lp_ele); - } - *valueptr = lpGet(si->lp_ele,valuelen,si->value_buf); - si->lp_ele = lpNext(si->lp,si->lp_ele); -} - -/* Remove the current entry from the stream: can be called after the - * GetID() API or after any GetField() call, however we need to iterate - * a valid entry while calling this function. 
Moreover the function - * requires the entry ID we are currently iterating, that was previously - * returned by GetID(). - * - * Note that after calling this function, next calls to GetField() can't - * be performed: the entry is now deleted. Instead the iterator will - * automatically re-seek to the next entry, so the caller should continue - * with GetID(). */ -void streamIteratorRemoveEntry(streamIterator *si, streamID *current) { - stream *s = si->stream; - unsigned char *lp = si->lp; - size_t oldsize = lpBytes(lp); - int64_t aux; - - /* We do not really delete the entry here. Instead we mark it as - * deleted by flagging it, and also incrementing the count of the - * deleted entries in the listpack header. - * - * We start flagging: */ - int64_t flags = lpGetInteger(si->lp_flags); - flags |= STREAM_ITEM_FLAG_DELETED; - lp = lpReplaceInteger(lp,&si->lp_flags,flags); - - /* Change the valid/deleted entries count in the master entry. */ - unsigned char *p = lpFirst(lp); - aux = lpGetInteger(p); - - if (aux == 1) { - /* If this is the last element in the listpack, we can remove the whole - * node. */ - s->alloc_size -= oldsize; - lpFree(lp); - raxRemove(s->rax,si->ri.key,si->ri.key_len,NULL); - } else { - /* In the base case we alter the counters of valid/deleted entries. */ - lp = lpReplaceInteger(lp,&p,aux-1); - p = lpNext(lp,p); /* Seek deleted field. */ - aux = lpGetInteger(p); - lp = lpReplaceInteger(lp,&p,aux+1); - s->alloc_size -= oldsize; - s->alloc_size += lpBytes(lp); - - /* Update the listpack with the new pointer. */ - if (si->lp != lp) - raxInsert(s->rax,si->ri.key,si->ri.key_len,lp,NULL); - } - - /* Update the number of entries counter. */ - s->length--; - - /* Re-seek the iterator to fix the now messed up state. 
*/ - streamID start, end; - if (si->rev) { - streamDecodeID(si->start_key,&start); - end = *current; - } else { - start = *current; - streamDecodeID(si->end_key,&end); - } - streamIteratorStop(si); - streamIteratorStart(si,s,&start,&end,si->rev); - - /* TODO: perform a garbage collection here if the ratio between - * deleted and valid goes over a certain limit. */ -} - -/* Stop the stream iterator. The only cleanup we need is to free the rax - * iterator, since the stream iterator itself is supposed to be stack - * allocated. */ -void streamIteratorStop(streamIterator *si) { - raxStop(&si->ri); -} - -/* Return 1 if `id` exists in `s` (and not marked as deleted) */ -int streamEntryExists(stream *s, streamID *id) { - streamIterator si; - streamIteratorStart(&si,s,id,id,0); - streamID myid; - int64_t numfields; - int found = streamIteratorGetID(&si,&myid,&numfields); - streamIteratorStop(&si); - if (!found) - return 0; - serverAssert(streamCompareID(id,&myid) == 0); - return 1; -} - -/* Delete the specified item ID from the stream, returning 1 if the item - * was deleted 0 otherwise (if it does not exist). */ -int streamDeleteItem(stream *s, streamID *id) { - int deleted = 0; - streamIterator si; - streamIteratorStart(&si,s,id,id,0); - streamID myid; - int64_t numfields; - if (streamIteratorGetID(&si,&myid,&numfields)) { - streamIteratorRemoveEntry(&si,&myid); - deleted = 1; - } - streamIteratorStop(&si); - return deleted; -} - -/* Get the last valid (non-tombstone) streamID of 's'. */ -void streamLastValidID(stream *s, streamID *maxid) -{ - streamIterator si; - streamIteratorStart(&si,s,NULL,NULL,1); - int64_t numfields; - if (!streamIteratorGetID(&si,maxid,&numfields) && s->length) - serverPanic("Corrupt stream, length is %llu, but no max id", (unsigned long long)s->length); - streamIteratorStop(&si); -} - -/* Maximum size for a stream ID string. 
In theory 20*2+1 should be enough, - * But to avoid chance for off by one issues and null-term, in case this will - * be used as parsing buffer, we use a slightly larger buffer. On the other - * hand considering sds header is gonna add 4 bytes, we wanna keep below the - * allocator's 48 bytes bin. */ -#define STREAM_ID_STR_LEN 44 - /* Return a new sds string holding 'id' formatted as <ms>-<seq>. */ -sds createStreamIDString(streamID *id) { - /* Optimization: pre-allocate a big enough buffer to avoid reallocs. */ - sds str = sdsnewlen(SDS_NOINIT, STREAM_ID_STR_LEN); - sdssetlen(str, 0); - return sdscatfmt(str,"%U-%U", id->ms,id->seq); -} - -/* Emit a reply in the client output buffer by formatting a Stream ID - * in the standard <ms>-<seq> format. The reply is emitted through - * addReplyBulkSds(), that is, as a bulk reply and not as a simple string. */ -void addReplyStreamID(client *c, streamID *id) { - addReplyBulkSds(c,createStreamIDString(id)); -} - /* Same as addReplyStreamID(), but the formatted ID is handed to setDeferredReplyBulkSds() for the deferred reply 'dr'. */ -void setDeferredReplyStreamID(client *c, void *dr, streamID *id) { - setDeferredReplyBulkSds(c, dr, createStreamIDString(id)); -} - -/* Similar to addReplyStreamID(), but just creates a string object, usually - * useful for replication purposes to create arguments. */ -robj *createObjectFromStreamID(streamID *id) { - return createObject(OBJ_STRING, createStreamIDString(id)); -} - -/* Returns non-zero if the ID is 0-0. */ -int streamIDEqZero(streamID *id) { - return !(id->ms || id->seq); -} - -/* A helper that returns non-zero if the range from 'start' to `end` - * contains a tombstone. The check is done against the stream's - * max_deleted_entry_id only. A NULL 'start' means 0-0 and a NULL 'end' - * means the maximum possible ID. - * - * NOTE: this assumes that the caller had verified that 'start' is less than - * 's->last_id'. */ -int streamRangeHasTombstones(stream *s, streamID *start, streamID *end) { - streamID start_id, end_id; - - if (!s->length || streamIDEqZero(&s->max_deleted_entry_id)) { - /* The stream is empty or has no tombstones.
*/ - return 0; - } - - if (start) { - start_id = *start; - } else { - start_id.ms = 0; - start_id.seq = 0; - } - - if (end) { - end_id = *end; - } else { - end_id.ms = UINT64_MAX; - end_id.seq = UINT64_MAX; - } - - if (streamCompareID(&start_id,&s->max_deleted_entry_id) <= 0 && - streamCompareID(&s->max_deleted_entry_id,&end_id) <= 0) - { - /* start_id <= max_deleted_entry_id <= end_id: The range does include a tombstone. */ - return 1; - } - - /* The range doesn't include a tombstone. */ - return 0; -} - -/* Replies with a consumer group's current lag, that is the number of messages - * in the stream that are yet to be delivered. In case the lag isn't - * available due to fragmentation, the reply to the client is a null. */ -void streamReplyWithCGLag(client *c, stream *s, streamCG *cg) { - int valid = 0; - long long lag = 0; - - if (!s->entries_added) { - /* The lag of a newly-initialized stream is 0. */ - lag = 0; - valid = 1; - } else if (!s->length) { /* All entries deleted, now empty. */ - lag = 0; - valid = 1; - } else if (streamCompareID(&cg->last_id,&s->first_id) < 0 && - streamCompareID(&s->max_deleted_entry_id,&s->first_id) < 0) - { - /* When both the consumer group's last_id and the maximum tombstone are behind - * the stream's first entry, the consumer group's lag will always be equal to - * the number of remaining entries in the stream. */ - lag = s->length; - valid = 1; - } else if (cg->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s,&cg->last_id,NULL)) { - /* No fragmentation ahead means that the group's logical reads counter - * is valid for performing the lag calculation. */ - lag = (long long)s->entries_added - cg->entries_read; - valid = 1; - } else { - /* Attempt to retrieve the group's last ID logical read counter. */ - long long entries_read = streamEstimateDistanceFromFirstEverEntry(s,&cg->last_id); - if (entries_read != SCG_INVALID_ENTRIES_READ) { - /* A valid counter was obtained.
*/ - lag = (long long)s->entries_added - entries_read; - valid = 1; - } - } - - if (valid) { - addReplyLongLong(c,lag); - } else { - addReplyNull(c); - } -} - -/* This function returns a value that is the ID's logical read counter, or its - * distance (the number of entries) from the first entry ever to have been added - * to the stream. - * - * A counter is returned only in one of the following cases: - * 1. The ID is the same as the stream's last ID. In this case, the returned - * value is the same as the stream's entries_added counter. - * 2. The ID equals that of the currently first entry in the stream, and the - * stream has no tombstones. The returned value, in this case, is the result - * of subtracting the stream's length from its entries_added, incremented by - * one. - * 3. The ID is less than the stream's first current entry's ID, and there are no - * tombstones. Here the estimated counter is the result of subtracting the - * stream's length from its entries_added. - * 4. The stream's entries_added is zero, meaning that no entries were ever - * added. - * 5. The stream is currently empty and the ID is smaller than or equal to the - * stream's last ID. The returned value is the stream's entries_added. - * - * The special return value of SCG_INVALID_ENTRIES_READ signals that the counter's - * value isn't obtainable. It is returned in these cases: - * 1. The provided ID, if it even exists, is somewhere between the stream's - * current first and last entries' IDs, or in the future. - * 2. The stream contains one or more tombstones. */ -long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id) { - /* The counter of any ID in an empty, never-before-used stream is 0. */ - if (!s->entries_added) { - return 0; - } - - /* In the empty stream, if the ID is smaller or equal to the last ID, - * it can be set to the current entries_added value. */ - if (!s->length && streamCompareID(id,&s->last_id) < 1) { - return s->entries_added; - } - - /* There are fragmentations between the `id` and the stream's last-generated-id.
*/ - if (!streamIDEqZero(id) && streamCompareID(id,&s->max_deleted_entry_id) < 0) - return SCG_INVALID_ENTRIES_READ; - - int cmp_last = streamCompareID(id,&s->last_id); - if (cmp_last == 0) { - /* Return the exact counter of the last entry in the stream. */ - return s->entries_added; - } else if (cmp_last > 0) { - /* The counter of a future ID is unknown. */ - return SCG_INVALID_ENTRIES_READ; - } - - int cmp_id_first = streamCompareID(id,&s->first_id); - int cmp_xdel_first = streamCompareID(&s->max_deleted_entry_id,&s->first_id); - if (streamIDEqZero(&s->max_deleted_entry_id) || cmp_xdel_first < 0) { - /* There's definitely no fragmentation ahead. */ - if (cmp_id_first < 0) { - /* Return the estimated counter. */ - return s->entries_added - s->length; - } else if (cmp_id_first == 0) { - /* Return the exact counter of the first entry in the stream. */ - return s->entries_added - s->length + 1; - } - } - - /* The ID is either before an XDEL that fragments the stream or an arbitrary - * ID. In either case we can't make a prediction. */ - return SCG_INVALID_ENTRIES_READ; -} - -/* Copy-free version of streamPropagateXCLAIM that expects pre-created robj* arguments. - * This is useful when propagating multiple XCLAIMs in a loop to avoid repeated - * object creation/destruction overhead. */ -static inline void streamPropagateXCLAIMCopyFree(int dbid, robj *key, robj *group_last_id, robj *groupname, robj *id, robj *consumername, robj *delivery_time, robj *delivery_count) { - /* We need to generate an XCLAIM that will work in a idempotent fashion: - * - * XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time> - * RETRYCOUNT <count> FORCE JUSTID LASTID <id>. - * - * Note that JUSTID is useful in order to avoid that XCLAIM will do - * useless work in the slave side, trying to fetch the stream item.
*/ - robj *argv[14]; - argv[0] = shared.xclaim; - argv[1] = key; - argv[2] = groupname; - argv[3] = consumername; - argv[4] = shared.integers[0]; - argv[5] = id; - argv[6] = shared.time; - argv[7] = delivery_time; - argv[8] = shared.retrycount; - argv[9] = delivery_count; - argv[10] = shared.force; - argv[11] = shared.justid; - argv[12] = shared.lastid; - argv[13] = group_last_id; - - alsoPropagate(dbid,argv,14,PROPAGATE_AOF|PROPAGATE_REPL); -} - -/* As a result of an explicit XCLAIM or XREADGROUP command, new entries - * are created in the pending list of the stream and consumers. We need - * to propagate this changes in the form of XCLAIM commands. */ -static inline void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupname, robj *id, streamNACK *nack) { - robj *consumername = createStringObject(nack->consumer->name,sdslen(nack->consumer->name)); - robj *delivery_time = createStringObjectFromLongLong(nack->delivery_time); - robj *delivery_count = createStringObjectFromLongLong(nack->delivery_count); - robj *group_last_id = createObjectFromStreamID(&group->last_id); - - streamPropagateXCLAIMCopyFree(c->db->id, key, group_last_id, groupname, id, consumername, delivery_time, delivery_count); - - decrRefCount(consumername); - decrRefCount(delivery_time); - decrRefCount(delivery_count); - decrRefCount(group_last_id); -} - -/* We need this when we want to propagate the new last-id of a consumer group - * that was consumed by XREADGROUP with the NOACK option: in that case we can't - * propagate the last ID just using the XCLAIM LASTID option, so we emit - * - * XGROUP SETID <key> <groupname> <id> ENTRIESREAD <entries_read> - */ -void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupname) { - robj *argv[7]; - argv[0] = shared.xgroup; - argv[1] = shared.setid; - argv[2] = key; - argv[3] = groupname; - argv[4] = createObjectFromStreamID(&group->last_id); - argv[5] = shared.entriesread; - argv[6] = 
createStringObjectFromLongLong(group->entries_read); - - alsoPropagate(c->db->id,argv,7,PROPAGATE_AOF|PROPAGATE_REPL); - - decrRefCount(argv[4]); - decrRefCount(argv[6]); -} - -/* We need this when we want to propagate creation of consumer that was created - * by XREADGROUP with the NOACK option. In that case, the only way to create - * the consumer at the replica is by using XGROUP CREATECONSUMER (see issue #7140) - * - * XGROUP CREATECONSUMER <key> <groupname> <consumername> - */ -void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername) { - robj *argv[5]; - argv[0] = shared.xgroup; - argv[1] = shared.createconsumer; - argv[2] = key; - argv[3] = groupname; - argv[4] = createObject(OBJ_STRING,sdsdup(consumername)); - - alsoPropagate(c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL); - - decrRefCount(argv[4]); -} - -/* Send the stream items in the specified range to the client 'c'. The range - * the client will receive is between start and end inclusive, if 'count' is - * non zero, no more than 'count' elements are sent. - * - * The 'end' pointer can be NULL to mean that we want all the elements from - * 'start' till the end of the stream. If 'rev' is non zero, elements are - * produced in reversed order from end to start. - * - * The function returns the number of entries emitted. - * - * If 'min_idle_time' is not -1 and a group is specified, the function first - * processes pending entries (from the group's PEL) that have been idle for at - * least 'min_idle_time' milliseconds, claiming them for the specified consumer. - * Each claimed entry is returned as a four-element array: ID, field-value pairs, - * idle time, and delivery count. The NACK is transferred from the previous - * consumer to the new consumer with updated delivery metadata. - * - * If group and consumer are not NULL, the function performs additional work: - * 1. 
It updates the last delivered ID in the group in case we are - * sending IDs greater than the current last ID. - * 2. If the requested IDs are already assigned to some other consumer, the - * function will not return it to the client. - * 3. An entry in the pending list will be created for every entry delivered - * for the first time to this consumer. - * 4. The group's read counter is incremented if it is already valid and there - * are no future tombstones, or is invalidated (set to 0) otherwise. If the - * counter is invalid to begin with, we try to obtain it for the last - * delivered ID. - * - * The behavior may be modified passing non-zero flags: - * - * STREAM_RWR_NOACK: Do not create PEL entries, that is, the point "3" above - * is not performed. - * STREAM_RWR_RAWENTRIES: Do not emit array boundaries, but just the entries, - * and return the number of entries emitted as usually. - * This is used when the function is just used in order - * to emit data and there is some higher level logic. - * STREAM_RWR_HISTORY: Return entries from the consumer's own PEL history only. - * STREAM_RWR_CLAIMED: Return only claimable entries from the PEL. New entries - * from the stream are not returned. - * - * The final argument 'spi' (stream propagation info pointer) is a structure - * filled with information needed to propagate the command execution to AOF - * and slaves, in the case a consumer group was passed: we need to generate - * XCLAIM commands to create the pending list into AOF/slaves in that case. - * - * If 'spi' is set to NULL no propagation will happen even if the group was - * given, but currently such a feature is never used by the code base that - * will always pass 'spi' and propagate when a group is passed. - * - * Note that this function is recursive in certain cases. When it's called - * with a non NULL group and consumer argument, it may call - * streamReplyWithRangeFromConsumerPEL() in order to get entries from the - * consumer pending entries list. 
However such a function will then call - * streamReplyWithRange() in order to emit single entries (found in the - * PEL by ID) to the client. This is the use case for the STREAM_RWR_RAWENTRIES - * flag. */ -#define STREAM_RWR_NOACK (1<<0) /* Do not create entries in the PEL. */ -#define STREAM_RWR_RAWENTRIES (1<<1) /* Do not emit protocol for array - boundaries, just the entries. */ -#define STREAM_RWR_HISTORY (1<<2) /* Only serve consumer local PEL. */ -#define STREAM_RWR_CLAIMED (1<<3) /* Only serve claimed entries from PEL. */ -size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, long long min_idle_time, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi, unsigned long *propCount) { - void *arraylen_ptr = NULL; - size_t arraylen = 0; - streamIterator si; - int64_t numfields; - streamID id; - int propagate_last_id = 0; - int noack = flags & STREAM_RWR_NOACK; - const int db_id = c->db->id; - const mstime_t cmd_time_snapshot = commandTimeSnapshot(); - /* to be used in case of stream propagation */ - robj *consumername = NULL; - robj *delivery_time = NULL; - robj *group_last_id = NULL; - if (spi && consumer) { - consumername = createStringObject(consumer->name,sdslen(consumer->name)); - delivery_time = createStringObjectFromLongLong(cmd_time_snapshot); - group_last_id = createObjectFromStreamID(&group->last_id); - } - if (propCount) *propCount = 0; - - if (group && min_idle_time != -1) { - arraylen_ptr = addReplyDeferredLen(c); - /* Scan the group's pending entries list (PEL) to find messages that have been - * idle for at least min_idle_time milliseconds. The pel_by_time radix tree - * stores entries ordered by their last delivery timestamp, allowing us to - * efficiently iterate from oldest to newest. - * - * We collect eligible entries into a temporary list rather than processing - * them inline because: - * 1. We cannot safely modify a radix tree while iterating over it - * 2. 
The claiming process requires removing and re-inserting entries in - * both pel_by_time and the consumer PELs - * - * The iteration can terminate early in two cases: - * 1. We find an entry that hasn't been idle long enough - due to time-based - * ordering, all subsequent entries will be even newer - * 2. We've collected enough entries to satisfy the requested count limit */ - list *eligible_pels = listCreate(); - listSetFreeMethod(eligible_pels, zfree); - raxIterator ri; - raxStart(&ri, group->pel_by_time); - raxSeek(&ri, "^", NULL, 0); - while (raxNext(&ri)) { - pelTimeKey pelKey; - decodePelTimeKey(ri.key, &pelKey); - uint64_t idle = cmd_time_snapshot - pelKey.delivery_time; - if (idle < (uint64_t)min_idle_time) - break; - - /* Store a copy of the key for later processing */ - pelTimeKey *keyCopy = zmalloc(sizeof(pelTimeKey)); - memcpy(keyCopy, &pelKey, sizeof(pelTimeKey)); - listAddNodeTail(eligible_pels, keyCopy); - - if (count && listLength(eligible_pels) >= count) break; - } - raxStop(&ri); - - /* Process each eligible pending entry, claiming it for the current consumer. - * For each entry we: - * 1. Fetch the actual message data from the stream - * 2. Send the message to the client with metadata (idle time, delivery count) - * 3. Transfer ownership from the previous consumer to the current consumer - * 4. 
Update all relevant data structures and propagate the claim operation */ - listIter li; - listNode *ln; - listRewind(eligible_pels, &li); - while ((ln = listNext(&li))) { - pelTimeKey *pelKey = (pelTimeKey*)listNodeValue(ln); - unsigned char buf[sizeof(streamID)]; - streamEncodeID(buf, &pelKey->id); - - void *result; - streamNACK *nack = NULL; - uint64_t delivery_count = 0; - /* Must exist, we got the ID from pel_by_time */ - serverAssert(raxFind(group->pel,buf,sizeof(buf),&result)); - - nack = (streamNACK*)result; - delivery_count = nack->delivery_count; - - streamID pel_id; - streamIteratorStart(&si,s,&pelKey->id,&pelKey->id,rev); - if (streamIteratorGetID(&si,&pel_id,&numfields)) { - /* Emit a four elements array: ID, array of field-value pairs, - * idle time and delivery count. */ - robj *idarg = createObjectFromStreamID(&pel_id); - addReplyArrayLen(c,4); - addReplyBulk(c,idarg); - addReplyArrayLen(c,numfields*2); - - /* Emit the field-value pairs. */ - while (numfields--) { - unsigned char *key, *value; - int64_t key_len, value_len; - streamIteratorGetField(&si,&key,&value,&key_len,&value_len); - addReplyBulkCBuffer(c,key,key_len); - addReplyBulkCBuffer(c,value,value_len); - } - - uint64_t idle = cmd_time_snapshot - pelKey->delivery_time; - addReplyLongLong(c, idle); - addReplyLongLong(c, delivery_count); - - /* Remove the NACK from old consumer and time-based PEL. */ - raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); - raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &pel_id); - - /* Transfer NACK to new consumer with updated metadata. */ - nack->consumer = consumer; - nack->delivery_time = cmd_time_snapshot; - nack->delivery_count++; - raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); - raxInsertPelByTime(group->pel_by_time, nack->delivery_time, &pel_id); - - consumer->active_time = cmd_time_snapshot; - - /* Propagate as XCLAIM. 
*/ - if (spi) { - robj *delivery_count = createStringObjectFromLongLong(nack->delivery_count); - streamPropagateXCLAIMCopyFree(db_id,spi->keyname,group_last_id,spi->groupname,idarg,consumername,delivery_time,delivery_count); - decrRefCount(delivery_count); - if (propCount) (*propCount)++; - } - decrRefCount(idarg); - arraylen++; - } - streamIteratorStop(&si); - } - listRelease(eligible_pels); - } - /* If the client is asking for some history, we serve it using a - * different function, so that we return entries *solely* from its - * own PEL. This ensures each consumer will always and only see - * the history of messages delivered to it and not yet confirmed - * as delivered. */ - if (group && (flags & STREAM_RWR_HISTORY)) { - if (spi && consumer) { - decrRefCount(delivery_time); - decrRefCount(consumername); - decrRefCount(group_last_id); - } - return streamReplyWithRangeFromConsumerPEL(c,s,start,end,count, - group, consumer); - } - - /* Stop here if client only wants claimed entries or count is satisfied. */ - if ((group && (flags & STREAM_RWR_CLAIMED)) || (count && count == arraylen)) { - if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen); - if (spi && consumer) { - decrRefCount(delivery_time); - decrRefCount(consumername); - decrRefCount(group_last_id); - } - return arraylen; - } - - if (!(flags & STREAM_RWR_RAWENTRIES) && !arraylen_ptr) - arraylen_ptr = addReplyDeferredLen(c); - streamIteratorStart(&si,s,start,end,rev); - while (streamIteratorGetID(&si,&id,&numfields)) { - /* Update the group last_id if needed. */ - if (group && streamCompareID(&id,&group->last_id) > 0) { - if (group->entries_read != SCG_INVALID_ENTRIES_READ && - streamCompareID(&group->last_id, &s->first_id) >= 0 && - !streamRangeHasTombstones(s,&group->last_id,NULL)) - { - /* A valid counter and no tombstones between the group's last-delivered-id - * and the stream's last-generated-id mean we can increment the read counter - * to keep tracking the group's progress. 
*/ - group->entries_read++; - } else if (s->entries_added) { - /* The group's counter may be invalid, so we try to obtain it. */ - group->entries_read = streamEstimateDistanceFromFirstEverEntry(s,&id); - } - streamUpdateCGroupLastId(s, group, &id); - /* In the past, we would only set it when NOACK was specified. And in - * #9127, XCLAIM did not propagate entries_read in ACK, which would - * cause entries_read to be inconsistent between master and replicas, - * so here we call streamPropagateGroupID unconditionally. */ - propagate_last_id = 1; - } - - if (min_idle_time != -1) { - /* If min-idle-time is specified, we emit a four elements - * array: ID, array of field-value pairs, idle time and delivery count. */ - addReplyArrayLen(c,4); - } else { - /* Emit a two elements array for each item. The first is - * the ID, the second is an array of field-value pairs. */ - addReplyArrayLen(c,2); - } - robj *idarg = createObjectFromStreamID(&id); - addReplyBulk(c,idarg); - addReplyArrayLen(c,numfields*2); - - /* Emit the field-value pairs. */ - while (numfields--) { - unsigned char *key, *value; - int64_t key_len, value_len; - streamIteratorGetField(&si,&key,&value,&key_len,&value_len); - addReplyBulkCBuffer(c,key,key_len); - addReplyBulkCBuffer(c,value,value_len); - } - - if (min_idle_time != -1) { - /* For new entries idle time and delivery count is 0. */ - addReplyLongLong(c, 0); - addReplyLongLong(c, 0); - } - - /* If a group is passed, we need to create an entry in the - * PEL (pending entries list) of this group *and* this consumer. - * - * Note that we cannot be sure about the fact the message is not - * already owned by another consumer, because the admin is able - * to change the consumer group last delivered ID using the - * XGROUP SETID command. So if we find that there is already - * a NACK for the entry, we need to associate it to the new - * consumer. 
*/ - if (group && !noack) { - unsigned char buf[sizeof(streamID)]; - streamEncodeID(buf,&id); - - /* Try to add a new NACK. Most of the time this will work and - * will not require extra lookups. We'll fix the problem later - * if we find that there is already an entry for this ID. */ - streamNACK *nack = streamCreateNACK(s, consumer); - int group_inserted = - raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL); - int consumer_inserted = - raxTryInsert(consumer->pel,buf,sizeof(buf),nack,NULL); - - /* Now we can check if the entry was already busy, and - * in that case reassign the entry to the new consumer, - * or update it if the consumer is the same as before. */ - if (group_inserted == 0) { - streamFreeNACK(s,nack); - void *result; - int found = raxFind(group->pel,buf,sizeof(buf),&result); - serverAssert(found); - nack = result; - raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); - /* Remove old entry from the PEL by time. */ - raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &id); - /* Update the consumer and NACK metadata. */ - nack->consumer = consumer; - nack->delivery_time = cmd_time_snapshot; - nack->delivery_count = 1; - /* Add the entry in the new consumer local PEL. */ - raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); - } else if (group_inserted == 1 && consumer_inserted == 1) { - nack->cgroup_ref_node = streamLinkCGroupToEntry(s, group, buf); - } else if (group_inserted == 1 && consumer_inserted == 0) { - serverPanic("NACK half-created. Should not be possible."); - } - - /* We have new NACK or updated existing one. */ - raxInsertPelByTime(group->pel_by_time, nack->delivery_time, &id); - - consumer->active_time = cmd_time_snapshot; - - /* Propagate as XCLAIM. 
*/ - if (spi) { - robj *delivery_count = createStringObjectFromLongLong(nack->delivery_count); - streamPropagateXCLAIMCopyFree(db_id,spi->keyname,group_last_id,spi->groupname,idarg,consumername,delivery_time,delivery_count); - decrRefCount(delivery_count); - if (propCount) (*propCount)++; - } - } - decrRefCount(idarg); - arraylen++; - if (count && count == arraylen) break; - } - - if (spi && consumer) { - decrRefCount(delivery_time); - decrRefCount(consumername); - decrRefCount(group_last_id); - } - - if (spi && propagate_last_id) { - streamPropagateGroupID(c,spi->keyname,group,spi->groupname); - if (propCount) (*propCount)++; - } - - streamIteratorStop(&si); - if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen); - return arraylen; -} - -/* This is a helper function for streamReplyWithRange() when called with - * group and consumer arguments, but with a range that is referring to already - * delivered messages. In this case we just emit messages that are already - * in the history of the consumer, fetching the IDs from its PEL. - * - * Note that this function does not have a 'rev' argument because it's not - * possible to iterate in reverse using a group. Basically this function - * is only called as a result of the XREADGROUP command. - * - * This function is more expensive because it needs to inspect the PEL and then - * seek into the radix tree of the messages in order to emit the full message - * to the client. However clients only reach this code path when they are - * fetching the history of already retrieved messages, which is rare. 
*/ -size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamCG *group, streamConsumer *consumer) { - raxIterator ri; - unsigned char startkey[sizeof(streamID)]; - unsigned char endkey[sizeof(streamID)]; - streamEncodeID(startkey,start); - if (end) streamEncodeID(endkey,end); - - size_t arraylen = 0; - void *arraylen_ptr = addReplyDeferredLen(c); - raxStart(&ri,consumer->pel); - raxSeek(&ri,">=",startkey,sizeof(startkey)); - while(raxNext(&ri) && (!count || arraylen < count)) { - if (end && memcmp(ri.key,endkey,ri.key_len) > 0) break; - streamID thisid; - streamDecodeID(ri.key,&thisid); - if (streamReplyWithRange(c,s,&thisid,&thisid,1,0,-1,NULL,NULL, - STREAM_RWR_RAWENTRIES,NULL,NULL) == 0) - { - /* Note that we may have a not acknowledged entry in the PEL - * about a message that's no longer here because was removed - * by the user by other means. In that case we signal it emitting - * the ID but then a NULL entry for the fields. */ - addReplyArrayLen(c,2); - addReplyStreamID(c,&thisid); - addReplyNullArray(c); - } else { - streamNACK *nack = ri.data; - raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &thisid); - - nack->delivery_time = commandTimeSnapshot(); - nack->delivery_count++; - - raxInsertPelByTime(group->pel_by_time, nack->delivery_time, &thisid); - } - arraylen++; - } - raxStop(&ri); - setDeferredArrayLen(c,arraylen_ptr,arraylen); - return arraylen; -} - -/* ----------------------------------------------------------------------- - * Stream commands implementation - * ----------------------------------------------------------------------- */ - -/* Look the stream at 'key' and return the corresponding stream object. - * The function creates a key setting it to an empty stream if needed. 
*/ -kvobj *streamTypeLookupWriteOrCreate(client *c, robj *key, int no_create) { - dictEntryLink link; - kvobj *kv = lookupKeyWriteWithLink(c->db,key, &link); - if (checkType(c, kv, OBJ_STREAM)) return NULL; - if (kv != NULL) return kv; - - if (no_create) { - addReplyNull(c); - return NULL; - } - robj *o = createStreamObject(); - dbAddByLink(c->db, key, &o, &link); - return o; -} - -/* Parse a stream ID in the format given by clients to Redis, that is - * <ms>-<seq>, and converts it into a streamID structure. If - * the specified ID is invalid C_ERR is returned and an error is reported - * to the client, otherwise C_OK is returned. The ID may be in incomplete - * form, just stating the milliseconds time part of the stream. In such a case - * the missing part is set according to the value of 'missing_seq' parameter. - * - * The IDs "-" and "+" specify respectively the minimum and maximum IDs - * that can be represented. If 'strict' is set to 1, "-" and "+" will be - * treated as an invalid ID. - * - * The ID form <ms>-* specifies a milliseconds-only ID, leaving the sequence part - * to be autogenerated. When a non-NULL 'seq_given' argument is provided, this - * form is accepted and the argument is set to 0 unless the sequence part is - * specified. - * - * If 'c' is set to NULL, no reply is sent to the client. */ -int streamGenericParseIDOrReply(client *c, const robj *o, streamID *id, uint64_t missing_seq, int strict, int *seq_given) { - char buf[128]; - if (sdslen(o->ptr) > sizeof(buf)-1) goto invalid; - memcpy(buf,o->ptr,sdslen(o->ptr)+1); - - if (strict && (buf[0] == '-' || buf[0] == '+') && buf[1] == '\0') - goto invalid; - - if (seq_given != NULL) { - *seq_given = 1; - } - - /* Handle the "-" and "+" special cases. */ - if (buf[0] == '-' && buf[1] == '\0') { - id->ms = 0; - id->seq = 0; - return C_OK; - } else if (buf[0] == '+' && buf[1] == '\0') { - id->ms = UINT64_MAX; - id->seq = UINT64_MAX; - return C_OK; - } - - /* Parse <ms>-<seq> form. 
*/ - unsigned long long ms, seq; - char *dot = strchr(buf,'-'); - if (dot) *dot = '\0'; - if (string2ull(buf,&ms) == 0) goto invalid; - if (dot) { - size_t seqlen = strlen(dot+1); - if (seq_given != NULL && seqlen == 1 && *(dot + 1) == '*') { - /* Handle the <ms>-* form. */ - seq = 0; - *seq_given = 0; - } else if (string2ull(dot+1,&seq) == 0) { - goto invalid; - } - } else { - seq = missing_seq; - } - id->ms = ms; - id->seq = seq; - return C_OK; - -invalid: - if (c) addReplyError(c,"Invalid stream ID specified as stream " - "command argument"); - return C_ERR; -} - -/* Wrapper for streamGenericParseIDOrReply() used by module API. */ -int streamParseID(const robj *o, streamID *id) { - return streamGenericParseIDOrReply(NULL,o,id,0,0,NULL); -} - -/* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to - * 0, to be used when - and + are acceptable IDs. */ -int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) { - return streamGenericParseIDOrReply(c,o,id,missing_seq,0,NULL); -} - -/* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to - * 1, to be used when we want to return an error if the special IDs + or - - * are provided. */ -int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int *seq_given) { - return streamGenericParseIDOrReply(c,o,id,missing_seq,1,seq_given); -} - -/* Helper for parsing a stream ID that is a range query interval. When the - * exclude argument is NULL, streamParseIDOrReply() is called and the interval - * is treated as close (inclusive). Otherwise, the exclude argument is set if - * the interval is open (the "(" prefix) and streamParseStrictIDOrReply() is - * called in that case. 
- */ -int streamParseIntervalIDOrReply(client *c, robj *o, streamID *id, int *exclude, uint64_t missing_seq) { - char *p = o->ptr; - size_t len = sdslen(p); - int invalid = 0; - - if (exclude != NULL) *exclude = (len > 1 && p[0] == '('); - if (exclude != NULL && *exclude) { - robj *t = createStringObject(p+1,len-1); - invalid = (streamParseStrictIDOrReply(c,t,id,missing_seq,NULL) == C_ERR); - decrRefCount(t); - } else - invalid = (streamParseIDOrReply(c,o,id,missing_seq) == C_ERR); - if (invalid) - return C_ERR; - return C_OK; -} - -void streamRewriteApproxSpecifier(client *c, int idx) { - rewriteClientCommandArgument(c,idx,shared.special_equals); -} - -/* We propagate MAXLEN/MINID ~ <count> as MAXLEN/MINID = <resulting-len-of-stream> - * otherwise trimming is no longer deterministic on replicas / AOF. */ -void streamRewriteTrimArgument(client *c, stream *s, int trim_strategy, int idx) { - robj *arg; - if (trim_strategy == TRIM_STRATEGY_MAXLEN) { - arg = createStringObjectFromLongLong(s->length); - } else { - streamID first_id; - streamGetEdgeID(s,1,0,&first_id); - arg = createObjectFromStreamID(&first_id); - } - - rewriteClientCommandArgument(c,idx,arg); - decrRefCount(arg); -} - -/* XADD key [NOMKSTREAM] [KEEPREF | DELREF | ACKED] [IDMPAUTO pid | IDMP pid iid] [(MAXLEN [~|=] <count> | MINID [~|=] <id>) [LIMIT <entries>]] <ID or *> [field value] [field value] ... */ -void xaddCommand(client *c) { - /* Parse options. */ - streamAddTrimArgs parsed_args; - int idpos = streamParseAddOrTrimArgsOrReply(c, &parsed_args, 1); - if (idpos < 0) - return; /* streamParseAddOrTrimArgsOrReply already replied. */ - int field_pos = idpos+1; /* The ID is always one argument before the first field */ - - /* Check arity. 
*/ - if ((c->argc - field_pos) < 2 || ((c->argc-field_pos) % 2) == 1) { - addReplyErrorArity(c); - return; - } - - /* Return ASAP if minimal ID (0-0) was given so we avoid possibly creating - * a new stream and have streamAppendItem fail, leaving an empty key in the - * database. */ - if (parsed_args.id_given && parsed_args.seq_given && - parsed_args.id.ms == 0 && parsed_args.id.seq == 0) - { - addReplyError(c,"The ID specified in XADD must be greater than 0-0"); - return; - } - - /* Lookup the stream at key. */ - kvobj *kv; - stream *s; - if ((kv = streamTypeLookupWriteOrCreate(c,c->argv[1],parsed_args.no_mkstream)) == NULL) return; - s = kv->ptr; - size_t old_alloc = s->alloc_size; - - /* IDMP: Check if IID already exists, save IID for later insertion */ - XXH128_hash_t hash; - char *iid_str = NULL; - size_t iid_len = 0; - idmpProducer *producer = NULL; - idmpEntry *entry = NULL; - - if (parsed_args.idmp_pid != NULL) { - /* Get or create the producer for this pid */ - char *pid_str = parsed_args.idmp_pid->ptr; - size_t pid_len = sdslen((sds)pid_str); - producer = idmpGetOrCreateProducer(s, pid_str, pid_len); - - /* Get IID string based on option */ - if (parsed_args.idmp_auto) { - /* Auto-generate IID by hashing field-value pairs */ - int64_t numfields = (c->argc - field_pos) / 2; - if (createIdempotencyHash(&c->argv[field_pos], numfields, &hash) == C_ERR) { - addReplyError(c, "Failed to create idempotency hash"); - return; - } - iid_str = (char *)&hash; - iid_len = sizeof(hash); - } else { - /* Use user-provided IID directly */ - iid_str = parsed_args.idmp_iid->ptr; - iid_len = sdslen((sds)iid_str); - } - - /* Create entry for lookup and potential insertion */ - entry = idmpEntryCreate(iid_str, iid_len, &s->alloc_size); - - /* Check if IID already exists and reply if found */ - if (idmpLookupAndReply(s, producer, entry, c)) { - /* IID already exists, free the entry and return */ - idmpEntryFree(entry, &s->alloc_size); - return; - } - } - - /* Return ASAP if the 
stream has reached the last possible ID */ - if (s->last_id.ms == UINT64_MAX && s->last_id.seq == UINT64_MAX) { - addReplyError(c,"The stream has exhausted the last possible ID, " - "unable to add more items"); - idmpEntryFree(entry, &s->alloc_size); - return; - } - - /* Append using the low level function and return the ID. */ - errno = 0; - streamID id; - if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2, - &id,parsed_args.id_given ? &parsed_args.id : NULL,parsed_args.seq_given) == C_ERR) - { - serverAssert(errno != 0); - if (errno == EDOM) - addReplyError(c,"The ID specified in XADD is equal or smaller than " - "the target stream top item"); - else - addReplyError(c,"Elements are too large to be stored"); - if (server.memory_tracking_per_slot && old_alloc != s->alloc_size) - updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),old_alloc,s->alloc_size); - idmpEntryFree(entry, &s->alloc_size); - return; - } - sds replyid = createStreamIDString(&id); - addReplyBulkCBuffer(c, replyid, sdslen(replyid)); - - /* IDMP: Insert the entry now that we have the actual ID */ - if (parsed_args.idmp_pid != NULL) { - idmpInsertEntry(s, producer, entry, &id); - trackStreamIdmpEntries(c, c->argv[1]); - } - - notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id); - server.dirty++; - - /* Trim if needed. */ - if (parsed_args.trim_strategy != TRIM_STRATEGY_NONE) { - if (streamTrim(s, &parsed_args)) - notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id); - if (parsed_args.approx_trim) { - /* In case our trimming was limited (by LIMIT or by ~) we must - * re-write the relevant trim argument to make sure there will be - * no inconsistencies in AOF loading or in the replica. - * It's enough to check only args->approx because there is no - * way LIMIT is given without the ~ option. 
*/ - streamRewriteApproxSpecifier(c,parsed_args.trim_strategy_arg_idx-1); - streamRewriteTrimArgument(c,s,parsed_args.trim_strategy,parsed_args.trim_strategy_arg_idx); - } - } - - if (server.memory_tracking_per_slot && old_alloc != s->alloc_size) - updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),old_alloc,s->alloc_size); - - keyModified(c,c->db,c->argv[1],kv,1); - - /* Let's rewrite the ID argument with the one actually generated for - * AOF/replication propagation. */ - if (!parsed_args.id_given || !parsed_args.seq_given) { - robj *idarg = createObject(OBJ_STRING, replyid); - rewriteClientCommandArgument(c, idpos, idarg); - decrRefCount(idarg); - } else { - sdsfree(replyid); - } - - /* We need to signal to blocked clients that there is new data on this - * stream. */ - signalKeyAsReady(c->db, c->argv[1], OBJ_STREAM); -} - -/* XRANGE/XREVRANGE actual implementation. - * The 'start' and 'end' IDs are parsed as follows: - * Incomplete 'start' has its sequence set to 0, and 'end' to UINT64_MAX. - * "-" and "+"" mean the minimal and maximal ID values, respectively. - * The "(" prefix means an open (exclusive) range, so XRANGE stream (1-0 (2-0 - * will match anything from 1-1 and 1-UINT64_MAX. - */ -void xrangeGenericCommand(client *c, int rev) { - kvobj *kv; - stream *s; - streamID startid, endid; - long long count = -1; - robj *startarg = rev ? c->argv[3] : c->argv[2]; - robj *endarg = rev ? c->argv[2] : c->argv[3]; - int startex = 0, endex = 0; - size_t old_alloc; - - /* Parse start and end IDs. */ - if (streamParseIntervalIDOrReply(c,startarg,&startid,&startex,0) != C_OK) - return; - if (startex && streamIncrID(&startid) != C_OK) { - addReplyError(c,"invalid start ID for the interval"); - return; - } - if (streamParseIntervalIDOrReply(c,endarg,&endid,&endex,UINT64_MAX) != C_OK) - return; - if (endex && streamDecrID(&endid) != C_OK) { - addReplyError(c,"invalid end ID for the interval"); - return; - } - - /* Parse the COUNT option if any. 
*/ - if (c->argc > 4) { - for (int j = 4; j < c->argc; j++) { - int additional = c->argc-j-1; - if (strcasecmp(c->argv[j]->ptr,"COUNT") == 0 && additional >= 1) { - if (getLongLongFromObjectOrReply(c,c->argv[j+1],&count,NULL) - != C_OK) return; - if (count < 0) count = 0; - j++; /* Consume additional arg. */ - } else { - addReplyErrorObject(c,shared.syntaxerr); - return; - } - } - } - - /* Return the specified range to the user. */ - if ((kv = lookupKeyReadOrReply(c, c->argv[1], shared.emptyarray)) == NULL || - checkType(c, kv, OBJ_STREAM)) return; - - s = kv->ptr; - - if (count == 0) { - addReplyNullArray(c); - } else { - if (count == -1) count = 0; - old_alloc = s->alloc_size; - streamReplyWithRange(c,s,&startid,&endid,count,rev,-1,NULL,NULL,0,NULL,NULL); - if (server.memory_tracking_per_slot && old_alloc != s->alloc_size) - updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),old_alloc,s->alloc_size); - } -} - -/* XRANGE key start end [COUNT <n>] */ -void xrangeCommand(client *c) { - xrangeGenericCommand(c,0); -} - -/* XREVRANGE key end start [COUNT <n>] */ -void xrevrangeCommand(client *c) { - xrangeGenericCommand(c,1); -} - -/* XLEN key*/ -void xlenCommand(client *c) { - kvobj *kv; - if ((kv = lookupKeyReadOrReply(c, c->argv[1], shared.czero)) == NULL - || checkType(c, kv, OBJ_STREAM)) return; - stream *s = kv->ptr; - addReplyLongLong(c,s->length); -} - -/* XREAD [BLOCK <milliseconds>] [COUNT <count>] STREAMS key_1 key_2 ... key_N - * ID_1 ID_2 ... ID_N - * - * This function also implements the XREADGROUP command, which is like XREAD - * but accepting the [GROUP group-name consumer-name] additional option. - * This is useful because while XREAD is a read command and can be called - * on slaves, XREADGROUP is not. */ -#define XREAD_BLOCKED_DEFAULT_COUNT 1000 -void xreadCommand(client *c) { - long long min_idle_time = -1; /* -1 means, no IDLE argument given. */ - long long timeout = -1; /* -1 means, no BLOCK argument given. 
*/ - long long count = 0; - int streams_count = 0; - int streams_arg = 0; - int noack = 0; /* True if NOACK option was specified. */ - streamID static_ids[STREAMID_STATIC_VECTOR_LEN]; - streamID *ids = static_ids; - streamCG **groups = NULL; - int xreadgroup = sdslen(c->argv[0]->ptr) == 10; /* XREAD or XREADGROUP? */ - robj *groupname = NULL; - robj *consumername = NULL; - size_t old_alloc; - - /* Parse arguments. */ - for (int i = 1; i < c->argc; i++) { - int moreargs = c->argc-i-1; - char *o = c->argv[i]->ptr; - if (!strcasecmp(o,"CLAIM") && moreargs) { - if (!xreadgroup) { - addReplyError(c,"The CLAIM option is only supported by " - "XREADGROUP. You called XREAD instead."); - return; - } - i++; - min_idle_time = -1; - if (getLongLongFromObjectOrReply(c, c->argv[i], &min_idle_time, - "min-idle-time is not an integer or out of range") != C_OK) - return; - if (min_idle_time < 0) { - addReplyError(c,"min-idle-time must be a positive integer"); - return; - } - } else if (!strcasecmp(o,"BLOCK") && moreargs) { - i++; - if (getTimeoutFromObjectOrReply(c,c->argv[i],&timeout, - UNIT_MILLISECONDS) != C_OK) return; - } else if (!strcasecmp(o,"COUNT") && moreargs) { - i++; - if (getLongLongFromObjectOrReply(c,c->argv[i],&count,NULL) != C_OK) - return; - if (count < 0) count = 0; - } else if (!strcasecmp(o,"STREAMS") && moreargs) { - streams_arg = i+1; - streams_count = (c->argc-streams_arg); - if ((streams_count % 2) != 0) { - const char *symbol = xreadgroup ? "ID or '>'" : "ID, '+', or '$'"; - addReplyErrorFormat(c,"Unbalanced '%s' list of streams: " - "for each stream key an %s must be " - "specified.", c->cmd->fullname,symbol); - return; - } - streams_count /= 2; /* We have two arguments for each stream. */ - break; - } else if (!strcasecmp(o,"GROUP") && moreargs >= 2) { - if (!xreadgroup) { - addReplyError(c,"The GROUP option is only supported by " - "XREADGROUP. 
You called XREAD instead."); - return; - } - groupname = c->argv[i+1]; - consumername = c->argv[i+2]; - i += 2; - } else if (!strcasecmp(o,"NOACK")) { - if (!xreadgroup) { - addReplyError(c,"The NOACK option is only supported by " - "XREADGROUP. You called XREAD instead."); - return; - } - noack = 1; - } else { - addReplyErrorObject(c,shared.syntaxerr); - return; - } - } - - /* STREAMS option is mandatory. */ - if (streams_arg == 0) { - addReplyErrorObject(c,shared.syntaxerr); - return; - } - - /* If the user specified XREADGROUP then it must also - * provide the GROUP option. */ - if (xreadgroup && groupname == NULL) { - addReplyError(c,"Missing GROUP option for XREADGROUP"); - return; - } - - /* Parse the IDs and resolve the group name. */ - if (streams_count > STREAMID_STATIC_VECTOR_LEN) - ids = zmalloc(sizeof(streamID)*streams_count); - if (groupname) groups = zmalloc(sizeof(streamCG*)*streams_count); - - for (int i = streams_arg + streams_count; i < c->argc; i++) { - /* Specifying "$" as last-known-id means that the client wants to be - * served with just the messages that will arrive into the stream - * starting from now. */ - int id_idx = i - streams_arg - streams_count; - robj *key = c->argv[i-streams_count]; - kvobj *o = lookupKeyRead(c->db, key); - if (checkType(c,o,OBJ_STREAM)) goto cleanup; - streamCG *group = NULL; - - /* If a group was specified, than we need to be sure that the - * key and group actually exist. 
*/ - if (groupname) { - if (o == NULL || - (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL) - { - addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer " - "group '%s' in XREADGROUP with GROUP " - "option", - (char*)key->ptr,(char*)groupname->ptr); - goto cleanup; - } - groups[id_idx] = group; - } - - if (strcmp(c->argv[i]->ptr,"$") == 0) { - if (xreadgroup) { - addReplyError(c,"The $ ID is meaningless in the context of " - "XREADGROUP: you want to read the history of " - "this consumer by specifying a proper ID, or " - "use the > ID to get new messages. The $ ID would " - "just return an empty result set."); - goto cleanup; - } - if (o) { - stream *s = o->ptr; - ids[id_idx] = s->last_id; - } else { - ids[id_idx].ms = 0; - ids[id_idx].seq = 0; - } - continue; - } else if (strcmp(c->argv[i]->ptr,"+") == 0) { - if (xreadgroup) { - addReplyError(c,"The + ID is meaningless in the context of " - "XREADGROUP: you want to read the history of " - "this consumer by specifying a proper ID, or " - "use the > ID to get new messages. The + ID would " - "just return an empty result set."); - goto cleanup; - } - if (o && ((stream *)o->ptr)->length) { - stream *s = o->ptr; - /* We need to get the last valid ID. - * It is impossible to use s->last_id because - * entry with s->last_id may have been removed. */ - streamLastValidID(s, &ids[id_idx]); - streamDecrID(&ids[id_idx]); - } else { - ids[id_idx].ms = 0; - ids[id_idx].seq = 0; - } - continue; - } else if (strcmp(c->argv[i]->ptr,">") == 0) { - if (!xreadgroup) { - addReplyError(c,"The > ID can be specified only when calling " - "XREADGROUP using the GROUP <group> " - "<consumer> option."); - goto cleanup; - } - /* We use just the maximum ID to signal this is a ">" ID, anyway - * the code handling the blocking clients will have to update the - * ID later in order to match the changing consumer group last ID. 
*/ - ids[id_idx].ms = UINT64_MAX; - ids[id_idx].seq = UINT64_MAX; - continue; - } - if (streamParseStrictIDOrReply(c,c->argv[i],ids+id_idx,0,NULL) != C_OK) - goto cleanup; - } - - /* Try to serve the client synchronously. */ - size_t arraylen = 0; - void *arraylen_ptr = NULL; - uint64_t min_pel_delivery_time = UINT64_MAX; - for (int i = 0; i < streams_count; i++) { - kvobj *o = lookupKeyRead(c->db, c->argv[streams_arg + i]); - if (o == NULL) continue; - stream *s = o->ptr; - streamID *gt = ids+i; /* ID must be greater than this. */ - int serve_claimed = 0; - int serve_synchronously = 0; - int serve_history = 0; /* True for XREADGROUP with ID != ">". */ - streamConsumer *consumer = NULL; /* Unused if XREAD */ - streamPropInfo spi = {c->argv[streams_arg+i],groupname}; /* Unused if XREAD */ - - /* Check if there are the conditions to serve the client - * synchronously. */ - if (groups) { - /* If min_idle_time is set we need to check is there any pending - * message in the PEL idle enough to be claimed. Also we need to - * get the minimum delivery time in the PEL, in order to use it - * later if block option is set. */ - if (min_idle_time != -1) { - raxIterator ri; - raxStart(&ri, groups[i]->pel_by_time); - raxSeek(&ri, "^", NULL, 0); - while(raxNext(&ri)) { - pelTimeKey timeKey; - decodePelTimeKey(ri.key, &timeKey); - if (!streamEntryExists(s, &timeKey.id)) - continue; - - if (timeKey.delivery_time < min_pel_delivery_time) { - min_pel_delivery_time = timeKey.delivery_time; - } - - uint64_t idle = commandTimeSnapshot() - timeKey.delivery_time; - if (idle >= (uint64_t)min_idle_time) { - serve_claimed = 1; - } - break; - } - raxStop(&ri); - } - - /* If the consumer is blocked on a group, we always serve it - * synchronously (serving its local history) if the ID specified - * was not the special ">" ID. 
*/ - if (gt->ms != UINT64_MAX || - gt->seq != UINT64_MAX) - { - serve_synchronously = 1; - serve_history = 1; - } else if (s->length) { - /* We also want to serve a consumer in a consumer group - * synchronously in case the group top item delivered is smaller - * than what the stream has inside. */ - streamID maxid, *last = &groups[i]->last_id; - streamLastValidID(s, &maxid); - if (streamCompareID(&maxid, last) > 0) { - serve_synchronously = 1; - *gt = *last; - } - } - consumer = streamLookupConsumer(groups[i],consumername->ptr); - if (consumer == NULL) { - old_alloc = s->alloc_size; - consumer = streamCreateConsumer(s,groups[i],consumername->ptr, - c->argv[streams_arg+i], - c->db->id,SCC_DEFAULT); - if (server.memory_tracking_per_slot) - updateSlotAllocSize(c->db,getKeySlot(c->argv[streams_arg+i]->ptr),old_alloc,s->alloc_size); - if (noack) - streamPropagateConsumerCreation(c,spi.keyname, - spi.groupname, - consumer->name); - } - consumer->seen_time = commandTimeSnapshot(); - keyModified(c,c->db,c->argv[streams_arg+i],o,0); /* only update LRM */ - } else if (s->length) { - /* For consumers without a group, we serve synchronously if we can - * actually provide at least one item from the stream. */ - streamID maxid; - streamLastValidID(s, &maxid); - if (streamCompareID(&maxid, gt) > 0) { - serve_synchronously = 1; - } - } - - int flags = 0; - if (serve_history) { - /* CLAIM option is ignored when we server from consumer history.*/ - min_idle_time = -1; - } else if (!serve_synchronously && serve_claimed) { - /* We serve the client synchronously if the CLAIM option was - * specified and there are messages in the PEL that are idle - * enough. */ - serve_synchronously = 1; - flags |= STREAM_RWR_CLAIMED; - } - - if (serve_synchronously) { - arraylen++; - if (arraylen == 1) arraylen_ptr = addReplyDeferredLen(c); - /* streamReplyWithRange() handles the 'start' ID as inclusive, - * so start from the next ID, since we want only messages with - * IDs greater than start. 
*/ - streamID start = *gt; - streamIncrID(&start); - - /* Emit the two elements sub-array consisting of the name - * of the stream and the data we extracted from it. */ - if (c->resp == 2) addReplyArrayLen(c,2); - addReplyBulk(c,c->argv[streams_arg+i]); - - unsigned long propCount = 0; - if (noack) flags |= STREAM_RWR_NOACK; - if (serve_history) flags |= STREAM_RWR_HISTORY; - old_alloc = s->alloc_size; - streamReplyWithRange(c,s,&start,NULL,count,0, min_idle_time, - groups ? groups[i] : NULL, - consumer, flags, &spi, &propCount); - if (server.memory_tracking_per_slot && old_alloc != s->alloc_size) - updateSlotAllocSize(c->db,getKeySlot(c->argv[streams_arg+i]->ptr),old_alloc,s->alloc_size); - if (propCount) { - server.dirty++; - keyModified(c,c->db,c->argv[streams_arg+i],o,0); /* only update LRM */ - } - } - } - - /* We replied synchronously! Set the top array len and return to caller. */ - if (arraylen) { - if (c->resp == 2) - setDeferredArrayLen(c,arraylen_ptr,arraylen); - else - setDeferredMapLen(c,arraylen_ptr,arraylen); - goto cleanup; - } - - /* Block if needed. */ - if (timeout != -1) { - /* If we are not allowed to block the client, the only thing - * we can do is treating it as a timeout (even with timeout 0). */ - if (c->flags & CLIENT_DENY_BLOCKING) { - addReplyNullArray(c); - goto cleanup; - } - /* We change the '$' to the current last ID for this stream. this is - * Since later on when we unblock on arriving data - we would like to - * re-process the command and in case '$' stays we will spin-block forever. - */ - for (int id_idx = 0; id_idx < streams_count; id_idx++) { - int arg_idx = id_idx + streams_arg + streams_count; - if (strcmp(c->argv[arg_idx]->ptr,"$") == 0) { - robj *argv_streamid = createObjectFromStreamID(&ids[id_idx]); - rewriteClientCommandArgument(c, arg_idx, argv_streamid); - decrRefCount(argv_streamid); - } - } - /* If min_idle_time is set we need to unblock client if PEL entry became claimable - * before new messages arrive. 
min_pel_delivery_time is the minimum delivery time of all - * entries in the PELs of different streams specified in the command. We add it to - * min_idle_time to get the earliest time when an entry will be eligible for claiming. - * If there are no entries in the PELs we will unblock the client after min_idle_time. */ - if (min_idle_time != -1) { - uint64_t pel_expire_time = min_idle_time; - if (min_pel_delivery_time != UINT64_MAX) - pel_expire_time += min_pel_delivery_time; - else - pel_expire_time += commandTimeSnapshot(); - trackStreamClaimTimeouts(c, c->argv+streams_arg, streams_count, pel_expire_time); - } - blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count, timeout, xreadgroup); - goto cleanup; - } - - /* No BLOCK option, nor any stream we can serve. Reply as with a - * timeout happened. */ - addReplyNullArray(c); - /* Continue to cleanup... */ - -cleanup: /* Cleanup. */ - - /* The command is propagated (in the READGROUP form) as a side effect - * of calling lower level APIs. So stop any implicit propagation. */ - preventCommandPropagation(c); - if (ids != static_ids) zfree(ids); - zfree(groups); -} - -/* ----------------------------------------------------------------------- - * Low level implementation of consumer groups - * ----------------------------------------------------------------------- */ - -/* Update a consumer group's last_id and handle minimum last_id tracking. - * we will recalculate the minimum last_id when needed. */ -void streamUpdateCGroupLastId(stream *s, streamCG *cg, streamID *id) { - /* When a consumer group's last_id is updated, we need to invalidate the cached - * minimum last_id in two cases: - * 1. If the consumer group's previous last_id equals the minimum last_id. - * 2. If the new ID being set is smaller than the current minimum last_id. 
*/ - if (s->min_cgroup_last_id_valid && - (streamCompareID(&cg->last_id, &s->min_cgroup_last_id) == 0 || - streamCompareID(id, &s->min_cgroup_last_id) < 0)) - { - s->min_cgroup_last_id_valid = 0; - } - cg->last_id = *id; -} - -/* Link a consumer group to a stream entry in the cgroups_ref index. - * Returns a pointer to the list node, so that it can be used for future deletion. */ -listNode *streamLinkCGroupToEntry(stream *s, streamCG *cg, unsigned char *key) { - list *cglist; - - if (!s->cgroups_ref) - s->cgroups_ref = raxNewWithMetadata(0, &s->alloc_size); - - /* Try to find the list for this stream ID, create it if it doesn't exist */ - if (!raxFind(s->cgroups_ref, key, sizeof(streamID), (void**)&cglist)) { - cglist = listCreate(); - serverAssert(raxInsert(s->cgroups_ref, key, sizeof(streamID), cglist, NULL)); - } - - /* Add the consumer group to the list and return the list node */ - listAddNodeTail(cglist, cg); - return listLast(cglist); -} - -/* Unlink a consumer group reference from the entry index for a specific stream ID. - * This is called when a message is acknowledged or when a consumer group is deleted. */ -void streamUnlinkEntryFromCGroupRef(stream *s, streamNACK *na, unsigned char *key) { - list *cglist; - if (!s->cgroups_ref) return; - if (raxFind(s->cgroups_ref, key, sizeof(streamID), (void**)&cglist)) { - listDelNode(cglist, na->cgroup_ref_node); - - /* If the list is now empty, remove it from the index. */ - if (listLength(cglist) == 0) { - raxRemove(s->cgroups_ref, key, sizeof(streamID), NULL); - listRelease(cglist); - } - } -} - -/* Remove all consumer group references to a specific stream message. 
*/
void streamCleanupEntryCGroupRefs(stream *s, streamID *id) {
    unsigned char rawid[sizeof(streamID)];
    list *owners;
    listIter iter;
    listNode *node;

    /* No reference index has been allocated: nothing to clean up. */
    if (s->cgroups_ref == NULL) return;

    /* The index is keyed by the encoded form of the entry ID. If the ID is
     * not present in the index there is nothing else to do. */
    streamEncodeID(rawid, id);
    if (!raxFind(s->cgroups_ref, rawid, sizeof(rawid), (void **)&owners))
        return;

    /* Walk the groups listed for this entry, dropping the entry from the
     * group PEL, from the group by-time index and from the PEL of the
     * consumer recorded in the NACK. */
    listRewind(owners, &iter);
    for (node = listNext(&iter); node != NULL; node = listNext(&iter)) {
        streamCG *cg = listNodeValue(node);
        streamNACK *pending;

        /* Every group in the list is expected to hold the entry in its PEL. */
        serverAssert(raxFind(cg->pel, rawid, sizeof(rawid), (void **)&pending));

        raxRemove(cg->pel, rawid, sizeof(rawid), NULL);
        raxRemovePelByTime(cg->pel_by_time, pending->delivery_time, id);
        raxRemove(pending->consumer->pel, rawid, sizeof(rawid), NULL);

        /* The whole list is released right after the loop, so the NACK is
         * freed directly instead of being unlinked from the list first. */
        streamFreeNACK(s, pending);
    }

    /* Finally drop the index entry together with its (now stale) list. */
    raxRemove(s->cgroups_ref, rawid, sizeof(rawid), NULL);
    listRelease(owners);
}

/* Check if a stream entry is still referenced by any consumer group.
 *
 * An entry is considered referenced if:
 * 1. Its ID is greater than the minimum last_id of all consumer groups,
 *    which means at least one group hasn't read it yet.
 * 2. It exists in any consumer group's PEL.
 *
 * Returns 1 if the entry is referenced, 0 if it's fully acknowledged by all groups.
*/ -int streamEntryIsReferenced(stream *s, streamID *id) { - if (!s->cgroups || !raxSize(s->cgroups)) return 0; - if (!s->min_cgroup_last_id_valid) { - /* If the cached minimum last_id is invalid, we need to recalculate it - * by iterating through all consumer groups to find the minimum last_id */ - s->min_cgroup_last_id_valid = 1; - s->min_cgroup_last_id.ms = UINT64_MAX; - s->min_cgroup_last_id.seq = UINT64_MAX; - raxIterator ri; - raxStart(&ri, s->cgroups); - raxSeek(&ri, "^", NULL, 0); - while (raxNext(&ri)) { - streamCG *cg = ri.data; - if (streamCompareID(&cg->last_id, &s->min_cgroup_last_id) < 0) - s->min_cgroup_last_id = cg->last_id; - } - raxStop(&ri); - } - - /* The consume group doesn't read it. */ - if (streamCompareID(&s->min_cgroup_last_id, id) < 0) - return 1; - - /* Check if the message is in any consumer group's PEL */ - if (!s->cgroups_ref) return 0; - unsigned char buf[sizeof(streamID)]; - streamEncodeID(buf, id); - return raxFind(s->cgroups_ref, buf, sizeof(streamID), NULL); -} - -/* Create a NACK entry setting the delivery count to 1 and the delivery - * time to the current time. The NACK consumer will be set to the one - * specified as argument of the function. */ -streamNACK *streamCreateNACK(stream *s, streamConsumer *consumer) { - size_t usable; - streamNACK *nack = zmalloc_usable(sizeof(*nack), &usable); - s->alloc_size += usable; - nack->delivery_time = commandTimeSnapshot(); - nack->delivery_count = 1; - nack->consumer = consumer; - nack->cgroup_ref_node = NULL; /* Will be set when added to cgroups_ref */ - return nack; -} - -/* Free a NACK entry. */ -void streamFreeNACK(stream *s, streamNACK *na) { - size_t usable; - zfree_usable(na, &usable); - s->alloc_size -= usable; -} - -/* Free a NACK entry and remove its reference from the cgroups_ref. - * This ensures proper cleanup of the consumer group list associated with the message ID. 
*/ -void streamDestroyNACK(stream *s, streamNACK *na, unsigned char *key) { - size_t usable; - streamUnlinkEntryFromCGroupRef(s, na, key); - zfree_usable(na, &usable); - s->alloc_size -= usable; -} - -/* Generic version of streamFreeNACK. */ -void streamFreeNACKGeneric(void *na, void *s) { - streamFreeNACK((stream *)s, (streamNACK *)na); -} - -/* Free a consumer and associated data structures. Note that this function - * will not reassign the pending messages associated with this consumer - * nor will delete them from the stream, so when this function is called - * to delete a consumer, and not when the whole stream is destroyed, the caller - * should do some work before. */ -void streamFreeConsumer(stream *s, streamConsumer *sc) { - size_t usable; - raxFree(sc->pel); /* No value free callback: the PEL entries are shared - between the consumer and the main stream PEL. */ - s->alloc_size -= sdsAllocSize(sc->name); - sdsfree(sc->name); - zfree_usable(sc, &usable); - s->alloc_size -= usable; -} - -/* Generic version of streamFreeConsumer. */ -void streamFreeConsumerGeneric(void *sc, void *s) { - streamFreeConsumer((stream *)s, (streamConsumer *)sc); -} - -/* Create a new consumer group in the context of the stream 's', having the - * specified name, last server ID and reads counter. If a consumer group with - * the same name already exists NULL is returned, otherwise the pointer to the - * consumer group is returned. 
*/ -streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read) { - if (s->cgroups == NULL) - s->cgroups = raxNewWithMetadata(0, &s->alloc_size); - if (raxFind(s->cgroups,(unsigned char*)name,namelen,NULL)) - return NULL; - - size_t usable; - streamCG *cg = zmalloc_usable(sizeof(*cg), &usable); - s->alloc_size += usable; - cg->pel = raxNewWithMetadata(0, &s->alloc_size); - cg->pel_by_time = raxNewWithMetadata(0, &s->alloc_size); - cg->consumers = raxNewWithMetadata(0, &s->alloc_size); - cg->last_id.ms = 0; - cg->last_id.seq = 0; - streamUpdateCGroupLastId(s, cg, id); - cg->entries_read = entries_read; - raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL); - return cg; -} - -/* Free a consumer group and all its associated data. */ -static void streamFreeCG(stream *s, streamCG *cg) { - raxFreeWithCbAndContext(cg->pel, streamFreeNACKGeneric, s); - raxFree(cg->pel_by_time); - raxFreeWithCbAndContext(cg->consumers, streamFreeConsumerGeneric, s); - size_t usable; - zfree_usable(cg, &usable); - s->alloc_size -= usable; -} - -/* Destroy a consumer group and clean up all associated references. */ -void streamDestroyCG(stream *s, streamCG *cg) { - /* Remove all references from the cgroups_ref. */ - raxIterator it; - raxStart(&it, cg->pel); - raxSeek(&it, "^", NULL, 0); - while (raxNext(&it)) { - streamNACK *nack = it.data; - streamUnlinkEntryFromCGroupRef(s, nack, it.key); - } - raxStop(&it); - - /* If we're destroying the group with the minimum last_id, the cached - * minimum is no longer valid and needs to be recalculated from the - * remaining groups. */ - if (s->min_cgroup_last_id_valid && streamCompareID(&s->min_cgroup_last_id, &cg->last_id) == 0) - s->min_cgroup_last_id_valid = 0; - - streamFreeCG(s, cg); -} - -/* Generic version of streamFreeCG. 
*/ -void streamFreeCGGeneric(void *cg, void *s) { - streamFreeCG((stream *)s, (streamCG *)cg); -} - -/* Lookup the consumer group in the specified stream and returns its - * pointer, otherwise if there is no such group, NULL is returned. */ -streamCG *streamLookupCG(stream *s, sds groupname) { - if (s->cgroups == NULL) return NULL; - void *cg = NULL; - raxFind(s->cgroups,(unsigned char*)groupname,sdslen(groupname),&cg); - return cg; -} - -/* Create a consumer with the specified name in the group 'cg' and return. - * If the consumer exists, return NULL. As a side effect, when the consumer - * is successfully created, the key space will be notified and dirty++ unless - * the SCC_NO_NOTIFY or SCC_NO_DIRTIFY flags is specified. */ -streamConsumer *streamCreateConsumer(stream *s, streamCG *cg, sds name, robj *key, int dbid, int flags) { - if (cg == NULL) return NULL; - int notify = !(flags & SCC_NO_NOTIFY); - int dirty = !(flags & SCC_NO_DIRTIFY); - size_t usable; - streamConsumer *consumer = zmalloc_usable(sizeof(*consumer), &usable); - int success = raxTryInsert(cg->consumers,(unsigned char*)name, - sdslen(name),consumer,NULL); - if (!success) { - zfree(consumer); - return NULL; - } - s->alloc_size += usable; - consumer->name = sdsdup(name); - s->alloc_size += sdsAllocSize(consumer->name); - consumer->pel = raxNewWithMetadata(0, &s->alloc_size); - consumer->active_time = -1; - consumer->seen_time = commandTimeSnapshot(); - if (dirty) server.dirty++; - if (notify) notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-createconsumer",key,dbid); - return consumer; -} - -/* Lookup the consumer with the specified name in the group 'cg'. */ -streamConsumer *streamLookupConsumer(streamCG *cg, sds name) { - if (cg == NULL) return NULL; - void *consumer = NULL; - raxFind(cg->consumers,(unsigned char*)name,sdslen(name),&consumer); - return consumer; -} - -/* Delete the consumer specified in the consumer group 'cg'. 
*/ -void streamDelConsumer(stream *s, streamCG *cg, streamConsumer *consumer) { - /* Iterate all the consumer pending messages, deleting every corresponding - * entry from the global entry. */ - raxIterator ri; - raxStart(&ri,consumer->pel); - raxSeek(&ri,"^",NULL,0); - while(raxNext(&ri)) { - streamNACK *nack = ri.data; - streamUnlinkEntryFromCGroupRef(s, nack, ri.key); - - streamID id; - streamDecodeID(ri.key, &id); - - raxRemovePelByTime(cg->pel_by_time, nack->delivery_time, &id); - raxRemove(cg->pel,ri.key,ri.key_len,NULL); - - streamFreeNACK(s, nack); - } - raxStop(&ri); - - /* Deallocate the consumer. */ - raxRemove(cg->consumers,(unsigned char*)consumer->name, - sdslen(consumer->name),NULL); - streamFreeConsumer(s,consumer); -} - -/* ----------------------------------------------------------------------- - * Consumer groups commands - * ----------------------------------------------------------------------- */ - -/* XGROUP CREATE <key> <groupname> <id or $> [MKSTREAM] [ENTRIESREAD entries_read] - * XGROUP SETID <key> <groupname> <id or $> [ENTRIESREAD entries_read] - * XGROUP DESTROY <key> <groupname> - * XGROUP CREATECONSUMER <key> <groupname> <consumer> - * XGROUP DELCONSUMER <key> <groupname> <consumername> */ -void xgroupCommand(client *c) { - stream *s = NULL; - sds grpname = NULL; - streamCG *cg = NULL; - char *opt = c->argv[1]->ptr; /* Subcommand name. */ - int mkstream = 0; - long long entries_read = SCG_INVALID_ENTRIES_READ; - robj *o; - size_t old_alloc; - - /* Everything but the "HELP" option requires a key and group name. 
*/ - if (c->argc >= 4) { - /* Parse optional arguments for CREATE and SETID */ - int i = 5; - int create_subcmd = !strcasecmp(opt,"CREATE"); - int setid_subcmd = !strcasecmp(opt,"SETID"); - while (i < c->argc) { - if (create_subcmd && !strcasecmp(c->argv[i]->ptr,"MKSTREAM")) { - mkstream = 1; - i++; - } else if ((create_subcmd || setid_subcmd) && !strcasecmp(c->argv[i]->ptr,"ENTRIESREAD") && i + 1 < c->argc) { - if (getLongLongFromObjectOrReply(c,c->argv[i+1],&entries_read,NULL) != C_OK) - return; - if (entries_read < 0 && entries_read != SCG_INVALID_ENTRIES_READ) { - addReplyError(c,"value for ENTRIESREAD must be positive or -1"); - return; - } - i += 2; - } else { - addReplySubcommandSyntaxError(c); - return; - } - } - - o = lookupKeyWrite(c->db,c->argv[2]); - if (o) { - if (checkType(c,o,OBJ_STREAM)) return; - s = o->ptr; - } - grpname = c->argv[3]->ptr; - } - - /* Check for missing key/group. */ - if (c->argc >= 4 && !mkstream) { - /* At this point key must exist, or there is an error. */ - if (s == NULL) { - addReplyError(c, - "The XGROUP subcommand requires the key to exist. " - "Note that for CREATE you may want to use the MKSTREAM " - "option to create an empty stream automatically."); - return; - } - - /* Certain subcommands require the group to exist. */ - if ((cg = streamLookupCG(s,grpname)) == NULL && - (!strcasecmp(opt,"SETID") || - !strcasecmp(opt,"CREATECONSUMER") || - !strcasecmp(opt,"DELCONSUMER"))) - { - addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' " - "for key name '%s'", - (char*)grpname, (char*)c->argv[2]->ptr); - return; - } - } - - /* Dispatch the different subcommands. */ - if (c->argc == 2 && !strcasecmp(opt,"HELP")) { - const char *help[] = { -"CREATE <key> <groupname> <id|$> [option]", -" Create a new consumer group. 
Options are:", -" * MKSTREAM", -" Create the empty stream if it does not exist.", -" * ENTRIESREAD entries_read", -" Set the group's entries_read counter (internal use).", -"CREATECONSUMER <key> <groupname> <consumer>", -" Create a new consumer in the specified group.", -"DELCONSUMER <key> <groupname> <consumer>", -" Remove the specified consumer.", -"DESTROY <key> <groupname>", -" Remove the specified group.", -"SETID <key> <groupname> <id|$> [ENTRIESREAD entries_read]", -" Set the current group ID and entries_read counter.", -NULL - }; - addReplyHelp(c, help); - } else if (!strcasecmp(opt,"CREATE") && (c->argc >= 5 && c->argc <= 8)) { - streamID id; - if (!strcmp(c->argv[4]->ptr,"$")) { - if (s) { - id = s->last_id; - } else { - id.ms = 0; - id.seq = 0; - } - } else if (streamParseStrictIDOrReply(c,c->argv[4],&id,0,NULL) != C_OK) { - return; - } - - /* Handle the MKSTREAM option now that the command can no longer fail. */ - if (s == NULL) { - serverAssert(mkstream); - o = createStreamObject(); - dbAdd(c->db, c->argv[2], &o); - s = o->ptr; - keyModified(c,c->db,c->argv[2],o,1); - } - - if (entries_read != SCG_INVALID_ENTRIES_READ && (uint64_t)entries_read > s->entries_added) { - entries_read = s->entries_added; - } - - old_alloc = s->alloc_size; - streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id,entries_read); - if (cg) { - if (server.memory_tracking_per_slot) - updateSlotAllocSize(c->db,getKeySlot(c->argv[2]->ptr),old_alloc,s->alloc_size); - addReply(c,shared.ok); - server.dirty++; - notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-create", - c->argv[2],c->db->id); - keyModified(c,c->db,c->argv[2],o,0); - } else { - addReplyError(c,"-BUSYGROUP Consumer Group name already exists"); - } - } else if (!strcasecmp(opt,"SETID") && (c->argc == 5 || c->argc == 7)) { - streamID id; - if (!strcmp(c->argv[4]->ptr,"$")) { - id = s->last_id; - } else if (streamParseIDOrReply(c,c->argv[4],&id,0) != C_OK) { - return; - } - - if (entries_read != SCG_INVALID_ENTRIES_READ && 
(uint64_t)entries_read > s->entries_added) { - entries_read = s->entries_added; - } - - streamUpdateCGroupLastId(s, cg, &id); - cg->entries_read = entries_read; - addReply(c,shared.ok); - server.dirty++; - notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-setid",c->argv[2],c->db->id); - keyModified(c,c->db,c->argv[2],o,0); - } else if (!strcasecmp(opt,"DESTROY") && c->argc == 4) { - if (cg) { - old_alloc = s->alloc_size; - raxRemove(s->cgroups,(unsigned char*)grpname,sdslen(grpname),NULL); - streamDestroyCG(s, cg); - if (server.memory_tracking_per_slot) - updateSlotAllocSize(c->db,getKeySlot(c->argv[2]->ptr),old_alloc,s->alloc_size); - addReply(c,shared.cone); - server.dirty++; - notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-destroy", - c->argv[2],c->db->id); - keyModified(c,c->db,c->argv[2],o,0); - /* We want to unblock any XREADGROUP consumers with -NOGROUP. */ - signalKeyAsReady(c->db,c->argv[2],OBJ_STREAM); - } else { - addReply(c,shared.czero); - } - } else if (!strcasecmp(opt,"CREATECONSUMER") && c->argc == 5) { - old_alloc = s->alloc_size; - streamConsumer *created = streamCreateConsumer(s,cg,c->argv[4]->ptr,c->argv[2], - c->db->id,SCC_DEFAULT); - keyModified(c,c->db,c->argv[2],o,0); - if (server.memory_tracking_per_slot) - updateSlotAllocSize(c->db,getKeySlot(c->argv[2]->ptr),old_alloc,s->alloc_size); - addReplyLongLong(c,created ? 1 : 0); - } else if (!strcasecmp(opt,"DELCONSUMER") && c->argc == 5) { - long long pending = 0; - streamConsumer *consumer = streamLookupConsumer(cg,c->argv[4]->ptr); - if (consumer) { - /* Delete the consumer and returns the number of pending messages - * that were yet associated with such a consumer. 
*/ - old_alloc = s->alloc_size; - pending = raxSize(consumer->pel); - streamDelConsumer(s,cg,consumer); - if (server.memory_tracking_per_slot) - updateSlotAllocSize(c->db,getKeySlot(c->argv[2]->ptr),old_alloc,s->alloc_size); - server.dirty++; - notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-delconsumer", - c->argv[2],c->db->id); - keyModified(c,c->db,c->argv[2],o,0); - } - addReplyLongLong(c,pending); - } else { - addReplySubcommandSyntaxError(c); - } -} - -/* XSETID <stream> <id> [ENTRIESADDED entries_added] [MAXDELETEDID max_deleted_entry_id] - * - * Set the internal "last ID", "added entries" and "maximal deleted entry ID" - * of a stream. */ -void xsetidCommand(client *c) { - streamID id, max_xdel_id = {0, 0}; - long long entries_added = -1; - - if (streamParseStrictIDOrReply(c,c->argv[2],&id,0,NULL) != C_OK) - return; - - int i = 3; - while (i < c->argc) { - int moreargs = (c->argc-1) - i; /* Number of additional arguments. */ - char *opt = c->argv[i]->ptr; - if (!strcasecmp(opt,"ENTRIESADDED") && moreargs) { - if (getLongLongFromObjectOrReply(c,c->argv[i+1],&entries_added,NULL) != C_OK) { - return; - } else if (entries_added < 0) { - addReplyError(c,"entries_added must be positive"); - return; - } - i += 2; - } else if (!strcasecmp(opt,"MAXDELETEDID") && moreargs) { - if (streamParseStrictIDOrReply(c,c->argv[i+1],&max_xdel_id,0,NULL) != C_OK) { - return; - } else if (streamCompareID(&id,&max_xdel_id) < 0) { - addReplyError(c,"The ID specified in XSETID is smaller than the provided max_deleted_entry_id"); - return; - } - i += 2; - } else { - addReplyErrorObject(c,shared.syntaxerr); - return; - } - } - - kvobj *kv = lookupKeyWriteOrReply(c, c->argv[1], shared.nokeyerr); - if (kv == NULL || checkType(c, kv, OBJ_STREAM)) return; - stream *s = kv->ptr; - - if (streamCompareID(&id,&s->max_deleted_entry_id) < 0) { - addReplyError(c,"The ID specified in XSETID is smaller than current max_deleted_entry_id"); - return; - } - - /* If the stream has at least one item, we want 
to check that the user - * is setting a last ID that is equal or greater than the current top - * item, otherwise the fundamental ID monotonicity assumption is violated. */ - if (s->length > 0) { - streamID maxid; - streamLastValidID(s,&maxid); - - if (streamCompareID(&id,&maxid) < 0) { - addReplyError(c,"The ID specified in XSETID is smaller than the target stream top item"); - return; - } - - /* If an entries_added was provided, it can't be lower than the length. */ - if (entries_added != -1 && s->length > (uint64_t)entries_added) { - addReplyError(c,"The entries_added specified in XSETID is smaller than the target stream length"); - return; - } - } - - s->last_id = id; - if (entries_added != -1) - s->entries_added = entries_added; - if (!streamIDEqZero(&max_xdel_id)) - s->max_deleted_entry_id = max_xdel_id; - addReply(c,shared.ok); - server.dirty++; - notifyKeyspaceEvent(NOTIFY_STREAM,"xsetid",c->argv[1],c->db->id); - keyModified(c,c->db,c->argv[1],kv,0); -} - -/* XACK <key> <group> <id> <id> ... <id> - * Acknowledge a message as processed. In practical terms we just check the - * pending entries list (PEL) of the group, and delete the PEL entry both from - * the group and the consumer (pending messages are referenced in both places). - * - * Return value of the command is the number of messages successfully - * acknowledged, that is, the IDs we were actually able to resolve in the PEL. - */ -void xackCommand(client *c) { - streamCG *group = NULL; - kvobj *kv = lookupKeyRead(c->db, c->argv[1]); - if (kv) { - if (checkType(c, kv, OBJ_STREAM)) return; /* Type error. */ - group = streamLookupCG(kv->ptr, c->argv[2]->ptr); - } - - /* No key or group? Nothing to ack. 
*/ - if (kv == NULL || group == NULL) { - addReply(c,shared.czero); - return; - } - - /* Start parsing the IDs, so that we abort ASAP if there is a syntax - * error: the return value of this command cannot be an error in case - * the client successfully acknowledged some messages, so it should be - * executed in a "all or nothing" fashion. */ - streamID static_ids[STREAMID_STATIC_VECTOR_LEN]; - streamID *ids = static_ids; - int id_count = c->argc-3; - if (id_count > STREAMID_STATIC_VECTOR_LEN) - ids = zmalloc(sizeof(streamID)*id_count); - for (int j = 3; j < c->argc; j++) { - if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-3],0,NULL) != C_OK) goto cleanup; - } - - int acknowledged = 0; - stream *s = kv->ptr; - size_t old_alloc = s->alloc_size; - for (int j = 3; j < c->argc; j++) { - unsigned char buf[sizeof(streamID)]; - streamEncodeID(buf,&ids[j-3]); - - /* Lookup the ID in the group PEL: it will have a reference to the - * NACK structure that will have a reference to the consumer, so that - * we are able to remove the entry from both PELs. */ - void *result; - if (raxFind(group->pel,buf,sizeof(buf),&result)) { - streamNACK *nack = result; - raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &ids[j-3]); - raxRemove(group->pel,buf,sizeof(buf),NULL); - raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); - streamDestroyNACK(kv->ptr, nack, buf); - acknowledged++; - server.dirty++; - keyModified(c,c->db,c->argv[1],kv,0); - } - } - if (server.memory_tracking_per_slot && old_alloc != s->alloc_size) - updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),old_alloc,s->alloc_size); - addReplyLongLong(c,acknowledged); -cleanup: - if (ids != static_ids) zfree(ids); -} - -/* Used by xackdelCommand() */ -typedef enum XAckDelRes { - XACKDEL_NO_ID = -1, /* ID not found in PEL. */ - XACKDEL_DELETED = 1, /* Message acknowledged and deleted. */ - XACKDEL_STILL_REFERENCED = 2, /* Message acknowledged but not deleted (still referenced). 
*/ -} XAckDelRes; - -/* XACKDEL <key> <group> [KEEPREF|DELREF|ACKED] [IDS <numids> <id ...>] - * Acknowledges messages as processed and deletes them from the stream. - * - * Returns an array of status codes for each ID, indicating whether it - * was deleted, still referenced, or not found. */ -void xackdelCommand(client *c) { - stream *s = NULL; - streamCG *group = NULL; - kvobj *kv = lookupKeyRead(c->db, c->argv[1]); - if (checkType(c, kv, OBJ_STREAM)) return; /* Type error. */ - - /* Parse command options */ - streamAckDelArgs args; - if (!streamParseAckDelArgsOrReply(c, 3, &args)) return; - - /* Reply null if the key doesn't exist or the group doesn't exist.*/ - if (!kv || !(group = streamLookupCG(kv->ptr, c->argv[2]->ptr))) { - addReplyArrayLen(c, args.numids); - for (int i = 0; i < args.numids; i++) - addReplyLongLong(c, XACKDEL_NO_ID); - return; - } - - /* Start parsing the IDs, so that we abort ASAP if there is a syntax - * error: the return value of this command cannot be an error in case - * the client successfully acknowledged some messages, so it should be - * executed in a "all or nothing" fashion. */ - streamID static_ids[STREAMID_STATIC_VECTOR_LEN]; - streamID *ids = static_ids; - if (args.numids > STREAMID_STATIC_VECTOR_LEN) - ids = zmalloc(sizeof(streamID)*args.numids); - for (int j = 0; j < args.numids; j++) { - if (streamParseStrictIDOrReply(c,c->argv[j+args.startidx],&ids[j],0,NULL) != C_OK) - goto cleanup; - } - - s = kv->ptr; - size_t old_alloc = s->alloc_size; - int first_entry = 0; - int deleted = 0, dirty = server.dirty; - addReplyArrayLen(c, args.numids); - for (int j = 0; j < args.numids; j++) { - int res = XACKDEL_NO_ID; - streamID *id = &ids[j]; - unsigned char buf[sizeof(streamID)]; - streamEncodeID(buf,id); - - /* Lookup the ID in the group PEL: it will have a reference to the - * NACK structure that will have a reference to the consumer, so that - * we are able to remove the entry from both PELs. 
*/ - void *result; - if (raxFind(group->pel,buf,sizeof(buf),&result)) { - streamNACK *nack = result; - raxRemovePelByTime(group->pel_by_time, nack->delivery_time, id); - raxRemove(group->pel,buf,sizeof(buf),NULL); - raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); - streamDestroyNACK(s, nack, buf); - server.dirty++; - - int can_delete = 1; - if (args.delete_strategy == DELETE_STRATEGY_ACKED) { - /* Only delete if acknowledged by all consumer groups */ - if (streamEntryIsReferenced(s, id)) - can_delete = 0; - } else if (args.delete_strategy == DELETE_STRATEGY_DELREF) { - streamCleanupEntryCGroupRefs(s, id); - } - - if (can_delete && streamDeleteItem(s,id)) { - /* We want to know if the first entry in the stream was deleted - * so we can later set the new one. */ - if (streamCompareID(id,&s->first_id) == 0) { - first_entry = 1; - } - /* Update the stream's maximal tombstone if needed. */ - if (streamCompareID(id,&s->max_deleted_entry_id) > 0) { - s->max_deleted_entry_id = *id; - } - deleted++; - } - - /* If the entry was in the PEL but not found in the stream, - * we still consider it successfully deleted. */ - res = can_delete ? XACKDEL_DELETED : XACKDEL_STILL_REFERENCED; - } - addReplyLongLong(c, res); - } - - if (server.memory_tracking_per_slot && old_alloc != s->alloc_size) - updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),old_alloc,s->alloc_size); - - /* Update the stream's first ID. */ - if (deleted) { - if (s->length == 0) { - s->first_id.ms = 0; - s->first_id.seq = 0; - } else if (first_entry) { - streamGetEdgeID(s,1,1,&s->first_id); - } - - /* Propagate the write. 
*/ - keyModified(c,c->db,c->argv[1],kv,1); - notifyKeyspaceEvent(NOTIFY_STREAM,"xdel",c->argv[1],c->db->id); - } else if (server.dirty > dirty) { - /* Only ACK succeeded without deleting elements, just update LRM without signaling */ - keyModified(c,c->db,c->argv[1],kv,0); - } - -cleanup: - if (ids != static_ids) zfree(ids); -} - -/* XPENDING <key> <group> [[IDLE <idle>] <start> <stop> <count> [<consumer>]] - * - * If start and stop are omitted, the command just outputs information about - * the amount of pending messages for the key/group pair, together with - * the minimum and maximum ID of pending messages. - * - * If start and stop are provided instead, the pending messages are returned - * with information about the current owner, number of deliveries and last - * delivery time and so forth. */ -void xpendingCommand(client *c) { - int justinfo = c->argc == 3; /* Without the range just outputs general - information about the PEL. */ - robj *key = c->argv[1]; - robj *groupname = c->argv[2]; - robj *consumername = NULL; - streamID startid, endid; - long long count = 0; - long long minidle = 0; - int startex = 0, endex = 0; - - /* Start and stop, and the consumer, can be omitted. Also the IDLE modifier. */ - if (c->argc != 3 && (c->argc < 6 || c->argc > 9)) { - addReplyErrorObject(c,shared.syntaxerr); - return; - } - - /* Parse start/end/count arguments ASAP if needed, in order to report - * syntax errors before any other error. */ - if (c->argc >= 6) { - int startidx = 3; /* Without IDLE */ - - if (!strcasecmp(c->argv[3]->ptr, "IDLE")) { - if (getLongLongFromObjectOrReply(c, c->argv[4], &minidle, NULL) == C_ERR) - return; - if (c->argc < 8) { - /* If IDLE was provided we must have at least 'start end count' */ - addReplyErrorObject(c,shared.syntaxerr); - return; - } - /* Search for rest of arguments after 'IDLE <idle>' */ - startidx += 2; - } - - /* count argument. 
*/ - if (getLongLongFromObjectOrReply(c,c->argv[startidx+2],&count,NULL) == C_ERR) - return; - if (count < 0) count = 0; - - /* start and end arguments. */ - if (streamParseIntervalIDOrReply(c,c->argv[startidx],&startid,&startex,0) != C_OK) - return; - if (startex && streamIncrID(&startid) != C_OK) { - addReplyError(c,"invalid start ID for the interval"); - return; - } - if (streamParseIntervalIDOrReply(c,c->argv[startidx+1],&endid,&endex,UINT64_MAX) != C_OK) - return; - if (endex && streamDecrID(&endid) != C_OK) { - addReplyError(c,"invalid end ID for the interval"); - return; - } - - if (startidx+3 < c->argc) { - /* 'consumer' was provided */ - consumername = c->argv[startidx+3]; - } - } - - /* Lookup the key and the group inside the stream. */ - kvobj *kv = lookupKeyRead(c->db, c->argv[1]); - streamCG *group; - - if (checkType(c, kv, OBJ_STREAM)) return; - if (kv == NULL || - (group = streamLookupCG(kv->ptr, groupname->ptr)) == NULL) - { - addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer " - "group '%s'", - (char*)key->ptr,(char*)groupname->ptr); - return; - } - - /* XPENDING <key> <group> variant. */ - if (justinfo) { - addReplyArrayLen(c,4); - /* Total number of messages in the PEL. */ - addReplyLongLong(c,raxSize(group->pel)); - /* First and last IDs. */ - if (raxSize(group->pel) == 0) { - addReplyNull(c); /* Start. */ - addReplyNull(c); /* End. */ - addReplyNullArray(c); /* Clients. */ - } else { - /* Start. */ - raxIterator ri; - raxStart(&ri,group->pel); - raxSeek(&ri,"^",NULL,0); - raxNext(&ri); - streamDecodeID(ri.key,&startid); - addReplyStreamID(c,&startid); - - /* End. */ - raxSeek(&ri,"$",NULL,0); - raxNext(&ri); - streamDecodeID(ri.key,&endid); - addReplyStreamID(c,&endid); - raxStop(&ri); - - /* Consumers with pending messages. 
*/ - raxStart(&ri,group->consumers); - raxSeek(&ri,"^",NULL,0); - void *arraylen_ptr = addReplyDeferredLen(c); - size_t arraylen = 0; - while(raxNext(&ri)) { - streamConsumer *consumer = ri.data; - if (raxSize(consumer->pel) == 0) continue; - addReplyArrayLen(c,2); - addReplyBulkCBuffer(c,ri.key,ri.key_len); - addReplyBulkLongLong(c,raxSize(consumer->pel)); - arraylen++; - } - setDeferredArrayLen(c,arraylen_ptr,arraylen); - raxStop(&ri); - } - } else { /* <start>, <stop> and <count> provided, return actual pending entries (not just info) */ - streamConsumer *consumer = NULL; - if (consumername) { - consumer = streamLookupConsumer(group,consumername->ptr); - - /* If a consumer name was mentioned but it does not exist, we can - * just return an empty array. */ - if (consumer == NULL) { - addReplyArrayLen(c,0); - return; - } - } - - rax *pel = consumer ? consumer->pel : group->pel; - unsigned char startkey[sizeof(streamID)]; - unsigned char endkey[sizeof(streamID)]; - raxIterator ri; - mstime_t now = commandTimeSnapshot(); - - streamEncodeID(startkey,&startid); - streamEncodeID(endkey,&endid); - raxStart(&ri,pel); - raxSeek(&ri,">=",startkey,sizeof(startkey)); - void *arraylen_ptr = addReplyDeferredLen(c); - size_t arraylen = 0; - - while(count && raxNext(&ri) && memcmp(ri.key,endkey,ri.key_len) <= 0) { - streamNACK *nack = ri.data; - - if (minidle) { - mstime_t this_idle = now - nack->delivery_time; - if (this_idle < minidle) continue; - } - - arraylen++; - count--; - addReplyArrayLen(c,4); - - /* Entry ID. */ - streamID id; - streamDecodeID(ri.key,&id); - addReplyStreamID(c,&id); - - /* Consumer name. */ - addReplyBulkCBuffer(c,nack->consumer->name, - sdslen(nack->consumer->name)); - - /* Milliseconds elapsed since last delivery. */ - mstime_t elapsed = now - nack->delivery_time; - if (elapsed < 0) elapsed = 0; - addReplyLongLong(c,elapsed); - - /* Number of deliveries. 
*/ - addReplyLongLong(c,nack->delivery_count); - } - raxStop(&ri); - setDeferredArrayLen(c,arraylen_ptr,arraylen); - } -} - -/* XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> - * [IDLE <milliseconds>] [TIME <mstime>] [RETRYCOUNT <count>] - * [FORCE] [JUSTID] - * - * Changes ownership of one or multiple messages in the Pending Entries List - * of a given stream consumer group. - * - * If the message ID (among the specified ones) exists, and its idle - * time greater or equal to <min-idle-time>, then the message new owner - * becomes the specified <consumer>. If the minimum idle time specified - * is zero, messages are claimed regardless of their idle time. - * - * All the messages that cannot be found inside the pending entries list - * are ignored, but in case the FORCE option is used. In that case we - * create the NACK (representing a not yet acknowledged message) entry in - * the consumer group PEL. - * - * This command creates the consumer as side effect if it does not yet - * exists. Moreover the command reset the idle time of the message to 0, - * even if by using the IDLE or TIME options, the user can control the - * new idle time. - * - * The options at the end can be used in order to specify more attributes - * to set in the representation of the pending message: - * - * 1. IDLE <ms>: - * Set the idle time (last time it was delivered) of the message. - * If IDLE is not specified, an IDLE of 0 is assumed, that is, - * the time count is reset because the message has now a new - * owner trying to process it. - * - * 2. TIME <ms-unix-time>: - * This is the same as IDLE but instead of a relative amount of - * milliseconds, it sets the idle time to a specific unix time - * (in milliseconds). This is useful in order to rewrite the AOF - * file generating XCLAIM commands. - * - * 3. RETRYCOUNT <count>: - * Set the retry counter to the specified value. This counter is - * incremented every time a message is delivered again. 
Normally - * XCLAIM does not alter this counter, which is just served to clients - * when the XPENDING command is called: this way clients can detect - * anomalies, like messages that are never processed for some reason - * after a big number of delivery attempts. - * - * 4. FORCE: - * Creates the pending message entry in the PEL even if certain - * specified IDs are not already in the PEL assigned to a different - * client. However the message must be exist in the stream, otherwise - * the IDs of non existing messages are ignored. - * - * 5. JUSTID: - * Return just an array of IDs of messages successfully claimed, - * without returning the actual message. - * - * 6. LASTID <id>: - * Update the consumer group last ID with the specified ID if the - * current last ID is smaller than the provided one. - * This is used for replication / AOF, so that when we read from a - * consumer group, the XCLAIM that gets propagated to give ownership - * to the consumer, is also used in order to update the group current - * ID. - * - * The command returns an array of messages that the user - * successfully claimed, so that the caller is able to understand - * what messages it is now in charge of. */ -void xclaimCommand(client *c) { - streamCG *group = NULL; - kvobj *o = lookupKeyRead(c->db,c->argv[1]); - long long minidle; /* Minimum idle time argument. */ - long long retrycount = -1; /* -1 means RETRYCOUNT option not given. */ - mstime_t deliverytime = -1; /* -1 means IDLE/TIME options not given. */ - int force = 0; - int justid = 0; - - if (o) { - if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */ - group = streamLookupCG(o->ptr,c->argv[2]->ptr); - } - - /* No key or group? Send an error given that the group creation - * is mandatory. 
*/ - if (o == NULL || group == NULL) { - addReplyErrorFormat(c,"-NOGROUP No such key '%s' or " - "consumer group '%s'", (char*)c->argv[1]->ptr, - (char*)c->argv[2]->ptr); - return; - } - - if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle, - "Invalid min-idle-time argument for XCLAIM") - != C_OK) return; - if (minidle < 0) minidle = 0; - - /* Start parsing the IDs, so that we abort ASAP if there is a syntax - * error: the return value of this command cannot be an error in case - * the client successfully claimed some message, so it should be - * executed in a "all or nothing" fashion. */ - int j; - streamID static_ids[STREAMID_STATIC_VECTOR_LEN]; - streamID *ids = static_ids; - int id_count = c->argc-5; - if (id_count > STREAMID_STATIC_VECTOR_LEN) - ids = zmalloc(sizeof(streamID)*id_count); - for (j = 5; j < c->argc; j++) { - if (streamParseStrictIDOrReply(NULL,c->argv[j],&ids[j-5],0,NULL) != C_OK) break; - } - int last_id_arg = j-1; /* Next time we iterate the IDs we now the range. */ - - /* If we stopped because some IDs cannot be parsed, perhaps they - * are trailing options. */ - mstime_t now = commandTimeSnapshot(); - streamID last_id = {0,0}; - int propagate_last_id = 0; - for (; j < c->argc; j++) { - int moreargs = (c->argc-1) - j; /* Number of additional arguments. 
*/ - char *opt = c->argv[j]->ptr; - if (!strcasecmp(opt,"FORCE")) { - force = 1; - } else if (!strcasecmp(opt,"JUSTID")) { - justid = 1; - } else if (!strcasecmp(opt,"IDLE") && moreargs) { - j++; - if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime, - "Invalid IDLE option argument for XCLAIM") - != C_OK) goto cleanup; - deliverytime = now - deliverytime; - } else if (!strcasecmp(opt,"TIME") && moreargs) { - j++; - if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime, - "Invalid TIME option argument for XCLAIM") - != C_OK) goto cleanup; - } else if (!strcasecmp(opt,"RETRYCOUNT") && moreargs) { - j++; - if (getLongLongFromObjectOrReply(c,c->argv[j],&retrycount, - "Invalid RETRYCOUNT option argument for XCLAIM") - != C_OK) goto cleanup; - } else if (!strcasecmp(opt,"LASTID") && moreargs) { - j++; - if (streamParseStrictIDOrReply(c,c->argv[j],&last_id,0,NULL) != C_OK) goto cleanup; - } else { - addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt); - goto cleanup; - } - } - - if (streamCompareID(&last_id,&group->last_id) > 0) { - streamUpdateCGroupLastId(o->ptr, group, &last_id); - propagate_last_id = 1; - } - - if (deliverytime != -1) { - /* If a delivery time was passed, either with IDLE or TIME, we - * do some sanity check on it, and set the deliverytime to now - * (which is a sane choice usually) if the value is bogus. - * To raise an error here is not wise because clients may compute - * the idle time doing some math starting from their local time, - * and this is not a good excuse to fail in case, for instance, - * the computer time is a bit in the future from our POV. */ - if (deliverytime < 0 || deliverytime > now) deliverytime = now; - } else { - /* If no IDLE/TIME option was passed, we want the last delivery - * time to be now, so that the idle time of the message will be - * zero. */ - deliverytime = now; - } - - /* Do the actual claiming. 
*/ - stream *s = o->ptr; - size_t old_alloc = s->alloc_size; - streamConsumer *consumer = streamLookupConsumer(group,c->argv[3]->ptr); - if (consumer == NULL) { - consumer = streamCreateConsumer(o->ptr,group,c->argv[3]->ptr,c->argv[1],c->db->id,SCC_DEFAULT); - } - consumer->seen_time = commandTimeSnapshot(); - - void *arraylenptr = addReplyDeferredLen(c); - size_t arraylen = 0; - for (int j = 5; j <= last_id_arg; j++) { - streamID id = ids[j-5]; - unsigned char buf[sizeof(streamID)]; - streamEncodeID(buf,&id); - - /* Lookup the ID in the group PEL. */ - void *result = NULL; - raxFind(group->pel,buf,sizeof(buf),&result); - streamNACK *nack = result; - - /* Item must exist for us to transfer it to another consumer. */ - if (!streamEntryExists(s,&id)) { - /* Clear this entry from the PEL, it no longer exists */ - if (nack != NULL) { - /* Propagate this change (we are going to delete the NACK). */ - streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack); - propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */ - server.dirty++; - /* Release the NACK */ - raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &id); - raxRemove(group->pel,buf,sizeof(buf),NULL); - raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); - streamDestroyNACK(s, nack, buf); - } - continue; - } - - /* If FORCE is passed, let's check if at least the entry - * exists in the Stream. In such case, we'll create a new - * entry in the PEL from scratch, so that XCLAIM can also - * be used to create entries in the PEL. Useful for AOF - * and replication of consumer groups. */ - if (force && nack == NULL) { - /* Create the NACK. */ - nack = streamCreateNACK(s,NULL); - raxInsert(group->pel,buf,sizeof(buf),nack,NULL); - raxInsertPelByTime(group->pel_by_time, nack->delivery_time, &id); - nack->cgroup_ref_node = streamLinkCGroupToEntry(s, group, buf); - } - - if (nack != NULL) { - /* We need to check if the minimum idle time requested - * by the caller is satisfied by this entry. 
- * - * Note that the nack could be created by FORCE, in this - * case there was no pre-existing entry and minidle should - * be ignored, but in that case nack->consumer is NULL. */ - if (nack->consumer && minidle) { - mstime_t this_idle = now - nack->delivery_time; - if (this_idle < minidle) continue; - } - - if (nack->consumer != consumer) { - /* Remove the entry from the old consumer. - * Note that nack->consumer is NULL if we created the - * NACK above because of the FORCE option. */ - if (nack->consumer) { - raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); - } - } - - raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &id); - nack->delivery_time = deliverytime; - raxInsertPelByTime(group->pel_by_time, nack->delivery_time, &id); - - /* Set the delivery attempts counter if given, otherwise - * autoincrement unless JUSTID option provided */ - if (retrycount >= 0) { - nack->delivery_count = retrycount; - } else if (!justid) { - nack->delivery_count++; - } - if (nack->consumer != consumer) { - /* Add the entry in the new consumer local PEL. */ - raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); - nack->consumer = consumer; - } - /* Send the reply for this entry. */ - if (justid) { - addReplyStreamID(c,&id); - } else { - serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,-1,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL,NULL) == 1); - } - arraylen++; - - consumer->active_time = commandTimeSnapshot(); - - /* Propagate this change. */ - streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack); - propagate_last_id = 0; /* Will be propagated by XCLAIM itself. 
*/ - server.dirty++; - } - } - if (server.memory_tracking_per_slot && old_alloc != s->alloc_size) - updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),old_alloc,s->alloc_size); - if (propagate_last_id) { - streamPropagateGroupID(c,c->argv[1],group,c->argv[2]); - server.dirty++; - } - setDeferredArrayLen(c,arraylenptr,arraylen); - preventCommandPropagation(c); - keyModified(c,c->db,c->argv[1],o,0); -cleanup: - if (ids != static_ids) zfree(ids); -} - -/* XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT <count>] [JUSTID] - * - * Changes ownership of one or multiple messages in the Pending Entries List - * of a given stream consumer group. - * - * For each PEL entry, if its idle time greater or equal to <min-idle-time>, - * then the message new owner becomes the specified <consumer>. - * If the minimum idle time specified is zero, messages are claimed - * regardless of their idle time. - * - * This command creates the consumer as side effect if it does not yet - * exists. Moreover the command reset the idle time of the message to 0. - * - * The command returns an array of messages that the user - * successfully claimed, so that the caller is able to understand - * what messages it is now in charge of. */ -void xautoclaimCommand(client *c) { - streamCG *group = NULL; - kvobj *o = lookupKeyRead(c->db,c->argv[1]); - long long minidle; /* Minimum idle time argument, in milliseconds. */ - long count = 100; /* Maximum entries to claim. */ - const unsigned attempts_factor = 10; - streamID startid; - int startex; - int justid = 0; - - /* Parse idle/start/end/count arguments ASAP if needed, in order to report - * syntax errors before any other error. 
*/ - if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle,"Invalid min-idle-time argument for XAUTOCLAIM") != C_OK) - return; - if (minidle < 0) minidle = 0; - - if (streamParseIntervalIDOrReply(c,c->argv[5],&startid,&startex,0) != C_OK) - return; - if (startex && streamIncrID(&startid) != C_OK) { - addReplyError(c,"invalid start ID for the interval"); - return; - } - - int j = 6; /* options start at argv[6] */ - while(j < c->argc) { - int moreargs = (c->argc-1) - j; /* Number of additional arguments. */ - char *opt = c->argv[j]->ptr; - if (!strcasecmp(opt,"COUNT") && moreargs) { - long max_count = LONG_MAX / (max(sizeof(streamID), attempts_factor)); - if (getRangeLongFromObjectOrReply(c,c->argv[j+1],1,max_count,&count,"COUNT must be > 0") != C_OK) - return; - j++; - } else if (!strcasecmp(opt,"JUSTID")) { - justid = 1; - } else { - addReplyErrorObject(c,shared.syntaxerr); - return; - } - j++; - } - - if (o) { - if (checkType(c,o,OBJ_STREAM)) - return; /* Type error. */ - group = streamLookupCG(o->ptr,c->argv[2]->ptr); - } - - /* No key or group? Send an error given that the group creation - * is mandatory. */ - if (o == NULL || group == NULL) { - addReplyErrorFormat(c,"-NOGROUP No such key '%s' or consumer group '%s'", - (char*)c->argv[1]->ptr, - (char*)c->argv[2]->ptr); - return; - } - - streamID *deleted_ids = ztrymalloc(count * sizeof(streamID)); - if (!deleted_ids) { - addReplyError(c, "Insufficient memory, failed allocating transient memory, COUNT too high."); - return; - } - - /* Do the actual claiming. 
*/ - stream *s = o->ptr; - size_t old_alloc = s->alloc_size; - streamConsumer *consumer = streamLookupConsumer(group,c->argv[3]->ptr); - if (consumer == NULL) { - consumer = streamCreateConsumer(o->ptr,group,c->argv[3]->ptr,c->argv[1],c->db->id,SCC_DEFAULT); - } - consumer->seen_time = commandTimeSnapshot(); - - long long attempts = count * attempts_factor; - - addReplyArrayLen(c, 3); /* We add another reply later */ - void *endidptr = addReplyDeferredLen(c); /* reply[0] */ - void *arraylenptr = addReplyDeferredLen(c); /* reply[1] */ - - unsigned char startkey[sizeof(streamID)]; - streamEncodeID(startkey,&startid); - raxIterator ri; - raxStart(&ri,group->pel); - raxSeek(&ri,">=",startkey,sizeof(startkey)); - size_t arraylen = 0; - mstime_t now = commandTimeSnapshot(); - int deleted_id_num = 0; - while (attempts-- && count && raxNext(&ri)) { - streamNACK *nack = ri.data; - - streamID id; - streamDecodeID(ri.key, &id); - - /* Item must exist for us to transfer it to another consumer. */ - if (!streamEntryExists(s,&id)) { - /* Propagate this change (we are going to delete the NACK). */ - robj *idstr = createObjectFromStreamID(&id); - streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],idstr,nack); - decrRefCount(idstr); - server.dirty++; - /* Clear this entry from the PEL, it no longer exists */ - raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &id); - raxRemove(group->pel,ri.key,ri.key_len,NULL); - raxRemove(nack->consumer->pel,ri.key,ri.key_len,NULL); - streamDestroyNACK(s, nack, ri.key); - /* Remember the ID for later */ - deleted_ids[deleted_id_num++] = id; - raxSeek(&ri,">=",ri.key,ri.key_len); - count--; /* Count is a limit of the command response size. */ - continue; - } - - if (minidle) { - mstime_t this_idle = now - nack->delivery_time; - if (this_idle < minidle) - continue; - } - - if (nack->consumer != consumer) { - /* Remove the entry from the old consumer. 
- * Note that nack->consumer is NULL if we created the - * NACK above because of the FORCE option. */ - if (nack->consumer) { - raxRemove(nack->consumer->pel,ri.key,ri.key_len,NULL); - } - } - - /* Update the consumer and idle time. */ - raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &id); - nack->delivery_time = now; - raxInsertPelByTime(group->pel_by_time, nack->delivery_time, &id); - - /* Increment the delivery attempts counter unless JUSTID option provided */ - if (!justid) - nack->delivery_count++; - - if (nack->consumer != consumer) { - /* Add the entry in the new consumer local PEL. */ - raxInsert(consumer->pel,ri.key,ri.key_len,nack,NULL); - nack->consumer = consumer; - } - - /* Send the reply for this entry. */ - if (justid) { - addReplyStreamID(c,&id); - } else { - serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,-1,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL,NULL) == 1); - } - arraylen++; - count--; - - consumer->active_time = commandTimeSnapshot(); - - /* Propagate this change. */ - robj *idstr = createObjectFromStreamID(&id); - streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],idstr,nack); - decrRefCount(idstr); - server.dirty++; - } - - /* We need to return the next entry as a cursor for the next XAUTOCLAIM call */ - raxNext(&ri); - - if (server.memory_tracking_per_slot && old_alloc != s->alloc_size) - updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),old_alloc,s->alloc_size); - - streamID endid; - if (raxEOF(&ri)) { - endid.ms = endid.seq = 0; - } else { - streamDecodeID(ri.key, &endid); - } - raxStop(&ri); - - setDeferredArrayLen(c,arraylenptr,arraylen); - setDeferredReplyStreamID(c,endidptr,&endid); - - addReplyArrayLen(c, deleted_id_num); /* reply[2] */ - for (int i = 0; i < deleted_id_num; i++) { - addReplyStreamID(c, &deleted_ids[i]); - } - zfree(deleted_ids); - - preventCommandPropagation(c); - /* Update LRM but don't signal. */ - keyModified(c,c->db,c->argv[1],o,0); -} - -/* XDEL <key> [<ID1> <ID2> ... 
<IDN>] - * - * Removes the specified entries from the stream. Returns the number - * of items actually deleted, that may be different from the number - * of IDs passed in case certain IDs do not exist. */ -void xdelCommand(client *c) { - kvobj *kv = lookupKeyWriteOrReply(c, c->argv[1], shared.czero); - if (kv == NULL || checkType(c, kv, OBJ_STREAM)) return; - stream *s = kv->ptr; - size_t old_alloc = s->alloc_size; - - /* We need to sanity check the IDs passed to start. Even if not - * a big issue, it is not great that the command is only partially - * executed because at some point an invalid ID is parsed. */ - streamID static_ids[STREAMID_STATIC_VECTOR_LEN]; - streamID *ids = static_ids; - int id_count = c->argc-2; - if (id_count > STREAMID_STATIC_VECTOR_LEN) - ids = zmalloc(sizeof(streamID)*id_count); - for (int j = 2; j < c->argc; j++) { - if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-2],0,NULL) != C_OK) goto cleanup; - } - - /* Actually apply the command. */ - int deleted = 0; - int first_entry = 0; - for (int j = 2; j < c->argc; j++) { - streamID *id = &ids[j-2]; - if (streamDeleteItem(s,id)) { - /* We want to know if the first entry in the stream was deleted - * so we can later set the new one. */ - if (streamCompareID(id,&s->first_id) == 0) { - first_entry = 1; - } - /* Update the stream's maximal tombstone if needed. */ - if (streamCompareID(id,&s->max_deleted_entry_id) > 0) { - s->max_deleted_entry_id = *id; - } - deleted++; - }; - } - - if (server.memory_tracking_per_slot && old_alloc != s->alloc_size) - updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),old_alloc,s->alloc_size); - - /* Update the stream's first ID. */ - if (deleted) { - if (s->length == 0) { - s->first_id.ms = 0; - s->first_id.seq = 0; - } else if (first_entry) { - streamGetEdgeID(s,1,1,&s->first_id); - } - } - - /* Propagate the write if needed. 
*/ - if (deleted) { - keyModified(c,c->db,c->argv[1],kv,1); - notifyKeyspaceEvent(NOTIFY_STREAM,"xdel",c->argv[1],c->db->id); - server.dirty += deleted; - } - addReplyLongLong(c,deleted); -cleanup: - if (ids != static_ids) zfree(ids); -} - -/* Used by xdelexCommand() */ -typedef enum XDelexRes { - XDELEX_NO_ID = -1, /* ID not found in the stream. */ - XDELEX_DELETED = 1, /* Message deleted. */ - XDELEX_STILL_REFERENCED = 2, /* Message not deleted (still referenced). */ -} XDelexRes; - -/* XDELEX <key> [KEEPREF|DELREF|ACKED] [IDS <numids> <id ...>] - * - * Removes specified entries from the stream. Returns an array of status codes for - * each ID, indicating whether it was deleted, still referenced, or not found. */ -void xdelexCommand(client *c) { - kvobj *kv = lookupKeyWrite(c->db, c->argv[1]); - if (checkType(c, kv, OBJ_STREAM)) return; - - /* Parse command options */ - streamAckDelArgs args; - if (!streamParseAckDelArgsOrReply(c, 2, &args)) return; - - /* Non-existing keys and empty stream are the same thing. Reply null if the - * key does not exist.*/ - if (!kv) { - addReplyArrayLen(c, args.numids); - for (int i = 0; i < args.numids; i++) - addReplyLongLong(c, XDELEX_NO_ID); - return; - } - - /* We need to sanity check the IDs passed to start. Even if not - * a big issue, it is not great that the command is only partially - * executed because at some point an invalid ID is parsed. 
*/ - streamID static_ids[STREAMID_STATIC_VECTOR_LEN]; - streamID *ids = static_ids; - if (args.numids > STREAMID_STATIC_VECTOR_LEN) - ids = zmalloc(sizeof(streamID)*args.numids); - for (int j = 0; j < args.numids; j++) { - if (streamParseStrictIDOrReply(c,c->argv[j+args.startidx],&ids[j],0,NULL) != C_OK) - goto cleanup; - } - - stream *s = kv->ptr; - size_t old_alloc = s->alloc_size; - int first_entry = 0; - int deleted = 0; - addReplyArrayLen(c, args.numids); - for (int j = 0; j < args.numids; j++) { - int res = XDELEX_NO_ID; - streamID *id = &ids[j]; - unsigned char buf[sizeof(streamID)]; - streamEncodeID(buf,id); - - int can_delete = 1; - if (args.delete_strategy == DELETE_STRATEGY_ACKED) { - /* Only delete if acknowledged by all consumer groups */ - if (streamEntryIsReferenced(s, id)) - can_delete = 0; - } else if (args.delete_strategy == DELETE_STRATEGY_DELREF) { - streamCleanupEntryCGroupRefs(s, id); - } - - if (can_delete) { /* can_delete being true doesn't guarantee the ID exists */ - if (streamDeleteItem(s,id)) { - /* We want to know if the first entry in the stream was deleted - * so we can later set the new one. */ - if (streamCompareID(id,&s->first_id) == 0) { - first_entry = 1; - } - /* Update the stream's maximal tombstone if needed. */ - if (streamCompareID(id,&s->max_deleted_entry_id) > 0) { - s->max_deleted_entry_id = *id; - } - deleted++; - res = XDELEX_DELETED; - } else { - /* This id doesn't exist. */ - } - } else { - res = XDELEX_STILL_REFERENCED; - } - - addReplyLongLong(c, res); - } - - /* Update the stream's first ID. */ - if (deleted) { - if (server.memory_tracking_per_slot) - updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),old_alloc,s->alloc_size); - if (s->length == 0) { - s->first_id.ms = 0; - s->first_id.seq = 0; - } else if (first_entry) { - streamGetEdgeID(s,1,1,&s->first_id); - } - - /* Propagate the write. 
*/ - keyModified(c,c->db,c->argv[1],kv,1); - notifyKeyspaceEvent(NOTIFY_STREAM,"xdel",c->argv[1],c->db->id); - server.dirty += deleted; - } - -cleanup: - if (ids != static_ids) zfree(ids); -} - -/* General form: XTRIM <key> [... options ...] - * - * List of options: - * - * Trim strategies: - * - * MAXLEN [~|=] <count> -- Trim so that the stream will be capped at - * the specified length. Use ~ before the - * count in order to demand approximated trimming - * (like XADD MAXLEN option). - * MINID [~|=] <id> -- Trim so that the stream will not contain entries - * with IDs smaller than 'id'. Use ~ before the - * count in order to demand approximated trimming - * (like XADD MINID option). - * - * Consumer group reference handling (optional, defaults to KEEPREF): - * - * KEEPREF -- Keeps existing consumer group references - * DELREF -- Clean up all consumer group references - * ACKED -- Only delete messages that are acknowledged - * - * Other options: - * - * LIMIT <entries> -- The maximum number of entries to trim. - * 0 means unlimited. Unless specified, it is set - * to a default of 100*server.stream_node_max_entries, - * and that's in order to keep the trimming time sane. - * Has meaning only if `~` was provided. - */ -void xtrimCommand(client *c) { - /* Argument parsing. */ - streamAddTrimArgs parsed_args; - if (streamParseAddOrTrimArgsOrReply(c, &parsed_args, 0) < 0) - return; /* streamParseAddOrTrimArgsOrReply already replied. */ - - /* If the key does not exist, we are ok returning zero, that is, the - * number of elements removed from the stream. */ - kvobj *kv = lookupKeyWriteOrReply(c, c->argv[1], shared.czero); - if (kv == NULL || checkType(c, kv, OBJ_STREAM)) return; - stream *s = kv->ptr; - - /* Perform the trimming. 
*/ - size_t old_alloc = s->alloc_size; - int64_t deleted = streamTrim(s, &parsed_args); - if (server.memory_tracking_per_slot && old_alloc != s->alloc_size) - updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),old_alloc,s->alloc_size); - if (deleted) { - notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id); - if (parsed_args.approx_trim) { - /* In case our trimming was limited (by LIMIT or by ~) we must - * re-write the relevant trim argument to make sure there will be - * no inconsistencies in AOF loading or in the replica. - * It's enough to check only args->approx because there is no - * way LIMIT is given without the ~ option. */ - streamRewriteApproxSpecifier(c,parsed_args.trim_strategy_arg_idx-1); - streamRewriteTrimArgument(c,s,parsed_args.trim_strategy,parsed_args.trim_strategy_arg_idx); - } - - /* Propagate the write. */ - keyModified(c, c->db,c->argv[1], kv, 1); - server.dirty += deleted; - } - addReplyLongLong(c,deleted); -} - -/* Helper function for xinfoCommand. - * Handles the variants of XINFO STREAM */ -void xinfoReplyWithStreamInfo(client *c, stream *s) { - int full = 1; - long long count = 10; /* Default COUNT is 10 so we don't block the server */ - robj **optv = c->argv + 3; /* Options start after XINFO STREAM <key> */ - int optc = c->argc - 3; - - /* Parse options. */ - if (optc == 0) { - full = 0; - } else { - /* Valid options are [FULL] or [FULL COUNT <count>] */ - if (optc != 1 && optc != 3) { - addReplySubcommandSyntaxError(c); - return; - } - - /* First option must be "FULL" */ - if (strcasecmp(optv[0]->ptr,"full")) { - addReplySubcommandSyntaxError(c); - return; - } - - if (optc == 3) { - /* First option must be "FULL" */ - if (strcasecmp(optv[1]->ptr,"count")) { - addReplySubcommandSyntaxError(c); - return; - } - if (getLongLongFromObjectOrReply(c,optv[2],&count,NULL) == C_ERR) - return; - if (count < 0) count = 10; - } - } - - addReplyMapLen(c,full ? 
15 : 16); - addReplyBulkCString(c,"length"); - addReplyLongLong(c,s->length); - addReplyBulkCString(c,"radix-tree-keys"); - addReplyLongLong(c,raxSize(s->rax)); - addReplyBulkCString(c,"radix-tree-nodes"); - addReplyLongLong(c,s->rax->numnodes); - addReplyBulkCString(c,"last-generated-id"); - addReplyStreamID(c,&s->last_id); - addReplyBulkCString(c,"max-deleted-entry-id"); - addReplyStreamID(c,&s->max_deleted_entry_id); - addReplyBulkCString(c,"entries-added"); - addReplyLongLong(c,s->entries_added); - addReplyBulkCString(c,"recorded-first-entry-id"); - addReplyStreamID(c,&s->first_id); - addReplyBulkCString(c,"idmp-duration"); - addReplyLongLong(c,s->idmp_duration); - addReplyBulkCString(c,"idmp-maxsize"); - addReplyLongLong(c,s->idmp_max_entries); - addReplyBulkCString(c,"pids-tracked"); - addReplyLongLong(c, s->idmp_producers ? raxSize(s->idmp_producers) : 0); - addReplyBulkCString(c,"iids-tracked"); - /* Count total IIDs across all producers */ - size_t total_iids = 0; - if (s->idmp_producers) { - raxIterator ri; - raxStart(&ri, s->idmp_producers); - raxSeek(&ri, "^", NULL, 0); - while (raxNext(&ri)) { - idmpProducer *producer = ri.data; - total_iids += dictSize(producer->idmp_dict); - } - raxStop(&ri); - } - addReplyLongLong(c, total_iids); - addReplyBulkCString(c,"iids-added"); - addReplyLongLong(c,s->iids_added); - addReplyBulkCString(c,"iids-duplicates"); - addReplyLongLong(c,s->iids_duplicates); - - size_t old_alloc = s->alloc_size; - if (!full) { - /* XINFO STREAM <key> */ - - addReplyBulkCString(c,"groups"); - addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0); - - /* To emit the first/last entry we use streamReplyWithRange(). 
*/ - int emitted; - streamID start, end; - start.ms = start.seq = 0; - end.ms = end.seq = UINT64_MAX; - addReplyBulkCString(c,"first-entry"); - emitted = streamReplyWithRange(c,s,&start,&end,1,0,-1,NULL,NULL, - STREAM_RWR_RAWENTRIES,NULL,NULL); - if (!emitted) addReplyNull(c); - addReplyBulkCString(c,"last-entry"); - emitted = streamReplyWithRange(c,s,&start,&end,1,1,-1,NULL,NULL, - STREAM_RWR_RAWENTRIES,NULL,NULL); - if (!emitted) addReplyNull(c); - } else { - /* XINFO STREAM <key> FULL [COUNT <count>] */ - - /* Stream entries */ - addReplyBulkCString(c,"entries"); - streamReplyWithRange(c,s,NULL,NULL,count,0,-1,NULL,NULL,0,NULL,NULL); - - /* Consumer groups */ - addReplyBulkCString(c,"groups"); - if (s->cgroups == NULL) { - addReplyArrayLen(c,0); - } else { - addReplyArrayLen(c,raxSize(s->cgroups)); - raxIterator ri_cgroups; - raxStart(&ri_cgroups,s->cgroups); - raxSeek(&ri_cgroups,"^",NULL,0); - while(raxNext(&ri_cgroups)) { - streamCG *cg = ri_cgroups.data; - addReplyMapLen(c,7); - - /* Name */ - addReplyBulkCString(c,"name"); - addReplyBulkCBuffer(c,ri_cgroups.key,ri_cgroups.key_len); - - /* Last delivered ID */ - addReplyBulkCString(c,"last-delivered-id"); - addReplyStreamID(c,&cg->last_id); - - /* Read counter of the last delivered ID */ - addReplyBulkCString(c,"entries-read"); - if (cg->entries_read != SCG_INVALID_ENTRIES_READ) { - addReplyLongLong(c,cg->entries_read); - } else { - addReplyNull(c); - } - - /* Group lag */ - addReplyBulkCString(c,"lag"); - streamReplyWithCGLag(c,s,cg); - - /* Group PEL count */ - addReplyBulkCString(c,"pel-count"); - addReplyLongLong(c,raxSize(cg->pel)); - - /* Group PEL */ - addReplyBulkCString(c,"pending"); - long long arraylen_cg_pel = 0; - void *arrayptr_cg_pel = addReplyDeferredLen(c); - raxIterator ri_cg_pel; - raxStart(&ri_cg_pel,cg->pel); - raxSeek(&ri_cg_pel,"^",NULL,0); - while(raxNext(&ri_cg_pel) && (!count || arraylen_cg_pel < count)) { - streamNACK *nack = ri_cg_pel.data; - addReplyArrayLen(c,4); - - /* Entry 
ID. */ - streamID id; - streamDecodeID(ri_cg_pel.key,&id); - addReplyStreamID(c,&id); - - /* Consumer name. */ - serverAssert(nack->consumer); /* assertion for valgrind (avoid NPD) */ - addReplyBulkCBuffer(c,nack->consumer->name, - sdslen(nack->consumer->name)); - - /* Last delivery. */ - addReplyLongLong(c,nack->delivery_time); - - /* Number of deliveries. */ - addReplyLongLong(c,nack->delivery_count); - - arraylen_cg_pel++; - } - setDeferredArrayLen(c,arrayptr_cg_pel,arraylen_cg_pel); - raxStop(&ri_cg_pel); - - /* Consumers */ - addReplyBulkCString(c,"consumers"); - addReplyArrayLen(c,raxSize(cg->consumers)); - raxIterator ri_consumers; - raxStart(&ri_consumers,cg->consumers); - raxSeek(&ri_consumers,"^",NULL,0); - while(raxNext(&ri_consumers)) { - streamConsumer *consumer = ri_consumers.data; - addReplyMapLen(c,5); - - /* Consumer name */ - addReplyBulkCString(c,"name"); - addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name)); - - /* Seen-time */ - addReplyBulkCString(c,"seen-time"); - addReplyLongLong(c,consumer->seen_time); - - /* Active-time */ - addReplyBulkCString(c,"active-time"); - addReplyLongLong(c,consumer->active_time); - - /* Consumer PEL count */ - addReplyBulkCString(c,"pel-count"); - addReplyLongLong(c,raxSize(consumer->pel)); - - /* Consumer PEL */ - addReplyBulkCString(c,"pending"); - long long arraylen_cpel = 0; - void *arrayptr_cpel = addReplyDeferredLen(c); - raxIterator ri_cpel; - raxStart(&ri_cpel,consumer->pel); - raxSeek(&ri_cpel,"^",NULL,0); - while(raxNext(&ri_cpel) && (!count || arraylen_cpel < count)) { - streamNACK *nack = ri_cpel.data; - addReplyArrayLen(c,3); - - /* Entry ID. */ - streamID id; - streamDecodeID(ri_cpel.key,&id); - addReplyStreamID(c,&id); - - /* Last delivery. */ - addReplyLongLong(c,nack->delivery_time); - - /* Number of deliveries. 
*/ - addReplyLongLong(c,nack->delivery_count); - - arraylen_cpel++; - } - setDeferredArrayLen(c,arrayptr_cpel,arraylen_cpel); - raxStop(&ri_cpel); - } - raxStop(&ri_consumers); - } - raxStop(&ri_cgroups); - } - } - if (server.memory_tracking_per_slot && old_alloc != s->alloc_size) - updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),old_alloc,s->alloc_size); -} - -/* XINFO CONSUMERS <key> <group> - * XINFO GROUPS <key> - * XINFO STREAM <key> [FULL [COUNT <count>]] - * XINFO HELP. */ -void xinfoCommand(client *c) { - stream *s = NULL; - char *opt; - robj *key; - - /* HELP is special. Handle it ASAP. */ - if (!strcasecmp(c->argv[1]->ptr,"HELP")) { - const char *help[] = { -"CONSUMERS <key> <groupname>", -" Show consumers of <groupname>.", -"GROUPS <key>", -" Show the stream consumer groups.", -"STREAM <key> [FULL [COUNT <count>]", -" Show information about the stream.", -NULL - }; - addReplyHelp(c, help); - return; - } - - /* With the exception of HELP handled before any other sub commands, all - * the ones are in the form of "<subcommand> <key>". */ - opt = c->argv[1]->ptr; - key = c->argv[2]; - - /* Lookup the key now, this is common for all the subcommands but HELP. */ - kvobj *kv = lookupKeyReadOrReply(c, key, shared.nokeyerr); - if (kv == NULL || checkType(c, kv, OBJ_STREAM)) return; - s = kv->ptr; - - /* Dispatch the different subcommands. */ - if (!strcasecmp(opt,"CONSUMERS") && c->argc == 4) { - /* XINFO CONSUMERS <key> <group>. */ - streamCG *cg = streamLookupCG(s,c->argv[3]->ptr); - if (cg == NULL) { - addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' " - "for key name '%s'", - (char*)c->argv[3]->ptr, (char*)key->ptr); - return; - } - - addReplyArrayLen(c,raxSize(cg->consumers)); - raxIterator ri; - raxStart(&ri,cg->consumers); - raxSeek(&ri,"^",NULL,0); - mstime_t now = commandTimeSnapshot(); - while(raxNext(&ri)) { - streamConsumer *consumer = ri.data; - mstime_t inactive = consumer->active_time != -1 ? 
now - consumer->active_time : consumer->active_time; - mstime_t idle = now - consumer->seen_time; - if (idle < 0) idle = 0; - - addReplyMapLen(c,4); - addReplyBulkCString(c,"name"); - addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name)); - addReplyBulkCString(c,"pending"); - addReplyLongLong(c,raxSize(consumer->pel)); - addReplyBulkCString(c,"idle"); - addReplyLongLong(c,idle); - addReplyBulkCString(c,"inactive"); - addReplyLongLong(c,inactive); - } - raxStop(&ri); - } else if (!strcasecmp(opt,"GROUPS") && c->argc == 3) { - /* XINFO GROUPS <key>. */ - if (s->cgroups == NULL) { - addReplyArrayLen(c,0); - return; - } - - addReplyArrayLen(c,raxSize(s->cgroups)); - raxIterator ri; - raxStart(&ri,s->cgroups); - raxSeek(&ri,"^",NULL,0); - while(raxNext(&ri)) { - streamCG *cg = ri.data; - addReplyMapLen(c,6); - addReplyBulkCString(c,"name"); - addReplyBulkCBuffer(c,ri.key,ri.key_len); - addReplyBulkCString(c,"consumers"); - addReplyLongLong(c,raxSize(cg->consumers)); - addReplyBulkCString(c,"pending"); - addReplyLongLong(c,raxSize(cg->pel)); - addReplyBulkCString(c,"last-delivered-id"); - addReplyStreamID(c,&cg->last_id); - addReplyBulkCString(c,"entries-read"); - if (cg->entries_read != SCG_INVALID_ENTRIES_READ) { - addReplyLongLong(c,cg->entries_read); - } else { - addReplyNull(c); - } - addReplyBulkCString(c,"lag"); - streamReplyWithCGLag(c,s,cg); - } - raxStop(&ri); - } else if (!strcasecmp(opt,"STREAM")) { - /* XINFO STREAM <key> [FULL [COUNT <count>]]. 
*/ - xinfoReplyWithStreamInfo(c,s); - } else { - addReplySubcommandSyntaxError(c); - } -} - -/* XCFGSET <key> [IDMP-DURATION <duration>] [IDMP-MAXSIZE <maxsize>] */ -void xcfgsetCommand(client *c) { - robj *key = c->argv[1]; - - /* Lookup the stream key */ - kvobj *kv = lookupKeyWriteOrReply(c,key,shared.nokeyerr); - if (kv == NULL || checkType(c,kv,OBJ_STREAM)) return; - stream *s = kv->ptr; - - /* XCFGSET <key> [IDMP-DURATION <duration>] [IDMP-MAXSIZE <maxsize>] */ - long long duration = -1; - long long maxsize = -1; - - /* Parse parameters */ - for (int i = 2; i < c->argc; i++) { - int moreargs = c->argc - i - 1; - char *param = c->argv[i]->ptr; - if (!strcasecmp(param,"IDMP-DURATION") && moreargs) { - if (duration != -1) { - addReplyError(c,"IDMP-DURATION specified multiple times"); - return; - } - i++; - if (getLongLongFromObjectOrReply(c,c->argv[i],&duration,NULL) != C_OK) - return; - if (duration < CONFIG_STREAM_IDMP_MIN_DURATION || - duration > CONFIG_STREAM_IDMP_MAX_DURATION) { - addReplyErrorFormat(c,"IDMP-DURATION must be between %d and %d seconds", - CONFIG_STREAM_IDMP_MIN_DURATION,CONFIG_STREAM_IDMP_MAX_DURATION); - return; - } - } else if (!strcasecmp(param,"IDMP-MAXSIZE") && moreargs) { - if (maxsize != -1) { - addReplyError(c,"IDMP-MAXSIZE specified multiple times"); - return; - } - i++; - if (getLongLongFromObjectOrReply(c,c->argv[i],&maxsize,NULL) != C_OK) - return; - if (maxsize < CONFIG_STREAM_IDMP_MIN_MAXSIZE || - maxsize > CONFIG_STREAM_IDMP_MAX_MAXSIZE) { - addReplyErrorFormat(c,"IDMP-MAXSIZE must be between %d and %d entries", - CONFIG_STREAM_IDMP_MIN_MAXSIZE,CONFIG_STREAM_IDMP_MAX_MAXSIZE); - return; - } - } else { - addReplyErrorObject(c,shared.syntaxerr); - return; - } - } - - /* At least one parameter must be specified */ - if (duration == -1 && maxsize == -1) { - addReplyError(c,"At least one parameter must be specified"); - return; - } - - /* Track if we made any changes */ - int changed = 0; - - /* Update the stream configuration. 
When we set IDMP-DURATION or IDMP-MAXSIZE to a - * different value, we clear all existing producer IDMP maps for the stream. - * If the value is the same, we don't clear to allow multiple publishers - * to call this before starting to publish without clearing each time. */ - if (duration != -1 && s->idmp_duration != (uint64_t)duration) { - s->idmp_duration = duration; - streamClearIdmpEntries(s); - changed = 1; - } - if (maxsize != -1 && s->idmp_max_entries != (uint64_t)maxsize) { - s->idmp_max_entries = maxsize; - streamClearIdmpEntries(s); - changed = 1; - } - - /* Mark the key as dirty for replication only if we changed something */ - if (changed) { - keyModified(c,c->db,key,kv,0); - server.dirty++; - } - addReply(c,shared.ok); -} - -/* Validate the integrity stream listpack entries structure. Both in term of a - * valid listpack, but also that the structure of the entries matches a valid - * stream. return 1 if valid 0 if not valid. */ -int streamValidateListpackIntegrity(unsigned char *lp, size_t size, int deep) { - int valid_record; - unsigned char *p, *next; - - /* Since we don't want to run validation of all records twice, we'll - * run the listpack validation of just the header and do the rest here. 
*/ - if (!lpValidateIntegrity(lp, size, 0, NULL, NULL)) - return 0; - - /* In non-deep mode we just validated the listpack header (encoded size) */ - if (!deep) return 1; - - next = p = lpValidateFirst(lp); - if (!lpValidateNext(lp, &next, size)) return 0; - if (!p) return 0; - - /* entry count */ - int64_t entry_count = lpGetIntegerIfValid(p, &valid_record); - if (!valid_record) return 0; - p = next; if (!lpValidateNext(lp, &next, size)) return 0; - - /* deleted */ - int64_t deleted_count = lpGetIntegerIfValid(p, &valid_record); - if (!valid_record) return 0; - p = next; if (!lpValidateNext(lp, &next, size)) return 0; - - /* num-of-fields */ - int64_t master_fields = lpGetIntegerIfValid(p, &valid_record); - if (!valid_record) return 0; - p = next; if (!lpValidateNext(lp, &next, size)) return 0; - - /* the field names */ - for (int64_t j = 0; j < master_fields; j++) { - p = next; if (!lpValidateNext(lp, &next, size)) return 0; - } - - /* the zero master entry terminator. */ - int64_t zero = lpGetIntegerIfValid(p, &valid_record); - if (!valid_record || zero != 0) return 0; - p = next; if (!lpValidateNext(lp, &next, size)) return 0; - - entry_count += deleted_count; - while (entry_count--) { - if (!p) return 0; - int64_t fields = master_fields, extra_fields = 3; - int64_t flags = lpGetIntegerIfValid(p, &valid_record); - if (!valid_record) return 0; - p = next; if (!lpValidateNext(lp, &next, size)) return 0; - - /* entry id */ - lpGetIntegerIfValid(p, &valid_record); - if (!valid_record) return 0; - p = next; if (!lpValidateNext(lp, &next, size)) return 0; - lpGetIntegerIfValid(p, &valid_record); - if (!valid_record) return 0; - p = next; if (!lpValidateNext(lp, &next, size)) return 0; - - if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) { - /* num-of-fields */ - fields = lpGetIntegerIfValid(p, &valid_record); - if (!valid_record) return 0; - p = next; if (!lpValidateNext(lp, &next, size)) return 0; - - /* the field names */ - for (int64_t j = 0; j < fields; j++) { - p = 
next; if (!lpValidateNext(lp, &next, size)) return 0; - } - - extra_fields += fields + 1; - } - - /* the values */ - for (int64_t j = 0; j < fields; j++) { - p = next; if (!lpValidateNext(lp, &next, size)) return 0; - } - - /* lp-count */ - int64_t lp_count = lpGetIntegerIfValid(p, &valid_record); - if (!valid_record) return 0; - if (lp_count != fields + extra_fields) return 0; - p = next; if (!lpValidateNext(lp, &next, size)) return 0; - } - - if (next) - return 0; - - return 1; -} - -/* Convert the specified pelTimeKey as a 192 bit big endian number, so - * that the key can be sorted lexicographically. */ -void encodePelTimeKey(void *buf, pelTimeKey *key) { - uint64_t e[3]; - e[0] = htonu64(key->delivery_time); - e[1] = htonu64(key->id.ms); - e[2] = htonu64(key->id.seq); - memcpy(buf,e,sizeof(e)); -} - -/* This is the reverse of encodePelTimeKey(): the decoded key will be stored - * in the 'key' structure passed by reference. The buffer 'buf' must point - * to a 192 bit big-endian encoded key. */ -void decodePelTimeKey(void *buf, pelTimeKey *key) { - uint64_t e[3]; - memcpy(e,buf,sizeof(e)); - key->delivery_time = ntohu64(e[0]); - key->id.ms = ntohu64(e[1]); - key->id.seq = ntohu64(e[2]); -} - -/* Helper function to prepare an encoded PEL time key. - * This encapsulates the creation and encoding of a pelTimeKey structure. */ -static inline void preparePelTimeKey(unsigned char *keyBuf, uint64_t delivery_time, streamID *id) { - pelTimeKey timeKey; - timeKey.delivery_time = delivery_time; - timeKey.id = *id; - encodePelTimeKey(keyBuf, &timeKey); -} - -/* Helper function to insert a NACK into the PEL by time index. - * This encapsulates the encoding and insertion into the pel_by_time rax tree. 
*/ -void raxInsertPelByTime(rax *pel_by_time, uint64_t delivery_time, streamID *id) { - unsigned char keyBuf[sizeof(pelTimeKey)]; - preparePelTimeKey(keyBuf, delivery_time, id); - raxInsert(pel_by_time, keyBuf, sizeof(keyBuf), NULL, NULL); -} - -/* Helper function to remove a NACK from the PEL by time index. - * This encapsulates the encoding and removal from the pel_by_time rax tree. */ -void raxRemovePelByTime(rax *pel_by_time, uint64_t delivery_time, streamID *id) { - unsigned char keyBuf[sizeof(pelTimeKey)]; - preparePelTimeKey(keyBuf, delivery_time, id); - raxRemove(pel_by_time, keyBuf, sizeof(keyBuf), NULL); -} - -/* Register stream keys for monitoring of expired pending entries to enable - * reactive blocking behavior for XREADGROUP commands with CLAIM. When a client - * blocks waiting for either new messages or expired pending entries, this - * function records the earliest timestamp when pending entries will expire - * (satisfy the min-idle-time requirement). - * - * For multi-client coordination, when multiple clients are blocked on the same - * stream with different min-idle-time values, the dictionary stores the minimum - * (earliest) expire_time across all clients to ensure the earliest possible - * wakeup when any pending entry expires and becomes available for claiming. - * - * 'c' is the client that is blocking on the stream(s). - * 'keys' is an array of stream key objects to monitor. - * 'numkeys' is the number of keys in the array. - * 'expire_time' is the absolute timestamp (in milliseconds) when the next - * pending entry will expire for this client, calculated as - * next_delivery_time + min_idle_time, where next_delivery_time is the - * delivery timestamp of the oldest pending entry in the stream. - * - * For new entries, the key is added with the given expire_time and the - * reference count is incremented. 
For existing entries, the expire_time
 * is updated to the minimum value if the new expire_time is earlier,
 * ensuring the earliest wakeup time is preserved for multi-client scenarios.
 * Note that the reference count is only incremented for newly added keys,
 * not for updates to existing entries. */
void trackStreamClaimTimeouts(client *c, robj **keys, int numkeys, uint64_t expire_time) {
    for (int i = 0; i < numkeys; i++) {
        dictEntry *existing;
        dictEntry *added = dictAddRaw(c->db->stream_claim_pending_keys, keys[i], &existing);

        if (added == NULL) {
            /* Key already watched: only ever move the deadline earlier. */
            uint64_t current = dictGetUnsignedIntegerVal(existing);
            if (expire_time < current) {
                dictSetUnsignedIntegerVal(existing, expire_time);
            }
            continue;
        }

        /* First time this key is watched: the dictionary now holds its
         * own reference to the key object. */
        dictSetUnsignedIntegerVal(added, expire_time);
        incrRefCount(keys[i]);
    }
}

/* Check and wake clients waiting for expired pending entries. This function
 * is invoked regularly from blockedBeforeSleep() to monitor streams being
 * watched for expired pending entries and wake up blocked clients when
 * entries expire and become available for claiming.
 *
 * The function processes up to CRON_DBS_PER_CALL databases per call in a
 * round-robin fashion, cycling through all databases over multiple invocations.
 * For each database, it iterates through the stream_claim_pending_keys dictionary.
 * For each watched stream, it compares the registered expire_time against the
 * current server time. When expire_time is less than the current server time,
 * the pending entry has expired and the stream is signaled as ready via
 * signalKeyAsReady(), which wakes all blocked clients waiting on that stream.
 * The entry is then removed from stream_claim_pending_keys.
*/ -void handleClaimableStreamEntries(void) { - static unsigned int current_db = 0; - int dbs_per_call = CRON_DBS_PER_CALL; - int j; - - if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum; - - for (j = 0; j < dbs_per_call; j++) { - redisDb *db = &server.db[current_db % server.dbnum]; - current_db++; - - if (dictIsEmpty(db->stream_claim_pending_keys)) - continue; - - dictEntry *de; - dictIterator di; - dictInitSafeIterator(&di, db->stream_claim_pending_keys); - while ((de = dictNext(&di)) != NULL) { - robj *key = dictGetKey(de); - uint64_t expire_time = dictGetUnsignedIntegerVal(de); - kvobj *kv = dbFind(db, key->ptr); - - if (!kv || kv->type != OBJ_STREAM) { - dictDelete(db->stream_claim_pending_keys, key); - continue; - } - - if (expire_time < (uint64_t)server.mstime) { - signalKeyAsReady(db, key, kv->type); - dictDelete(db->stream_claim_pending_keys, key); - } - } - dictResetIterator(&di); - } -} - -/* ----------------------------------------------------------------------- - * IDMP (Idempotent Message Producer) Functions - * ----------------------------------------------------------------------- */ - -/* Hash function for idmpEntry - hashes the embedded iid buffer */ -static uint64_t idmpDictHashFunction(const void *key) { - const idmpEntry *entry = (const idmpEntry *)key; - return dictGenHashFunction((const char *)entry->iid, entry->iid_len); -} - -/* Key comparison function for idmpEntry - compares embedded iid buffers */ -static int idmpDictKeyCompare(dictCmpCache *cache, const void *key1, const void *key2) { - UNUSED(cache); - const idmpEntry *e1 = (const idmpEntry *)key1; - const idmpEntry *e2 = (const idmpEntry *)key2; - if (e1->iid_len != e2->iid_len) return 0; - return memcmp((const char *)e1->iid, (const char *)e2->iid, e1->iid_len) == 0; -} - -/* Dictionary type for IDMP entries - keys are idmpEntry pointers, values are NULL */ -dictType idmpDictType = { - idmpDictHashFunction, /* hash function */ - NULL, /* key dup */ - NULL, /* val dup */ - 
idmpDictKeyCompare, /* key compare */ - NULL, /* key destructor - handled manually with linked list */ - NULL, /* val destructor */ - NULL, /* resize allowed */ - NULL, /* rehashing started */ - NULL, /* rehashing completed */ - NULL, /* bucket changed */ - NULL, /* dict metadata bytes */ - NULL, /* userdata */ - .no_value = 0, /* Use regular dict entries with NULL values to support defrag */ - .keys_are_odd = 0, /* keys are not odd */ - .force_full_rehash = 0, /* no force full rehash */ - NULL, /* key from stored key */ - NULL, /* on dict release */ -}; - -/* Create a new idmpEntry with the given IID string. - * The entry and IID are allocated together using flexible array member. - * alloc_size must not be NULL and will be updated with the allocation size. */ -idmpEntry *idmpEntryCreate(const char *iid, size_t iid_len, size_t *alloc_size) { - size_t usable; - idmpEntry *entry = zmalloc_usable(sizeof(idmpEntry) + iid_len, &usable); - - entry->next = NULL; - entry->iid_len = iid_len; - memcpy(entry->iid, iid, iid_len); - - *alloc_size += usable; - - return entry; -} - -/* Free an idmpEntry (iid is embedded via flexible array member). - * alloc_size must not be NULL and will be updated with the freed size. */ -void idmpEntryFree(idmpEntry *entry, size_t *alloc_size) { - if (entry == NULL) return; - - size_t usable; - zfree_usable(entry, &usable); - *alloc_size -= usable; -} - -/* Create a new idmpProducer with an empty dict and linked list. - * alloc_size must not be NULL and will be updated with the allocation size. */ -idmpProducer *idmpProducerCreate(size_t *alloc_size) { - size_t usable; - idmpProducer *producer = zmalloc_usable(sizeof(idmpProducer), &usable); - producer->idmp_dict = dictCreate(&idmpDictType); - producer->idmp_head = NULL; - producer->idmp_tail = NULL; - - *alloc_size += usable; - - return producer; -} - -/* Free an idmpProducer including its dict and all linked list entries. 
- * alloc_size must not be NULL and will be updated with the freed size. */ -void idmpProducerFree(idmpProducer *producer, size_t *alloc_size) { - if (producer == NULL) return; - - /* Release the dict */ - if (producer->idmp_dict) - dictRelease(producer->idmp_dict); - - /* Free IDMP linked list entries */ - idmpEntry *entry = producer->idmp_head; - while (entry) { - idmpEntry *next = entry->next; - idmpEntryFree(entry, alloc_size); - entry = next; - } - - size_t usable; - zfree_usable(producer, &usable); - *alloc_size -= usable; -} - -/* Check if an IID already exists in the producer's idmp_dict. - * If found, sends the existing stream ID as a reply and returns 1. - * Returns 0 if the IID was not found. - * - * The 'entry' parameter should be an idmpEntry with the IID already set - * (iid and iid_len fields must be initialized). */ -static int idmpLookupAndReply(stream *s, idmpProducer *producer, idmpEntry *entry, client *c) { - dictEntry *de = dictFind(producer->idmp_dict, entry); - if (de != NULL) { - /* IID already exists, return the existing stream ID */ - idmpEntry *existing = (idmpEntry *)dictGetKey(de); - addReplyStreamID(c, &existing->id); - s->iids_duplicates++; - return 1; - } - return 0; -} - -/* Insert an idmpEntry into the producer's dict and linked list with the given stream ID. 
*/ -static void idmpInsertEntry(stream *s, idmpProducer *producer, idmpEntry *entry, const streamID *id) { - /* Set the stream ID and initialize next pointer */ - entry->next = NULL; - entry->id = *id; - - /* Insert into dict (should always succeed since we already checked with lookup) */ - serverAssert(dictAdd(producer->idmp_dict, entry, NULL) == DICT_OK); - - /* Add to linked list tail */ - if (producer->idmp_tail == NULL) { - producer->idmp_head = producer->idmp_tail = entry; - } else { - producer->idmp_tail->next = entry; - producer->idmp_tail = entry; - } - - s->iids_added++; - - /* Remove oldest entry if exceeding max entries */ - idmpEvictOldestEntry(s, producer); -} - -/* Get or create an idmpProducer for the given producer ID. - * Returns the producer, or NULL on allocation failure. */ -static idmpProducer *idmpGetOrCreateProducer(stream *s, const char *pid, size_t pid_len) { - /* Create the producers rax tree if it doesn't exist */ - if (s->idmp_producers == NULL) { - s->idmp_producers = raxNew(); - } - - /* Look up the producer */ - idmpProducer *producer = NULL; - int found = raxFind(s->idmp_producers, (unsigned char *)pid, pid_len, (void **)&producer); - if (!found) { - /* Create a new producer */ - producer = idmpProducerCreate(&s->alloc_size); - /* Insert into the rax tree - must succeed since we checked it doesn't exist */ - serverAssert(raxInsert(s->idmp_producers, (unsigned char *)pid, pid_len, producer, NULL)); - } - - return producer; -} - -/* Register a stream key for IDMP entry tracking. - * This registers a stream key in the database's stream_idmp_keys dictionary, - * allowing the cron job handleExpiredIdmpEntries() to periodically check - * and clean up expired idempotency entries from the stream's idmp_dict. - * - * 'c' is the client that is performing the XADD operation with IDMP. - * 'key' is the stream key object to track. 
- * - * If the key is not already tracked, it is added to stream_idmp_keys and its - * reference count is incremented. If the key is already being tracked (added - * by a previous XADD operation), this function does nothing, as the stream - * is already registered for periodic cleanup. */ -static void trackStreamIdmpEntries(client *c, robj *key) { - if (dictAddRaw(c->db->stream_idmp_keys, key, NULL)) { - incrRefCount(key); - } -} - -/* Clean up expired idempotency entries from tracked streams. This function - * is invoked regularly from serverCron() to remove expired entries - * from the idmp_dict of streams that have idempotency tracking enabled, - * keeping memory usage under control. - * - * The function processes up to CRON_DBS_PER_CALL databases per call in a - * round-robin fashion, cycling through all databases over multiple invocations. - * For each database, it iterates through the stream_idmp_keys dictionary. - * For each tracked stream, it compares the timestamp of entries in the stream's - * idmp linked list against the expiration threshold (current time - idmp_duration). - * Entries with timestamps older than the threshold are removed from the head - * of the linked list. When all entries have been removed and the list becomes empty, - * the stream key is removed from stream_idmp_keys to stop tracking it. 
*/ -void handleExpiredIdmpEntries(void) { - static unsigned int current_db = 0; - int dbs_per_call = CRON_DBS_PER_CALL; - int j; - - if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum; - - for (j = 0; j < dbs_per_call; j++) { - redisDb *db = &server.db[current_db % server.dbnum]; - current_db++; - - if (dictIsEmpty(db->stream_idmp_keys)) - continue; - - dictEntry *de; - dictIterator di; - dictInitSafeIterator(&di, db->stream_idmp_keys); - while ((de = dictNext(&di)) != NULL) { - robj *key = dictGetKey(de); - kvobj *kv = dbFind(db, key->ptr); - - if (!kv || kv->type != OBJ_STREAM) { - dictDelete(db->stream_idmp_keys, key); - continue; - } - - stream *s = kv->ptr; - uint64_t expire_time = server.mstime - (s->idmp_duration * 1000); - - /* Skip if no producers */ - if (s->idmp_producers == NULL) { - dictDelete(db->stream_idmp_keys, key); - continue; - } - - /* Iterate through all producers and remove expired entries */ - raxIterator ri; - raxStart(&ri, s->idmp_producers); - raxSeek(&ri, "^", NULL, 0); - while (raxNext(&ri)) { - idmpProducer *producer = ri.data; - - /* Remove expired entries from the head of this producer's linked list */ - while (producer->idmp_head != NULL) { - idmpEntry *entry = producer->idmp_head; - if (entry->id.ms <= expire_time) { - /* Remove from dict */ - dictDelete(producer->idmp_dict, entry); - /* Remove from linked list head */ - producer->idmp_head = entry->next; - if (producer->idmp_head == NULL) { - producer->idmp_tail = NULL; - } - /* Free the entry */ - idmpEntryFree(entry, &s->alloc_size); - } else { - break; - } - } - - /* If this producer has no entries left, remove it from the rax tree */ - if (producer->idmp_head == NULL) { - raxRemove(s->idmp_producers, ri.key, ri.key_len, NULL); - idmpProducerFree(producer, &s->alloc_size); - raxSeek(&ri, ">=", ri.key, ri.key_len); - } - } - raxStop(&ri); - - /* If no producers remain, free the entire rax tree */ - if (raxSize(s->idmp_producers) == 0) { - raxFree(s->idmp_producers); - 
s->idmp_producers = NULL; - dictDelete(db->stream_idmp_keys, key); - continue; - } - } - dictResetIterator(&di); - } -} - -/* 64-bit left rotation helper for hash combination */ -static inline uint64_t rotl64(uint64_t x, int r) { - return (x << r) | (x >> (64 - r)); -} - -/* Hash field-value pairs using XXH3_128bits for AUTOIDMP. The function takes - * an array of robj pointers in 'argv' representing field-value pairs (field1, - * value1, field2, value2, ...) and 'numfields' indicating the number of pairs - * (not the array length). Each field-value pair is hashed using streaming - * XXH3_128bits with the field length included as a separator to prevent hash - * collisions from ambiguous concatenations. The resulting pair hashes are - * combined using an order-independent Sum + XOR approach with rotation to - * produce a final 128-bit hash stored in 'out_hash'. Returns C_OK on success, - * C_ERR on error. XXH128 is a non-cryptographic hash function: fast and - * well-distributed, but does NOT prevent intentional collision attacks. 
*/ -static int createIdempotencyHash(robj **argv, int64_t numfields, XXH128_hash_t *out_hash) { - uint64_t sum_lo = 0, sum_hi = 0; - uint64_t xor_lo = 0, xor_hi = 0; - XXH3_state_t* state = XXH3_createState(); - if (state == NULL) return C_ERR; - - char llbuf[LONG_STR_SIZE]; - XXH_errorcode err; - - /* Process each field-value pair */ - for (int64_t i = 0; i < numfields; i++) { - robj *field = argv[i * 2]; - robj *value = argv[i * 2 + 1]; - - /* Initialize hash state for this pair */ - err = XXH3_128bits_reset(state); - if (err != XXH_OK) goto cleanup; - - /* Hash the field */ - long field_len; - unsigned char *field_data = getObjectReadOnlyString(field, &field_len, llbuf); - err = XXH3_128bits_update(state, field_data, field_len); - if (err != XXH_OK) goto cleanup; - - /* Hash the field length as separator to prevent collisions */ - err = XXH3_128bits_update(state, &field_len, sizeof(field_len)); - if (err != XXH_OK) goto cleanup; - - /* Hash the value */ - long value_len; - unsigned char *value_data = getObjectReadOnlyString(value, &value_len, llbuf); - err = XXH3_128bits_update(state, value_data, value_len); - if (err != XXH_OK) goto cleanup; - - /* Get the hash for this pair */ - XXH128_hash_t pair_hash = XXH3_128bits_digest(state); - - /* Accumulate with both sum and xor for order-independent combination */ - sum_lo += pair_hash.low64; - sum_hi += pair_hash.high64; - xor_lo ^= pair_hash.low64; - xor_hi ^= pair_hash.high64; - } - - /* Combine sum and xor with rotation for better distribution */ - XXH128_hash_t hash_result; - hash_result.low64 = sum_lo ^ rotl64(xor_hi, 1); - hash_result.high64 = sum_hi ^ rotl64(xor_lo, 1); - - XXH3_freeState(state); - *out_hash = hash_result; - return C_OK; - -cleanup: - XXH3_freeState(state); - return C_ERR; -} - -/* Clear all IDMP entries from a stream - free all producers and their entries */ -static void streamClearIdmpEntries(stream *s) { - if (s->idmp_producers == NULL) return; - - /* Iterate through all producers and free 
them */ - raxIterator ri; - raxStart(&ri, s->idmp_producers); - raxSeek(&ri, "^", NULL, 0); - while (raxNext(&ri)) { - idmpProducerFree(ri.data, &s->alloc_size); - } - raxStop(&ri); - - /* Free the producers rax tree and reset */ - raxFree(s->idmp_producers); - s->idmp_producers = NULL; -} - -/* Evict the oldest entry from the IDMP producer when max entries is exceeded. - * This function checks if the number of entries exceeds the stream's max limit, - * and if so, removes the oldest entry from the producer's linked list and - * dictionary, maintaining the integrity of both data structures. If the list - * becomes empty after removal, both head and tail pointers are set to NULL. */ -static void idmpEvictOldestEntry(stream *s, idmpProducer *producer) { - if (dictSize(producer->idmp_dict) <= s->idmp_max_entries) { - return; - } - - idmpEntry *oldest = producer->idmp_head; - producer->idmp_head = oldest->next; - if (producer->idmp_head == NULL) { - producer->idmp_tail = NULL; - } - dictDelete(producer->idmp_dict, oldest); - idmpEntryFree(oldest, &s->alloc_size); -} |
