summaryrefslogtreecommitdiff
path: root/examples/redis-unstable/src/t_stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'examples/redis-unstable/src/t_stream.c')
-rw-r--r--examples/redis-unstable/src/t_stream.c5755
1 files changed, 0 insertions, 5755 deletions
diff --git a/examples/redis-unstable/src/t_stream.c b/examples/redis-unstable/src/t_stream.c
deleted file mode 100644
index 497c714..0000000
--- a/examples/redis-unstable/src/t_stream.c
+++ /dev/null
@@ -1,5755 +0,0 @@
-/*
- * Copyright (c) 2017-Present, Redis Ltd.
- * All rights reserved.
- *
- * Licensed under your choice of (a) the Redis Source Available License 2.0
- * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
- * GNU Affero General Public License v3 (AGPLv3).
- */
-
-#include "server.h"
-#include "endianconv.h"
-#include "stream.h"
-#include "xxhash.h"
-#include <string.h>
-
-/* Every stream item inside the listpack, has a flags field that is used to
- * mark the entry as deleted, or having the same field as the "master"
- * entry at the start of the listpack> */
-#define STREAM_ITEM_FLAG_NONE 0 /* No special flags. */
-#define STREAM_ITEM_FLAG_DELETED (1<<0) /* Entry is deleted. Skip it. */
-#define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */
-
-/* For stream commands that require multiple IDs
- * when the number of IDs is less than 'STREAMID_STATIC_VECTOR_LEN',
- * avoid malloc allocation.*/
-#define STREAMID_STATIC_VECTOR_LEN 8
-
-/* Max pre-allocation for listpack. This is done to avoid abuse of a user
- * setting stream_node_max_bytes to a huge number. */
-#define STREAM_LISTPACK_MAX_PRE_ALLOCATE 4096
-
-/* Don't let listpacks grow too big, even if the user config allows it.
- * doing so can lead to an overflow (trying to store more than 32bit length
- * into the listpack header), or actually an assertion since lpInsert
- * will return NULL. */
-#define STREAM_LISTPACK_MAX_SIZE (1<<30)
-
-void streamFreeCGGeneric(void *cg, void *s);
-void streamFreeNACK(stream *s, streamNACK *na);
-size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamCG *group, streamConsumer *consumer);
-int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int *seq_given);
-int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq);
-
-int streamEntryIsReferenced(stream *s, streamID *id);
-void streamCleanupEntryCGroupRefs(stream *s, streamID *id);
-void streamUpdateCGroupLastId(stream *s, streamCG *cg, streamID *id);
-void trackStreamClaimTimeouts(client *c, robj **keys, int numkeys, uint64_t expire_time);
-
-/* Forward declarations for IDMP functions (defined at end of file) */
-static void trackStreamIdmpEntries(client *c, robj *key);
-static void streamClearIdmpEntries(stream *s);
-static void idmpInsertEntry(stream *s, idmpProducer *producer, idmpEntry *entry, const streamID *id);
-static int idmpLookupAndReply(stream *s, idmpProducer *producer, idmpEntry *entry, client *c);
-static idmpProducer *idmpGetOrCreateProducer(stream *s, const char *pid, size_t pid_len);
-static int createIdempotencyHash(robj **argv, int64_t numfields, XXH128_hash_t *out_hash);
-static void idmpEvictOldestEntry(stream *s, idmpProducer *producer);
-
-/* -----------------------------------------------------------------------
- * Low level stream encoding: a radix tree of listpacks.
- * ----------------------------------------------------------------------- */
-
-/* Create a new stream data structure. */
-stream *streamNew(void) {
- size_t usable;
- stream *s = zmalloc_usable(sizeof(*s), &usable);
- s->alloc_size = usable;
- s->rax = raxNewWithMetadata(0, &s->alloc_size);
- s->length = 0;
- s->first_id.ms = 0;
- s->first_id.seq = 0;
- s->last_id.ms = 0;
- s->last_id.seq = 0;
- s->max_deleted_entry_id.seq = 0;
- s->max_deleted_entry_id.ms = 0;
- s->entries_added = 0;
- s->cgroups = NULL; /* Created on demand to save memory when not used. */
- s->cgroups_ref = NULL;
- s->min_cgroup_last_id.ms = UINT64_MAX;
- s->min_cgroup_last_id.seq = UINT64_MAX;
- s->min_cgroup_last_id_valid = 0;
- s->idmp_duration = server.stream_idmp_duration; /* Default from server config */
- s->idmp_max_entries = server.stream_idmp_maxsize; /* Default from server config */
- s->idmp_producers = NULL; /* Created on demand to save memory when not used. */
- s->iids_added = 0;
- s->iids_duplicates = 0;
- return s;
-}
-
-static void streamLpFreeGeneric(void *lp, void *strm) {
- stream *s = strm;
- s->alloc_size -= lpBytes(lp);
- lpFree(lp);
-}
-
-void streamFreeIdmpProducerGeneric(void *producer, void *strm) {
- stream *s = strm;
- idmpProducerFree((idmpProducer *)producer, &s->alloc_size);
-}
-
-/* Free a stream, including the listpacks stored inside the radix tree. */
-void freeStream(stream *s) {
- raxFreeWithCbAndContext(s->rax, streamLpFreeGeneric, s);
- if (s->cgroups)
- raxFreeWithCbAndContext(s->cgroups, streamFreeCGGeneric, s);
- if (s->cgroups_ref)
- raxFreeWithCallback(s->cgroups_ref, listReleaseGeneric);
- /* Free IDMP producers rax tree */
- if (s->idmp_producers)
- raxFreeWithCbAndContext(s->idmp_producers, streamFreeIdmpProducerGeneric, s);
- debugServerAssert(s->alloc_size == zmalloc_usable_size(s));
- zfree(s);
-}
-
-/* Return the length of a stream. */
-unsigned long streamLength(const robj *subject) {
- stream *s = subject->ptr;
- return s->length;
-}
-
-/* Set 'id' to be its successor stream ID.
- * If 'id' is the maximal possible id, it is wrapped around to 0-0 and a
- * C_ERR is returned. */
-int streamIncrID(streamID *id) {
- int ret = C_OK;
- if (id->seq == UINT64_MAX) {
- if (id->ms == UINT64_MAX) {
- /* Special case where 'id' is the last possible streamID... */
- id->ms = id->seq = 0;
- ret = C_ERR;
- } else {
- id->ms++;
- id->seq = 0;
- }
- } else {
- id->seq++;
- }
- return ret;
-}
-
-/* Set 'id' to be its predecessor stream ID.
- * If 'id' is the minimal possible id, it remains 0-0 and a C_ERR is
- * returned. */
-int streamDecrID(streamID *id) {
- int ret = C_OK;
- if (id->seq == 0) {
- if (id->ms == 0) {
- /* Special case where 'id' is the first possible streamID... */
- id->ms = id->seq = UINT64_MAX;
- ret = C_ERR;
- } else {
- id->ms--;
- id->seq = UINT64_MAX;
- }
- } else {
- id->seq--;
- }
- return ret;
-}
-
-/* Generate the next stream item ID given the previous one. If the current
- * milliseconds Unix time is greater than the previous one, just use this
- * as time part and start with sequence part of zero. Otherwise we use the
- * previous time (and never go backward) and increment the sequence. */
-void streamNextID(streamID *last_id, streamID *new_id) {
- uint64_t ms = commandTimeSnapshot();
- if (ms > last_id->ms) {
- new_id->ms = ms;
- new_id->seq = 0;
- } else {
- *new_id = *last_id;
- streamIncrID(new_id);
- }
-}
-
-/* This is a helper function for the COPY command.
- * Duplicate a Stream object, with the guarantee that the returned object
- * has the same encoding as the original one.
- *
- * The resulting object always has refcount set to 1 */
-robj *streamDup(robj *o) {
- robj *sobj;
-
- serverAssert(o->type == OBJ_STREAM);
-
- switch (o->encoding) {
- case OBJ_ENCODING_STREAM:
- sobj = createStreamObject();
- break;
- default:
- serverPanic("Wrong encoding.");
- break;
- }
-
- stream *s;
- stream *new_s;
- s = o->ptr;
- new_s = sobj->ptr;
-
- raxIterator ri;
- raxStart(&ri, s->rax);
- raxSeek(&ri, "^", NULL, 0);
- size_t lp_bytes = 0; /* Total bytes in the listpack. */
- unsigned char *lp = NULL; /* listpack pointer. */
- /* Get a reference to the listpack node. */
- while (raxNext(&ri)) {
- serverAssert(ri.key_len == sizeof(streamID));
- lp = ri.data;
- lp_bytes = lpBytes(lp);
- unsigned char *new_lp = zmalloc(lp_bytes);
- new_s->alloc_size += lp_bytes;
- memcpy(new_lp, lp, lp_bytes);
- raxInsert(new_s->rax, ri.key, ri.key_len,
- new_lp, NULL);
- }
- new_s->length = s->length;
- new_s->first_id = s->first_id;
- new_s->last_id = s->last_id;
- new_s->max_deleted_entry_id = s->max_deleted_entry_id;
- new_s->entries_added = s->entries_added;
- raxStop(&ri);
-
- if (s->cgroups == NULL) return sobj;
-
- /* Consumer Groups */
- raxIterator ri_cgroups;
- raxStart(&ri_cgroups, s->cgroups);
- raxSeek(&ri_cgroups, "^", NULL, 0);
- while (raxNext(&ri_cgroups)) {
- streamCG *cg = ri_cgroups.data;
- streamCG *new_cg = streamCreateCG(new_s, (char *)ri_cgroups.key,
- ri_cgroups.key_len, &cg->last_id,
- cg->entries_read);
-
- serverAssert(new_cg != NULL);
-
- /* Consumer Group PEL */
- raxIterator ri_cg_pel;
- raxStart(&ri_cg_pel,cg->pel);
- raxSeek(&ri_cg_pel,"^",NULL,0);
- while(raxNext(&ri_cg_pel)){
- streamNACK *nack = ri_cg_pel.data;
- streamNACK *new_nack = streamCreateNACK(new_s, NULL);
- new_nack->delivery_time = nack->delivery_time;
- new_nack->delivery_count = nack->delivery_count;
- new_nack->cgroup_ref_node = streamLinkCGroupToEntry(new_s, new_cg, ri_cg_pel.key);
- raxInsert(new_cg->pel, ri_cg_pel.key, sizeof(streamID), new_nack, NULL);
-
- streamID id;
- streamDecodeID(ri_cg_pel.key, &id);
- raxInsertPelByTime(new_cg->pel_by_time, new_nack->delivery_time, &id);
- }
- raxStop(&ri_cg_pel);
-
- /* Consumers */
- raxIterator ri_consumers;
- raxStart(&ri_consumers, cg->consumers);
- raxSeek(&ri_consumers, "^", NULL, 0);
- while (raxNext(&ri_consumers)) {
- streamConsumer *consumer = ri_consumers.data;
- streamConsumer *new_consumer;
- size_t usable;
- new_consumer = zmalloc_usable(sizeof(*new_consumer), &usable);
- new_s->alloc_size += usable;
- new_consumer->name = sdsdup(consumer->name);
- new_s->alloc_size += sdsAllocSize(new_consumer->name);
- new_consumer->pel = raxNewWithMetadata(0, &new_s->alloc_size);
- raxInsert(new_cg->consumers,(unsigned char *)new_consumer->name,
- sdslen(new_consumer->name), new_consumer, NULL);
- new_consumer->seen_time = consumer->seen_time;
- new_consumer->active_time = consumer->active_time;
-
- /* Consumer PEL */
- raxIterator ri_cpel;
- raxStart(&ri_cpel, consumer->pel);
- raxSeek(&ri_cpel, "^", NULL, 0);
- while (raxNext(&ri_cpel)) {
- void *result;
- int found = raxFind(new_cg->pel,ri_cpel.key,sizeof(streamID),&result);
-
- serverAssert(found);
-
- streamNACK *new_nack = result;
- new_nack->consumer = new_consumer;
- raxInsert(new_consumer->pel,ri_cpel.key,sizeof(streamID),new_nack,NULL);
- }
- raxStop(&ri_cpel);
- }
- raxStop(&ri_consumers);
- }
- raxStop(&ri_cgroups);
- return sobj;
-}
-
-/* This is a wrapper function for lpGet() to directly get an integer value
- * from the listpack (that may store numbers as a string), converting
- * the string if needed.
- * The 'valid' argument is an optional output parameter to get an indication
- * if the record was valid, when this parameter is NULL, the function will
- * fail with an assertion. */
-static inline int64_t lpGetIntegerIfValid(unsigned char *ele, int *valid) {
- int64_t v;
- unsigned char *e = lpGet(ele,&v,NULL);
- if (e == NULL) {
- if (valid)
- *valid = 1;
- return v;
- }
- /* The following code path should never be used for how listpacks work:
- * they should always be able to store an int64_t value in integer
- * encoded form. However the implementation may change. */
- long long ll = 0;
- int ret = string2ll((char*)e,v,&ll);
- if (valid)
- *valid = ret;
- else
- serverAssert(ret != 0);
- v = ll;
- return v;
-}
-
-#define lpGetInteger(ele) lpGetIntegerIfValid(ele, NULL)
-
-/* Get an edge streamID of a given listpack.
- * 'master_id' is an input param, used to build the 'edge_id' output param */
-int lpGetEdgeStreamID(unsigned char *lp, int first, streamID *master_id, streamID *edge_id)
-{
- if (lp == NULL)
- return 0;
-
- unsigned char *lp_ele;
-
- /* We need to seek either the first or the last entry depending
- * on the direction of the iteration. */
- if (first) {
- /* Get the master fields count. */
- lp_ele = lpFirst(lp); /* Seek items count */
- lp_ele = lpNext(lp, lp_ele); /* Seek deleted count. */
- lp_ele = lpNext(lp, lp_ele); /* Seek num fields. */
- int64_t master_fields_count = lpGetInteger(lp_ele);
- lp_ele = lpNext(lp, lp_ele); /* Seek first field. */
-
- /* If we are iterating in normal order, skip the master fields
- * to seek the first actual entry. */
- for (int64_t i = 0; i < master_fields_count; i++)
- lp_ele = lpNext(lp, lp_ele);
-
- /* If we are going forward, skip the previous entry's
- * lp-count field (or in case of the master entry, the zero
- * term field) */
- lp_ele = lpNext(lp, lp_ele);
- if (lp_ele == NULL)
- return 0;
- } else {
- /* If we are iterating in reverse direction, just seek the
- * last part of the last entry in the listpack (that is, the
- * fields count). */
- lp_ele = lpLast(lp);
-
- /* If we are going backward, read the number of elements this
- * entry is composed of, and jump backward N times to seek
- * its start. */
- int64_t lp_count = lpGetInteger(lp_ele);
- if (lp_count == 0) /* We reached the master entry. */
- return 0;
-
- while (lp_count--)
- lp_ele = lpPrev(lp, lp_ele);
- }
-
- lp_ele = lpNext(lp, lp_ele); /* Seek ID (lp_ele currently points to 'flags'). */
-
- /* Get the ID: it is encoded as difference between the master
- * ID and this entry ID. */
- streamID id = *master_id;
- id.ms += lpGetInteger(lp_ele);
- lp_ele = lpNext(lp, lp_ele);
- id.seq += lpGetInteger(lp_ele);
- *edge_id = id;
- return 1;
-}
-
-/* Debugging function to log the full content of a listpack. Useful
- * for development and debugging. */
-void streamLogListpackContent(unsigned char *lp) {
- unsigned char *p = lpFirst(lp);
- while(p) {
- unsigned char buf[LP_INTBUF_SIZE];
- int64_t v;
- unsigned char *ele = lpGet(p,&v,buf);
- serverLog(LL_WARNING,"- [%d] '%.*s'", (int)v, (int)v, ele);
- p = lpNext(lp,p);
- }
-}
-
-/* Convert the specified stream entry ID as a 128 bit big endian number, so
- * that the IDs can be sorted lexicographically. */
-void streamEncodeID(void *buf, streamID *id) {
- uint64_t e[2];
- e[0] = htonu64(id->ms);
- e[1] = htonu64(id->seq);
- memcpy(buf,e,sizeof(e));
-}
-
-/* This is the reverse of streamEncodeID(): the decoded ID will be stored
- * in the 'id' structure passed by reference. The buffer 'buf' must point
- * to a 128 bit big-endian encoded ID. */
-void streamDecodeID(void *buf, streamID *id) {
- uint64_t e[2];
- memcpy(e,buf,sizeof(e));
- id->ms = ntohu64(e[0]);
- id->seq = ntohu64(e[1]);
-}
-
-/* Compare two stream IDs. Return -1 if a < b, 0 if a == b, 1 if a > b. */
-int streamCompareID(streamID *a, streamID *b) {
- if (a->ms > b->ms) return 1;
- else if (a->ms < b->ms) return -1;
- /* The ms part is the same. Check the sequence part. */
- else if (a->seq > b->seq) return 1;
- else if (a->seq < b->seq) return -1;
- /* Everything is the same: IDs are equal. */
- return 0;
-}
-
-/* Retrieves the ID of the stream edge entry. An edge is either the first or
- * the last ID in the stream, and may be a tombstone. To filter out tombstones,
- * set the'skip_tombstones' argument to 1. */
-void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id)
-{
- streamIterator si;
- int64_t numfields;
- streamIteratorStart(&si,s,NULL,NULL,!first);
- si.skip_tombstones = skip_tombstones;
- int found = streamIteratorGetID(&si,edge_id,&numfields);
- if (!found) {
- streamID min_id = {0, 0}, max_id = {UINT64_MAX, UINT64_MAX};
- *edge_id = first ? max_id : min_id;
- }
- streamIteratorStop(&si);
-}
-
-/* Adds a new item into the stream 's' having the specified number of
- * field-value pairs as specified in 'numfields' and stored into 'argv'.
- * Returns the new entry ID populating the 'added_id' structure.
- *
- * If 'use_id' is not NULL, the ID is not auto-generated by the function,
- * but instead the passed ID is used to add the new entry. In this case
- * adding the entry may fail as specified later in this comment.
- *
- * When 'use_id' is used alongside with a zero 'seq-given', the sequence
- * part of the passed ID is ignored and the function will attempt to use an
- * auto-generated sequence.
- *
- * The function returns C_OK if the item was added, this is always true
- * if the ID was generated by the function. However the function may return
- * C_ERR in several cases:
- * 1. If an ID was given via 'use_id', but adding it failed since the
- * current top ID is greater or equal. errno will be set to EDOM.
- * 2. If a size of a single element or the sum of the elements is too big to
- * be stored into the stream. errno will be set to ERANGE. */
-int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id, int seq_given) {
-
- /* Generate the new entry ID. */
- streamID id;
- if (use_id) {
- if (seq_given) {
- id = *use_id;
- } else {
- /* The automatically generated sequence can be either zero (new
- * timestamps) or the incremented sequence of the last ID. In the
- * latter case, we need to prevent an overflow/advancing forward
- * in time. */
- if (s->last_id.ms == use_id->ms) {
- if (s->last_id.seq == UINT64_MAX) {
- errno = EDOM;
- return C_ERR;
- }
- id = s->last_id;
- id.seq++;
- } else {
- id = *use_id;
- }
- }
- } else {
- streamNextID(&s->last_id,&id);
- }
-
- /* Check that the new ID is greater than the last entry ID
- * or return an error. Automatically generated IDs might
- * overflow (and wrap-around) when incrementing the sequence
- part. */
- if (streamCompareID(&id,&s->last_id) <= 0) {
- errno = EDOM;
- return C_ERR;
- }
-
- /* Avoid overflow when trying to add an element to the stream (listpack
- * can only host up to 32bit length strings, and also a total listpack size
- * can't be bigger than 32bit length. */
- size_t totelelen = 0;
- for (int64_t i = 0; i < numfields*2; i++) {
- sds ele = argv[i]->ptr;
- totelelen += sdslen(ele);
- }
- if (totelelen > STREAM_LISTPACK_MAX_SIZE) {
- errno = ERANGE;
- return C_ERR;
- }
-
- /* Add the new entry. */
- raxIterator ri;
- raxStart(&ri,s->rax);
- raxSeek(&ri,"$",NULL,0);
-
- size_t lp_bytes = 0; /* Total bytes in the tail listpack. */
- unsigned char *lp = NULL; /* Tail listpack pointer. */
-
- if (!raxEOF(&ri)) {
- /* Get a reference to the tail node listpack. */
- lp = ri.data;
- lp_bytes = lpBytes(lp);
- }
- raxStop(&ri);
-
- /* We have to add the key into the radix tree in lexicographic order,
- * to do so we consider the ID as a single 128 bit number written in
- * big endian, so that the most significant bytes are the first ones. */
- uint64_t rax_key[2]; /* Key in the radix tree containing the listpack.*/
- streamID master_id; /* ID of the master entry in the listpack. */
-
- /* Create a new listpack and radix tree node if needed. Note that when
- * a new listpack is created, we populate it with a "master entry". This
- * is just a set of fields that is taken as references in order to compress
- * the stream entries that we'll add inside the listpack.
- *
- * Note that while we use the first added entry fields to create
- * the master entry, the first added entry is NOT represented in the master
- * entry, which is a stand alone object. But of course, the first entry
- * will compress well because it's used as reference.
- *
- * The master entry is composed like in the following example:
- *
- * +-------+---------+------------+---------+--/--+---------+---------+-+
- * | count | deleted | num-fields | field_1 | field_2 | ... | field_N |0|
- * +-------+---------+------------+---------+--/--+---------+---------+-+
- *
- * count and deleted just represent respectively the total number of
- * entries inside the listpack that are valid, and marked as deleted
- * (deleted flag in the entry flags set). So the total number of items
- * actually inside the listpack (both deleted and not) is count+deleted.
- *
- * The real entries will be encoded with an ID that is just the
- * millisecond and sequence difference compared to the key stored at
- * the radix tree node containing the listpack (delta encoding), and
- * if the fields of the entry are the same as the master entry fields, the
- * entry flags will specify this fact and the entry fields and number
- * of fields will be omitted (see later in the code of this function).
- *
- * The "0" entry at the end is the same as the 'lp-count' entry in the
- * regular stream entries (see below), and marks the fact that there are
- * no more entries, when we scan the stream from right to left. */
-
- /* First of all, check if we can append to the current macro node or
- * if we need to switch to the next one. 'lp' will be set to NULL if
- * the current node is full. */
- if (lp != NULL) {
- int new_node = 0;
- size_t node_max_bytes = server.stream_node_max_bytes;
- if (node_max_bytes == 0 || node_max_bytes > STREAM_LISTPACK_MAX_SIZE)
- node_max_bytes = STREAM_LISTPACK_MAX_SIZE;
- if (lp_bytes + totelelen >= node_max_bytes) {
- new_node = 1;
- } else if (server.stream_node_max_entries) {
- unsigned char *lp_ele = lpFirst(lp);
- /* Count both live entries and deleted ones. */
- int64_t count = lpGetInteger(lp_ele) + lpGetInteger(lpNext(lp,lp_ele));
- if (count >= server.stream_node_max_entries) new_node = 1;
- }
-
- if (new_node) {
- /* Shrink extra pre-allocated memory */
- lp = lpShrinkToFit(lp);
- s->alloc_size -= lp_bytes;
- s->alloc_size += lpBytes(lp);
- if (ri.data != lp)
- raxSetData(ri.node, lp);
- lp = NULL;
- }
- }
-
- int flags = STREAM_ITEM_FLAG_NONE;
- if (lp == NULL) {
- master_id = id;
- streamEncodeID(rax_key,&id);
- /* Create the listpack having the master entry ID and fields.
- * Pre-allocate some bytes when creating listpack to avoid realloc on
- * every XADD. Since listpack.c uses malloc_size, it'll grow in steps,
- * and won't realloc on every XADD.
- * When listpack reaches max number of entries, we'll shrink the
- * allocation to fit the data. */
- size_t prealloc = STREAM_LISTPACK_MAX_PRE_ALLOCATE;
- if (server.stream_node_max_bytes > 0 && server.stream_node_max_bytes < prealloc) {
- prealloc = server.stream_node_max_bytes;
- }
- lp = lpNew(prealloc);
- lp = lpAppendInteger(lp,1); /* One item, the one we are adding. */
- lp = lpAppendInteger(lp,0); /* Zero deleted so far. */
- lp = lpAppendInteger(lp,numfields);
- for (int64_t i = 0; i < numfields; i++) {
- sds field = argv[i*2]->ptr;
- lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
- }
- lp = lpAppendInteger(lp,0); /* Master entry zero terminator. */
- s->alloc_size += lpBytes(lp);
- raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
- /* The first entry we insert, has obviously the same fields of the
- * master entry. */
- flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
- } else {
- serverAssert(ri.key_len == sizeof(rax_key));
- memcpy(rax_key,ri.key,sizeof(rax_key));
-
- /* Read the master ID from the radix tree key. */
- streamDecodeID(rax_key,&master_id);
- unsigned char *lp_ele = lpFirst(lp);
-
- /* Update count and skip the deleted fields. */
- int64_t count = lpGetInteger(lp_ele);
- size_t oldsize = lpBytes(lp);
- lp = lpReplaceInteger(lp,&lp_ele,count+1);
- s->alloc_size -= oldsize;
- s->alloc_size += lpBytes(lp);
- lp_ele = lpNext(lp,lp_ele); /* seek deleted. */
- lp_ele = lpNext(lp,lp_ele); /* seek master entry num fields. */
-
- /* Check if the entry we are adding, have the same fields
- * as the master entry. */
- int64_t master_fields_count = lpGetInteger(lp_ele);
- lp_ele = lpNext(lp,lp_ele);
- if (numfields == master_fields_count) {
- int64_t i;
- for (i = 0; i < master_fields_count; i++) {
- sds field = argv[i*2]->ptr;
- int64_t e_len;
- unsigned char buf[LP_INTBUF_SIZE];
- unsigned char *e = lpGet(lp_ele,&e_len,buf);
- /* Stop if there is a mismatch. */
- if (sdslen(field) != (size_t)e_len ||
- memcmp(e,field,e_len) != 0) break;
- lp_ele = lpNext(lp,lp_ele);
- }
- /* All fields are the same! We can compress the field names
- * setting a single bit in the flags. */
- if (i == master_fields_count) flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
- }
- }
-
- /* Populate the listpack with the new entry. We use the following
- * encoding:
- *
- * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+
- * |flags|entry-id|num-fields|field-1|value-1|...|field-N|value-N|lp-count|
- * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+
- *
- * However if the SAMEFIELD flag is set, we have just to populate
- * the entry with the values, so it becomes:
- *
- * +-----+--------+-------+-/-+-------+--------+
- * |flags|entry-id|value-1|...|value-N|lp-count|
- * +-----+--------+-------+-/-+-------+--------+
- *
- * The entry-id field is actually two separated fields: the ms
- * and seq difference compared to the master entry.
- *
- * The lp-count field is a number that states the number of listpack pieces
- * that compose the entry, so that it's possible to travel the entry
- * in reverse order: we can just start from the end of the listpack, read
- * the entry, and jump back N times to seek the "flags" field to read
- * the stream full entry. */
- size_t oldsize = lpBytes(lp);
- lp = lpAppendInteger(lp,flags);
- lp = lpAppendInteger(lp,id.ms - master_id.ms);
- lp = lpAppendInteger(lp,id.seq - master_id.seq);
- if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS))
- lp = lpAppendInteger(lp,numfields);
- for (int64_t i = 0; i < numfields; i++) {
- sds field = argv[i*2]->ptr, value = argv[i*2+1]->ptr;
- if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS))
- lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
- lp = lpAppend(lp,(unsigned char*)value,sdslen(value));
- }
- /* Compute and store the lp-count field. */
- int64_t lp_count = numfields;
- lp_count += 3; /* Add the 3 fixed fields flags + ms-diff + seq-diff. */
- if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) {
- /* If the item is not compressed, it also has the fields other than
- * the values, and an additional num-fields field. */
- lp_count += numfields+1;
- }
- lp = lpAppendInteger(lp,lp_count);
- s->alloc_size -= oldsize;
- s->alloc_size += lpBytes(lp);
-
- /* Insert back into the tree in order to update the listpack pointer. */
- if (ri.data != lp)
- raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
- s->length++;
- s->entries_added++;
- s->last_id = id;
- if (s->length == 1) s->first_id = id;
- if (added_id) *added_id = id;
- return C_OK;
-}
-
-typedef struct {
- /* XADD options */
- streamID id; /* User-provided ID, for XADD only. */
- int id_given; /* Was an ID different than "*" specified? for XADD only. */
- int seq_given; /* Was an ID different than "ms-*" specified? for XADD only. */
- int no_mkstream; /* if set to 1 do not create new stream */
- robj *idmp_pid; /* IDMP producer id parameter, for XADD only. */
- robj *idmp_iid; /* IDMP idempotent id parameter, for XADD only. */
- int idmp_auto; /* If set to 1, auto-generate IID from field-value pairs, for XADD only. */
-
- /* XADD + XTRIM common options */
- int trim_strategy; /* TRIM_STRATEGY_* */
- int trim_strategy_arg_idx; /* Index of the count in MAXLEN/MINID, for rewriting. */
- int delete_strategy; /* DELETE_STRATEGY_* */
- int approx_trim; /* If 1 only delete whole radix tree nodes, so
- * the trim argument is not applied verbatim.
- * Note: This flag is ignored when delete_strategy is non-KEEPREF.
- * Individual entries may still be processed for consumer groups. */
- long long limit; /* Maximum amount of entries to trim. If 0, no limitation
- * on the amount of trimming work is enforced. */
- /* TRIM_STRATEGY_MAXLEN options */
- long long maxlen; /* After trimming, leave stream at this length . */
- /* TRIM_STRATEGY_MINID options */
- streamID minid; /* Trim by ID (No stream entries with ID < 'minid' will remain) */
-} streamAddTrimArgs;
-
-#define TRIM_STRATEGY_NONE 0
-#define TRIM_STRATEGY_MAXLEN 1
-#define TRIM_STRATEGY_MINID 2
-
-typedef struct {
- int startidx; /* Starting index of IDs in argv */
- long numids; /* Number of IDs to process */
- int delete_strategy; /* DELETE_STRATEGY_* */
-} streamAckDelArgs;
-
-#define DELETE_STRATEGY_NONE 0
-#define DELETE_STRATEGY_KEEPREF 1 /* Delete and keep references */
-#define DELETE_STRATEGY_DELREF 2 /* Delete from pending entries list */
-#define DELETE_STRATEGY_ACKED 3 /* Only delete messages that are acknowledged */
-
-/* Trim the stream 's' according to args->trim_strategy, and return the
- * number of elements removed from the stream. The 'approx' option, if non-zero,
- * specifies that the trimming must be performed in a approximated way in
- * order to maximize performances. This means that the stream may contain
- * entries with IDs < 'id' in case of MINID (or more elements than 'maxlen'
- * in case of MAXLEN), and elements are only removed if we can remove
- * a *whole* node of the radix tree. The elements are removed from the head
- * of the stream (older elements).
- *
- * The function may return zero if:
- *
- * 1) The minimal entry ID of the stream is already < 'id' (MINID); or
- * 2) The stream is already shorter or equal to the specified max length (MAXLEN); or
- * 3) The 'approx' option is true and the head node did not have enough elements
- * to be deleted.
- *
- * args->limit is the maximum number of entries to delete. The purpose is to
- * prevent this function from taking to long.
- * If 'limit' is 0 then we do not limit the number of deleted entries.
- * Much like the 'approx', if 'limit' is smaller than the number of entries
- * that should be trimmed, there is a chance we will still have entries with
- * IDs < 'id' (or number of elements >= maxlen in case of MAXLEN).
- */
-int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
- size_t maxlen = args->maxlen;
- streamID *id = &args->minid;
- int approx = args->approx_trim;
- int64_t limit = args->limit;
- int trim_strategy = args->trim_strategy;
- int delete_strategy = args->delete_strategy;
-
- if (trim_strategy == TRIM_STRATEGY_NONE)
- return 0;
-
- raxIterator ri;
- raxStart(&ri,s->rax);
- raxSeek(&ri,"^",NULL,0);
-
- int64_t deleted = 0;
- while (raxNext(&ri)) {
- if (trim_strategy == TRIM_STRATEGY_MAXLEN && s->length <= maxlen)
- break;
-
- unsigned char *lp = ri.data, *p = lpFirst(lp);
- int64_t entries = lpGetInteger(p);
-
- /* Check if we exceeded the amount of work we could do */
- if (limit && (deleted + entries) > limit)
- break;
-
- /* Check if we can remove the whole node */
- int remove_node = 0; /* Final decision flag for node removal */
- int node_eligible_for_remove = 0; /* Whether node meets the basic criteria for removal */
- streamID master_id = {0};
- /* Read the master ID from the radix tree key. */
- streamDecodeID(ri.key, &master_id);
- if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
- node_eligible_for_remove = s->length - entries >= maxlen;
- } else {
- /* Read last ID. */
- streamID last_id = {0,0};
- lpGetEdgeStreamID(lp, 0, &master_id, &last_id);
-
- /* We can remove the entire node id its last ID < 'id' */
- node_eligible_for_remove = streamCompareID(&last_id, id) < 0;
- }
-
- if (node_eligible_for_remove && delete_strategy == DELETE_STRATEGY_KEEPREF) {
- /* With KEEPREF strategy, we can remove the whole node directly since we don't need
- * to check or clean up consumer group references. */
- remove_node = 1;
- }
-
- if (remove_node) {
- s->alloc_size -= lpBytes(lp);
- lpFree(lp);
- raxRemove(s->rax,ri.key,ri.key_len,NULL);
- raxSeek(&ri,">=",ri.key,ri.key_len);
- s->length -= entries;
- deleted += entries;
- continue;
- }
-
- /* If we cannot remove a whole element, and approx is true,
- * stop here. However, for non-KEEPREF strategies, if the node was
- * eligible for removal but we couldn't remove it (because we need
- * to check consumer group references), we should continue to process
- * entries within this node. */
- if (approx && delete_strategy == DELETE_STRATEGY_KEEPREF) break;
-
- /* Now we have to trim entries from within 'lp' */
- size_t oldsize = lpBytes(lp);
- int64_t deleted_from_lp = 0;
-
- p = lpNext(lp, p); /* Skip deleted field. */
- p = lpNext(lp, p); /* Skip num-of-fields in the master entry. */
-
- /* Skip all the master fields. */
- int64_t master_fields_count = lpGetInteger(p);
- p = lpNext(lp,p); /* Skip the first field. */
- for (int64_t j = 0; j < master_fields_count; j++)
- p = lpNext(lp,p); /* Skip all master fields. */
- p = lpNext(lp,p); /* Skip the zero master entry terminator. */
-
- /* 'p' is now pointing to the first entry inside the listpack.
- * We have to run entry after entry, marking entries as deleted
- * if they are already not deleted. */
- while (p) {
- /* We keep a copy of p (which point to flags part) in order to
- * update it after (and if) we actually remove the entry */
- unsigned char *pcopy = p;
-
- int64_t flags = lpGetInteger(p);
- p = lpNext(lp, p); /* Skip flags. */
- int64_t to_skip;
-
- int64_t ms_delta = lpGetInteger(p);
- p = lpNext(lp, p); /* Skip ID ms delta */
- int64_t seq_delta = lpGetInteger(p);
- p = lpNext(lp, p); /* Skip ID seq delta */
-
- streamID currid = {0};
- currid.ms = master_id.ms + ms_delta;
- currid.seq = master_id.seq + seq_delta;
-
- int stop;
- if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
- stop = s->length <= maxlen;
- } else {
- /* Following IDs will definitely be greater because the rax
- * tree is sorted, no point of continuing. */
- stop = streamCompareID(&currid, id) >= 0;
- }
- if (stop)
- break;
-
- if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
- to_skip = master_fields_count;
- } else {
- to_skip = lpGetInteger(p); /* Get num-fields. */
- p = lpNext(lp,p); /* Skip num-fields. */
- to_skip *= 2; /* Fields and values. */
- }
-
- while(to_skip--) p = lpNext(lp,p); /* Skip the whole entry. */
- p = lpNext(lp,p); /* Skip the final lp-count field. */
-
- /* Mark the entry as deleted if allowed. */
- if (!(flags & STREAM_ITEM_FLAG_DELETED)) {
- int can_delete = 1;
- if (delete_strategy == DELETE_STRATEGY_ACKED) {
- /* Only delete entry that has been acknowledged by all consumer groups. */
- can_delete = (streamEntryIsReferenced(s, &currid) == 0);
- } else if (delete_strategy == DELETE_STRATEGY_DELREF) {
- /* Remove all consumer group references for this entry */
- streamCleanupEntryCGroupRefs(s, &currid);
- }
-
- if (can_delete) {
- /* Mark the entry as deleted. */
- intptr_t delta = p ? (p - lp) : 0; /* p may be NULL if this was the last entry */
- flags |= STREAM_ITEM_FLAG_DELETED;
- lp = lpReplaceInteger(lp, &pcopy, flags);
- deleted_from_lp++;
- s->length--;
- if (p) p = lp + delta;
- }
- }
- }
- deleted += deleted_from_lp;
- /* If this node was originally eligible for removal but we couldn't remove it upfront
- * due to delete strategy constraints, and now we've processed and deleted all entries
- * in the node, we can finally remove the entire node. */
- if (node_eligible_for_remove && deleted_from_lp == entries) {
- s->alloc_size -= oldsize;
- lpFree(lp);
- raxRemove(s->rax,ri.key,ri.key_len,NULL);
- raxSeek(&ri,">=",ri.key,ri.key_len);
- continue;
- }
-
- /* Now we update the entries/deleted counters. */
- p = lpFirst(lp);
- lp = lpReplaceInteger(lp,&p,entries-deleted_from_lp);
- p = lpNext(lp,p); /* Skip deleted field. */
- int64_t marked_deleted = lpGetInteger(p);
- lp = lpReplaceInteger(lp,&p,marked_deleted+deleted_from_lp);
- p = lpNext(lp,p); /* Skip num-of-fields in the master entry. */
- s->alloc_size -= oldsize;
- s->alloc_size += lpBytes(lp);
-
- /* Here we should perform garbage collection in case at this point
- * there are too many entries deleted inside the listpack. */
- entries -= deleted_from_lp;
- marked_deleted += deleted_from_lp;
- if (entries + marked_deleted > 10 && marked_deleted > entries/2) {
- /* TODO: perform a garbage collection. */
- }
-
- /* Update the node with the new pointer. */
- raxSetData(ri.node,lp);
-
- /* If the node is eligible for removal but we couldn't remove it due to delete strategy
- * constraints (we need to check each entry individually), continue to the next node
- * instead of stopping here. */
- if (node_eligible_for_remove)
- continue;
-
- break; /* If we are here, there was enough to delete in the current
- node, so no need to go to the next node. */
- }
- raxStop(&ri);
-
- /* Update the stream's first ID after the trimming. */
- if (s->length == 0) {
- s->first_id.ms = 0;
- s->first_id.seq = 0;
- } else if (deleted) {
- streamGetEdgeID(s,1,1,&s->first_id);
- }
-
- return deleted;
-}
-
-/* Trims a stream by length. Returns the number of deleted items. */
-int64_t streamTrimByLength(stream *s, long long maxlen, int approx) {
- streamAddTrimArgs args = {
- .trim_strategy = TRIM_STRATEGY_MAXLEN,
- .approx_trim = approx,
- .limit = approx ? 100 * server.stream_node_max_entries : 0,
- .maxlen = maxlen,
- .delete_strategy = DELETE_STRATEGY_KEEPREF
- };
- return streamTrim(s, &args);
-}
-
-/* Trims a stream by minimum ID. Returns the number of deleted items. */
-int64_t streamTrimByID(stream *s, streamID minid, int approx) {
- streamAddTrimArgs args = {
- .trim_strategy = TRIM_STRATEGY_MINID,
- .approx_trim = approx,
- .limit = approx ? 100 * server.stream_node_max_entries : 0,
- .minid = minid,
- .delete_strategy = DELETE_STRATEGY_KEEPREF
- };
- return streamTrim(s, &args);
-}
-
-/* Parse the arguments of XADD/XTRIM.
- *
- * See streamAddTrimArgs for more details about the arguments handled.
- *
- * This function returns the position of the ID argument (relevant only to XADD).
- * On error -1 is returned and a reply is sent. */
-static int streamParseAddOrTrimArgsOrReply(client *c, streamAddTrimArgs *args, int xadd) {
- /* Initialize arguments to defaults */
- memset(args, 0, sizeof(*args));
- args->delete_strategy = DELETE_STRATEGY_NONE;
-
- /* Parse options. */
- int i = 2; /* This is the first argument position where we could
- find an option, or the ID. */
- int limit_given = 0;
- for (; i < c->argc; i++) {
- int moreargs = (c->argc-1) - i; /* Number of additional arguments. */
- char *opt = c->argv[i]->ptr;
- if (xadd && opt[0] == '*' && opt[1] == '\0') {
- /* This is just a fast path for the common case of auto-ID
- * creation. */
- break;
- } else if (!strcasecmp(opt,"maxlen") && moreargs) {
- if (args->trim_strategy != TRIM_STRATEGY_NONE) {
- addReplyError(c,"syntax error, MAXLEN and MINID options at the same time are not compatible");
- return -1;
- }
- args->approx_trim = 0;
- char *next = c->argv[i+1]->ptr;
- /* Check for the form MAXLEN ~ <count>. */
- if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
- args->approx_trim = 1;
- i++;
- } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
- i++;
- }
- if (getLongLongFromObjectOrReply(c,c->argv[i+1],&args->maxlen,NULL)
- != C_OK) return -1;
-
- if (args->maxlen < 0) {
- addReplyError(c,"The MAXLEN argument must be >= 0.");
- return -1;
- }
- i++;
- args->trim_strategy = TRIM_STRATEGY_MAXLEN;
- args->trim_strategy_arg_idx = i;
- } else if (!strcasecmp(opt,"minid") && moreargs) {
- if (args->trim_strategy != TRIM_STRATEGY_NONE) {
- addReplyError(c,"syntax error, MAXLEN and MINID options at the same time are not compatible");
- return -1;
- }
- args->approx_trim = 0;
- char *next = c->argv[i+1]->ptr;
- /* Check for the form MINID ~ <id> */
- if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
- args->approx_trim = 1;
- i++;
- } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
- i++;
- }
-
- if (streamParseStrictIDOrReply(c,c->argv[i+1],&args->minid,0,NULL) != C_OK)
- return -1;
-
- i++;
- args->trim_strategy = TRIM_STRATEGY_MINID;
- args->trim_strategy_arg_idx = i;
- } else if (!strcasecmp(opt,"limit") && moreargs) {
- /* Note about LIMIT: If it was not provided by the caller we set
- * it to 100*server.stream_node_max_entries, and that's to prevent the
- * trimming from taking too long, on the expense of not deleting entries
- * that should be trimmed.
- * If user wanted exact trimming (i.e. no '~') we never limit the number
- * of trimmed entries */
- if (getLongLongFromObjectOrReply(c,c->argv[i+1],&args->limit,NULL) != C_OK)
- return -1;
-
- if (args->limit < 0) {
- addReplyError(c,"The LIMIT argument must be >= 0.");
- return -1;
- }
- limit_given = 1;
- i++;
- } else if (!strcasecmp(opt,"keepref") && args->delete_strategy == DELETE_STRATEGY_NONE) {
- args->delete_strategy = DELETE_STRATEGY_KEEPREF;
- } else if (!strcasecmp(opt,"delref") && args->delete_strategy == DELETE_STRATEGY_NONE) {
- args->delete_strategy = DELETE_STRATEGY_DELREF;
- } else if (!strcasecmp(opt,"acked") && args->delete_strategy == DELETE_STRATEGY_NONE) {
- args->delete_strategy = DELETE_STRATEGY_ACKED;
- } else if (xadd && !strcasecmp(opt,"nomkstream")) {
- args->no_mkstream = 1;
- } else if (xadd && !strcasecmp(opt,"idmpauto") && moreargs) {
- /* IDMPAUTO pid - auto-generate IID from field-value pairs */
- if (args->idmp_pid != NULL) {
- addReplyError(c,"syntax error, IDMP/IDMPAUTO specified multiple times");
- return -1;
- }
-
- size_t pid_len = sdslen((sds)c->argv[i+1]->ptr);
- if (pid_len == 0) {
- addReplyError(c, "syntax error, IDMPAUTO requires a non-empty producer ID");
- return -1;
- }
-
- args->idmp_pid = c->argv[i+1];
- args->idmp_auto = 1;
- i++;
- } else if (xadd && !strcasecmp(opt,"idmp") && moreargs >= 2) {
- /* IDMP pid iid - explicit producer ID and idempotent ID */
- if (args->idmp_pid != NULL) {
- addReplyError(c,"syntax error, IDMP/IDMPAUTO specified multiple times");
- return -1;
- }
-
- size_t pid_len = sdslen((sds)c->argv[i+1]->ptr);
- if (pid_len == 0) {
- addReplyError(c, "syntax error, IDMP requires a non-empty producer ID");
- return -1;
- }
-
- size_t iid_len = sdslen((sds)c->argv[i+2]->ptr);
- if (iid_len == 0) {
- addReplyError(c, "syntax error, IDMP requires a non-empty idempotent ID");
- return -1;
- }
-
- args->idmp_pid = c->argv[i+1];
- args->idmp_iid = c->argv[i+2];
- i += 2;
- } else if (xadd) {
- /* If we are here is a syntax error or a valid ID. */
- if (streamParseStrictIDOrReply(c,c->argv[i],&args->id,0,&args->seq_given) != C_OK)
- return -1;
-
- /* mustObeyClient is needed because IDMP can only be used with * (auto-generated IDs),
- * but when we replicate the message we replace the * with the actual StreamID. */
- if (args->idmp_pid && opt[0] != '*' && !mustObeyClient(c)) {
- addReplyError(c,"syntax error, IDMP/IDMPAUTO can be used only with auto-generated IDs");
- return -1;
- }
- args->id_given = 1;
- break;
- } else {
- addReplyErrorObject(c,shared.syntaxerr);
- return -1;
- }
- }
-
- if (args->limit && args->trim_strategy == TRIM_STRATEGY_NONE) {
- addReplyError(c,"syntax error, LIMIT cannot be used without specifying a trimming strategy");
- return -1;
- }
-
- if (!xadd && args->trim_strategy == TRIM_STRATEGY_NONE) {
- addReplyError(c,"syntax error, XTRIM must be called with a trimming strategy");
- return -1;
- }
-
- if (mustObeyClient(c)) {
- /* If command came from master or from AOF we must not enforce maxnodes
- * (The maxlen/minid argument was re-written to make sure there's no
- * inconsistency). */
- args->limit = 0;
- } else {
- /* We need to set the limit (only if we got '~') */
- if (limit_given) {
- if (!args->approx_trim) {
- /* LIMIT was provided without ~ */
- addReplyError(c,"syntax error, LIMIT cannot be used without the special ~ option");
- return -1;
- }
- } else {
- /* User didn't provide LIMIT, we must set it. */
- if (args->approx_trim) {
- /* In order to prevent from trimming to do too much work and
- * cause latency spikes we limit the amount of work it can do.
- * We have to cap args->limit from both sides in case
- * stream_node_max_entries is 0 or too big (could cause overflow)
- */
- args->limit = 100 * server.stream_node_max_entries; /* Maximum 100 rax nodes. */
- if (args->limit <= 0) args->limit = 10000;
- if (args->limit > 1000000) args->limit = 1000000;
- } else {
- /* No LIMIT for exact trimming */
- args->limit = 0;
- }
- }
- }
-
- /* Set default consumer group reference handling to KEEPREF if none was specified */
- if (args->delete_strategy == DELETE_STRATEGY_NONE)
- args->delete_strategy = DELETE_STRATEGY_KEEPREF;
-
- return i;
-}
-
-static int streamParseAckDelArgsOrReply(client *c, int start_pos, streamAckDelArgs *args) {
- /* Initialize arguments to defaults */
- memset(args, 0, sizeof(*args));
- args->startidx = -1;
- args->delete_strategy = DELETE_STRATEGY_NONE;
-
- /* Parse command options */
- int j = start_pos;
- while (j < c->argc) {
- char *opt = c->argv[j]->ptr;
- if (!strcasecmp(opt, "KEEPREF") && args->delete_strategy == DELETE_STRATEGY_NONE) {
- args->delete_strategy = DELETE_STRATEGY_KEEPREF;
- j++;
- } else if (!strcasecmp(opt, "DELREF") && args->delete_strategy == DELETE_STRATEGY_NONE) {
- args->delete_strategy = DELETE_STRATEGY_DELREF;
- j++;
- } else if (!strcasecmp(opt, "ACKED") && args->delete_strategy == DELETE_STRATEGY_NONE) {
- args->delete_strategy = DELETE_STRATEGY_ACKED;
- j++;
- } else if (!strcasecmp(opt, "IDS") && j+1 < c->argc) {
- /* Parse the number of IDs */
- if (getRangeLongFromObjectOrReply(c, c->argv[j+1], 1, LONG_MAX,
- &args->numids, "Number of IDs must be a positive integer") != C_OK)
- {
- return 0;
- }
-
- /* Verify that the specified number of IDs matches the actual arguments */
- if (args->numids > (c->argc - j - 2)) {
- addReplyError(c, "The `numids` parameter must match the number of arguments");
- return 0;
- }
-
- args->startidx = j + 2; /* Skip "IDS" and numids */
- j = args->startidx + args->numids;
- } else {
- addReplyErrorObject(c,shared.syntaxerr);
- return 0;
- }
- }
-
- if (args->startidx == -1) {
- addReplyError(c, "IDS option is required");
- return 0;
- }
-
- /* Set default consumer group reference handling to KEEPREF if none was specified */
- if (args->delete_strategy == DELETE_STRATEGY_NONE)
- args->delete_strategy = DELETE_STRATEGY_KEEPREF;
-
- return 1;
-}
-
-/* Initialize the stream iterator, so that we can call iterating functions
- * to get the next items. This requires a corresponding streamIteratorStop()
- * at the end. The 'rev' parameter controls the direction. If it's zero the
- * iteration is from the start to the end element (inclusive), otherwise
- * if rev is non-zero, the iteration is reversed.
- *
- * Once the iterator is initialized, we iterate like this:
- *
- * streamIterator myiterator;
- * streamIteratorStart(&myiterator,...);
- * int64_t numfields;
- * while(streamIteratorGetID(&myiterator,&ID,&numfields)) {
- * while(numfields--) {
- * unsigned char *key, *value;
- * size_t key_len, value_len;
- * streamIteratorGetField(&myiterator,&key,&value,&key_len,&value_len);
- *
- * ... do what you want with key and value ...
- * }
- * }
- * streamIteratorStop(&myiterator); */
-void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev) {
- /* Initialize the iterator and translates the iteration start/stop
- * elements into a 128 big big-endian number. */
- if (start) {
- streamEncodeID(si->start_key,start);
- } else {
- si->start_key[0] = 0;
- si->start_key[1] = 0;
- }
-
- if (end) {
- streamEncodeID(si->end_key,end);
- } else {
- si->end_key[0] = UINT64_MAX;
- si->end_key[1] = UINT64_MAX;
- }
-
- /* Decode the big-endian keys into native 64-bit integers
- * for faster comparisons during iteration. */
- si->start_ms = htonu64(si->start_key[0]);
- si->start_seq = htonu64(si->start_key[1]);
- si->end_ms = htonu64(si->end_key[0]);
- si->end_seq = htonu64(si->end_key[1]);
-
- /* Seek the correct node in the radix tree. */
- raxStart(&si->ri,s->rax);
- if (!rev) {
- if (start && (start->ms || start->seq)) {
- raxSeek(&si->ri,"<=",(unsigned char*)si->start_key,
- sizeof(si->start_key));
- if (raxEOF(&si->ri)) raxSeek(&si->ri,"^",NULL,0);
- } else {
- raxSeek(&si->ri,"^",NULL,0);
- }
- } else {
- if (end && (end->ms || end->seq)) {
- raxSeek(&si->ri,"<=",(unsigned char*)si->end_key,
- sizeof(si->end_key));
- if (raxEOF(&si->ri)) raxSeek(&si->ri,"$",NULL,0);
- } else {
- raxSeek(&si->ri,"$",NULL,0);
- }
- }
- si->stream = s;
- si->lp = NULL; /* There is no current listpack right now. */
- si->lp_last_ele = NULL;
- si->lp_ele = NULL; /* Current listpack cursor. */
- si->rev = rev; /* Direction, if non-zero reversed, from end to start. */
- si->skip_tombstones = 1; /* By default tombstones aren't emitted. */
-}
-
-/* Return 1 and store the current item ID at 'id' if there are still
- * elements within the iteration range, otherwise return 0 in order to
- * signal the iteration terminated. */
-int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
- while(1) { /* Will stop when element > stop_key or end of radix tree. */
- /* Record the previous lp_ele position to detect data corruption
- * that might cause the iterator to move backwards unexpectedly. */
- if (si->lp_ele && si->lp_last_ele)
- serverAssert(si->rev ? si->lp_ele < si->lp_last_ele : si->lp_ele > si->lp_last_ele);
- si->lp_last_ele = si->lp_ele;
-
- /* If the current listpack is set to NULL, this is the start of the
- * iteration or the previous listpack was completely iterated.
- * Go to the next node. */
- if (si->lp == NULL || si->lp_ele == NULL) {
- if (!si->rev && !raxNext(&si->ri)) return 0;
- else if (si->rev && !raxPrev(&si->ri)) return 0;
- serverAssert(si->ri.key_len == sizeof(streamID));
- /* Get the master ID. */
- streamDecodeID(si->ri.key,&si->master_id);
- /* Get the master fields count. */
- si->lp = si->ri.data;
- si->lp_ele = lpFirst(si->lp); /* Seek items count */
- si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek deleted count. */
- si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek num fields. */
- si->master_fields_count = lpGetInteger(si->lp_ele);
- si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek first field. */
- si->master_fields_start = si->lp_ele;
- /* We are now pointing to the first field of the master entry.
- * We need to seek either the first or the last entry depending
- * on the direction of the iteration. */
- if (!si->rev) {
- /* If we are iterating in normal order, skip the master fields
- * to seek the first actual entry. */
- for (uint64_t i = 0; i < si->master_fields_count; i++)
- si->lp_ele = lpNext(si->lp,si->lp_ele);
- } else {
- /* If we are iterating in reverse direction, just seek the
- * last part of the last entry in the listpack (that is, the
- * fields count). */
- si->lp_ele = lpLast(si->lp);
- }
- } else if (si->rev) {
- /* If we are iterating in the reverse order, and this is not
- * the first entry emitted for this listpack, then we already
- * emitted the current entry, and have to go back to the previous
- * one. */
- int64_t lp_count = lpGetInteger(si->lp_ele);
- while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele);
- /* Seek lp-count of prev entry. */
- si->lp_ele = lpPrev(si->lp,si->lp_ele);
- }
-
- /* For every radix tree node, iterate the corresponding listpack,
- * returning elements when they are within range. */
- while(1) {
- if (!si->rev) {
- /* If we are going forward, skip the previous entry
- * lp-count field (or in case of the master entry, the zero
- * term field) */
- si->lp_ele = lpNext(si->lp,si->lp_ele);
- if (si->lp_ele == NULL) break;
- } else {
- /* If we are going backward, read the number of elements this
- * entry is composed of, and jump backward N times to seek
- * its start. */
- int64_t lp_count = lpGetInteger(si->lp_ele);
- if (lp_count == 0) { /* We reached the master entry. */
- si->lp = NULL;
- si->lp_ele = NULL;
- break;
- }
- while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele);
- }
-
- /* Get the flags entry. */
- si->lp_flags = si->lp_ele;
- int64_t flags = lpGetInteger(si->lp_ele);
- si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek ID. */
-
- /* Get the ID: it is encoded as difference between the master
- * ID and this entry ID. */
- *id = si->master_id;
- id->ms += lpGetInteger(si->lp_ele);
- si->lp_ele = lpNext(si->lp,si->lp_ele);
- id->seq += lpGetInteger(si->lp_ele);
- si->lp_ele = lpNext(si->lp,si->lp_ele);
-
- /* The number of entries is here or not depending on the
- * flags. */
- if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
- *numfields = si->master_fields_count;
- } else {
- *numfields = lpGetInteger(si->lp_ele);
- si->lp_ele = lpNext(si->lp,si->lp_ele);
- }
- serverAssert(*numfields>=0);
-
- /* If current >= start, and the entry is not marked as
- * deleted or tombstones are included, emit it. */
- if (!si->rev) {
- if ((id->ms > si->start_ms ||
- (id->ms == si->start_ms && id->seq >= si->start_seq)) &&
- (!si->skip_tombstones || !(flags & STREAM_ITEM_FLAG_DELETED)))
- {
- if (id->ms > si->end_ms ||
- (id->ms == si->end_ms && id->seq > si->end_seq))
- return 0; /* We are already out of range. */
- si->entry_flags = flags;
- if (flags & STREAM_ITEM_FLAG_SAMEFIELDS)
- si->master_fields_ptr = si->master_fields_start;
- return 1; /* Valid item returned. */
- }
- } else {
- if ((id->ms < si->end_ms ||
- (id->ms == si->end_ms && id->seq <= si->end_seq)) &&
- (!si->skip_tombstones || !(flags & STREAM_ITEM_FLAG_DELETED)))
- {
- if (id->ms < si->start_ms ||
- (id->ms == si->start_ms && id->seq < si->start_seq))
- return 0; /* We are already out of range. */
- si->entry_flags = flags;
- if (flags & STREAM_ITEM_FLAG_SAMEFIELDS)
- si->master_fields_ptr = si->master_fields_start;
- return 1; /* Valid item returned. */
- }
- }
-
- /* If we do not emit, we have to discard if we are going
- * forward, or seek the previous entry if we are going
- * backward. */
- if (!si->rev) {
- int64_t to_discard = (flags & STREAM_ITEM_FLAG_SAMEFIELDS) ?
- *numfields : *numfields*2;
- for (int64_t i = 0; i < to_discard; i++)
- si->lp_ele = lpNext(si->lp,si->lp_ele);
- } else {
- int64_t prev_times = 4; /* flag + id ms + id seq + one more to
- go back to the previous entry "count"
- field. */
- /* If the entry was not flagged SAMEFIELD we also read the
- * number of fields, so go back one more. */
- if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) prev_times++;
- while(prev_times--) si->lp_ele = lpPrev(si->lp,si->lp_ele);
- }
- }
-
- /* End of listpack reached. Try the next/prev radix tree node. */
- }
-}
-
-/* Get the field and value of the current item we are iterating. This should
- * be called immediately after streamIteratorGetID(), and for each field
- * according to the number of fields returned by streamIteratorGetID().
- * The function populates the field and value pointers and the corresponding
- * lengths by reference, that are valid until the next iterator call, assuming
- * no one touches the stream meanwhile. */
-void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen) {
- if (si->entry_flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
- *fieldptr = lpGet(si->master_fields_ptr,fieldlen,si->field_buf);
- si->master_fields_ptr = lpNext(si->lp,si->master_fields_ptr);
- } else {
- *fieldptr = lpGet(si->lp_ele,fieldlen,si->field_buf);
- si->lp_ele = lpNext(si->lp,si->lp_ele);
- }
- *valueptr = lpGet(si->lp_ele,valuelen,si->value_buf);
- si->lp_ele = lpNext(si->lp,si->lp_ele);
-}
-
-/* Remove the current entry from the stream: can be called after the
- * GetID() API or after any GetField() call, however we need to iterate
- * a valid entry while calling this function. Moreover the function
- * requires the entry ID we are currently iterating, that was previously
- * returned by GetID().
- *
- * Note that after calling this function, next calls to GetField() can't
- * be performed: the entry is now deleted. Instead the iterator will
- * automatically re-seek to the next entry, so the caller should continue
- * with GetID(). */
-void streamIteratorRemoveEntry(streamIterator *si, streamID *current) {
- stream *s = si->stream;
- unsigned char *lp = si->lp;
- size_t oldsize = lpBytes(lp);
- int64_t aux;
-
- /* We do not really delete the entry here. Instead we mark it as
- * deleted by flagging it, and also incrementing the count of the
- * deleted entries in the listpack header.
- *
- * We start flagging: */
- int64_t flags = lpGetInteger(si->lp_flags);
- flags |= STREAM_ITEM_FLAG_DELETED;
- lp = lpReplaceInteger(lp,&si->lp_flags,flags);
-
- /* Change the valid/deleted entries count in the master entry. */
- unsigned char *p = lpFirst(lp);
- aux = lpGetInteger(p);
-
- if (aux == 1) {
- /* If this is the last element in the listpack, we can remove the whole
- * node. */
- s->alloc_size -= oldsize;
- lpFree(lp);
- raxRemove(s->rax,si->ri.key,si->ri.key_len,NULL);
- } else {
- /* In the base case we alter the counters of valid/deleted entries. */
- lp = lpReplaceInteger(lp,&p,aux-1);
- p = lpNext(lp,p); /* Seek deleted field. */
- aux = lpGetInteger(p);
- lp = lpReplaceInteger(lp,&p,aux+1);
- s->alloc_size -= oldsize;
- s->alloc_size += lpBytes(lp);
-
- /* Update the listpack with the new pointer. */
- if (si->lp != lp)
- raxInsert(s->rax,si->ri.key,si->ri.key_len,lp,NULL);
- }
-
- /* Update the number of entries counter. */
- s->length--;
-
- /* Re-seek the iterator to fix the now messed up state. */
- streamID start, end;
- if (si->rev) {
- streamDecodeID(si->start_key,&start);
- end = *current;
- } else {
- start = *current;
- streamDecodeID(si->end_key,&end);
- }
- streamIteratorStop(si);
- streamIteratorStart(si,s,&start,&end,si->rev);
-
- /* TODO: perform a garbage collection here if the ratio between
- * deleted and valid goes over a certain limit. */
-}
-
-/* Stop the stream iterator. The only cleanup we need is to free the rax
- * iterator, since the stream iterator itself is supposed to be stack
- * allocated. */
-void streamIteratorStop(streamIterator *si) {
- raxStop(&si->ri);
-}
-
-/* Return 1 if `id` exists in `s` (and not marked as deleted) */
-int streamEntryExists(stream *s, streamID *id) {
- streamIterator si;
- streamIteratorStart(&si,s,id,id,0);
- streamID myid;
- int64_t numfields;
- int found = streamIteratorGetID(&si,&myid,&numfields);
- streamIteratorStop(&si);
- if (!found)
- return 0;
- serverAssert(streamCompareID(id,&myid) == 0);
- return 1;
-}
-
-/* Delete the specified item ID from the stream, returning 1 if the item
- * was deleted 0 otherwise (if it does not exist). */
-int streamDeleteItem(stream *s, streamID *id) {
- int deleted = 0;
- streamIterator si;
- streamIteratorStart(&si,s,id,id,0);
- streamID myid;
- int64_t numfields;
- if (streamIteratorGetID(&si,&myid,&numfields)) {
- streamIteratorRemoveEntry(&si,&myid);
- deleted = 1;
- }
- streamIteratorStop(&si);
- return deleted;
-}
-
-/* Get the last valid (non-tombstone) streamID of 's'. */
-void streamLastValidID(stream *s, streamID *maxid)
-{
- streamIterator si;
- streamIteratorStart(&si,s,NULL,NULL,1);
- int64_t numfields;
- if (!streamIteratorGetID(&si,maxid,&numfields) && s->length)
- serverPanic("Corrupt stream, length is %llu, but no max id", (unsigned long long)s->length);
- streamIteratorStop(&si);
-}
-
-/* Maximum size for a stream ID string. In theory 20*2+1 should be enough,
- * But to avoid chance for off by one issues and null-term, in case this will
- * be used as parsing buffer, we use a slightly larger buffer. On the other
- * hand considering sds header is gonna add 4 bytes, we wanna keep below the
- * allocator's 48 bytes bin. */
-#define STREAM_ID_STR_LEN 44
-
-sds createStreamIDString(streamID *id) {
- /* Optimization: pre-allocate a big enough buffer to avoid reallocs. */
- sds str = sdsnewlen(SDS_NOINIT, STREAM_ID_STR_LEN);
- sdssetlen(str, 0);
- return sdscatfmt(str,"%U-%U", id->ms,id->seq);
-}
-
-/* Emit a reply in the client output buffer by formatting a Stream ID
- * in the standard <ms>-<seq> format, using the simple string protocol
- * of REPL. */
-void addReplyStreamID(client *c, streamID *id) {
- addReplyBulkSds(c,createStreamIDString(id));
-}
-
-void setDeferredReplyStreamID(client *c, void *dr, streamID *id) {
- setDeferredReplyBulkSds(c, dr, createStreamIDString(id));
-}
-
-/* Similar to the above function, but just creates an object, usually useful
- * for replication purposes to create arguments. */
-robj *createObjectFromStreamID(streamID *id) {
- return createObject(OBJ_STRING, createStreamIDString(id));
-}
-
-/* Returns non-zero if the ID is 0-0. */
-int streamIDEqZero(streamID *id) {
- return !(id->ms || id->seq);
-}
-
-/* A helper that returns non-zero if the range from 'start' to `end`
- * contains a tombstone.
- *
- * NOTE: this assumes that the caller had verified that 'start' is less than
- * 's->last_id'. */
-int streamRangeHasTombstones(stream *s, streamID *start, streamID *end) {
- streamID start_id, end_id;
-
- if (!s->length || streamIDEqZero(&s->max_deleted_entry_id)) {
- /* The stream is empty or has no tombstones. */
- return 0;
- }
-
- if (start) {
- start_id = *start;
- } else {
- start_id.ms = 0;
- start_id.seq = 0;
- }
-
- if (end) {
- end_id = *end;
- } else {
- end_id.ms = UINT64_MAX;
- end_id.seq = UINT64_MAX;
- }
-
- if (streamCompareID(&start_id,&s->max_deleted_entry_id) <= 0 &&
- streamCompareID(&s->max_deleted_entry_id,&end_id) <= 0)
- {
- /* start_id <= max_deleted_entry_id <= end_id: The range does include a tombstone. */
- return 1;
- }
-
- /* The range doesn't includes a tombstone. */
- return 0;
-}
-
-/* Replies with a consumer group's current lag, that is the number of messages
- * in the stream that are yet to be delivered. In case that the lag isn't
- * available due to fragmentation, the reply to the client is a null. */
-void streamReplyWithCGLag(client *c, stream *s, streamCG *cg) {
- int valid = 0;
- long long lag = 0;
-
- if (!s->entries_added) {
- /* The lag of a newly-initialized stream is 0. */
- lag = 0;
- valid = 1;
- } else if (!s->length) { /* All entries deleted, now empty. */
- lag = 0;
- valid = 1;
- } else if (streamCompareID(&cg->last_id,&s->first_id) < 0 &&
- streamCompareID(&s->max_deleted_entry_id,&s->first_id) < 0)
- {
- /* When both the consumer group's last_id and the maximum tombstone are behind
- * the stream's first entry, the consumer group's lag will always be equal to
- * the number of remainin entries in the stream. */
- lag = s->length;
- valid = 1;
- } else if (cg->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s,&cg->last_id,NULL)) {
- /* No fragmentation ahead means that the group's logical reads counter
- * is valid for performing the lag calculation. */
- lag = (long long)s->entries_added - cg->entries_read;
- valid = 1;
- } else {
- /* Attempt to retrieve the group's last ID logical read counter. */
- long long entries_read = streamEstimateDistanceFromFirstEverEntry(s,&cg->last_id);
- if (entries_read != SCG_INVALID_ENTRIES_READ) {
- /* A valid counter was obtained. */
- lag = (long long)s->entries_added - entries_read;
- valid = 1;
- }
- }
-
- if (valid) {
- addReplyLongLong(c,lag);
- } else {
- addReplyNull(c);
- }
-}
-
-/* This function returns a value that is the ID's logical read counter, or its
- * distance (the number of entries) from the first entry ever to have been added
- * to the stream.
- *
- * A counter is returned only in one of the following cases:
- * 1. The ID is the same as the stream's last ID. In this case, the returned
- * is the same as the stream's entries_added counter.
- * 2. The ID equals that of the currently first entry in the stream, and the
- * stream has no tombstones. The returned value, in this case, is the result
- * of subtracting the stream's length from its added_entries, incremented by
- * one.
- * 3. The ID less than the stream's first current entry's ID, and there are no
- * tombstones. Here the estimated counter is the result of subtracting the
- * stream's length from its added_entries.
- * 4. The stream's added_entries is zero, meaning that no entries were ever
- * added.
- *
- * The special return value of ULLONG_MAX signals that the counter's value isn't
- * obtainable. It is returned in these cases:
- * 1. The provided ID, if it even exists, is somewhere between the stream's
- * current first and last entries' IDs, or in the future.
- * 2. The stream contains one or more tombstones. */
-long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id) {
- /* The counter of any ID in an empty, never-before-used stream is 0. */
- if (!s->entries_added) {
- return 0;
- }
-
- /* In the empty stream, if the ID is smaller or equal to the last ID,
- * it can set to the current added_entries value. */
- if (!s->length && streamCompareID(id,&s->last_id) < 1) {
- return s->entries_added;
- }
-
- /* There are fragmentations between the `id` and the stream's last-generated-id. */
- if (!streamIDEqZero(id) && streamCompareID(id,&s->max_deleted_entry_id) < 0)
- return SCG_INVALID_ENTRIES_READ;
-
- int cmp_last = streamCompareID(id,&s->last_id);
- if (cmp_last == 0) {
- /* Return the exact counter of the last entry in the stream. */
- return s->entries_added;
- } else if (cmp_last > 0) {
- /* The counter of a future ID is unknown. */
- return SCG_INVALID_ENTRIES_READ;
- }
-
- int cmp_id_first = streamCompareID(id,&s->first_id);
- int cmp_xdel_first = streamCompareID(&s->max_deleted_entry_id,&s->first_id);
- if (streamIDEqZero(&s->max_deleted_entry_id) || cmp_xdel_first < 0) {
- /* There's definitely no fragmentation ahead. */
- if (cmp_id_first < 0) {
- /* Return the estimated counter. */
- return s->entries_added - s->length;
- } else if (cmp_id_first == 0) {
- /* Return the exact counter of the first entry in the stream. */
- return s->entries_added - s->length + 1;
- }
- }
-
- /* The ID is either before an XDEL that fragments the stream or an arbitrary
- * ID. Either case, so we can't make a prediction. */
- return SCG_INVALID_ENTRIES_READ;
-}
-
-/* Copy-free version of streamPropagateXCLAIM that expects pre-created robj* arguments.
- * This is useful when propagating multiple XCLAIMs in a loop to avoid repeated
- * object creation/destruction overhead. */
-static inline void streamPropagateXCLAIMCopyFree(int dbid, robj *key, robj *group_last_id, robj *groupname, robj *id, robj *consumername, robj *delivery_time, robj *delivery_count) {
- /* We need to generate an XCLAIM that will work in a idempotent fashion:
- *
- * XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time>
- * RETRYCOUNT <count> FORCE JUSTID LASTID <id>.
- *
- * Note that JUSTID is useful in order to avoid that XCLAIM will do
- * useless work in the slave side, trying to fetch the stream item. */
- robj *argv[14];
- argv[0] = shared.xclaim;
- argv[1] = key;
- argv[2] = groupname;
- argv[3] = consumername;
- argv[4] = shared.integers[0];
- argv[5] = id;
- argv[6] = shared.time;
- argv[7] = delivery_time;
- argv[8] = shared.retrycount;
- argv[9] = delivery_count;
- argv[10] = shared.force;
- argv[11] = shared.justid;
- argv[12] = shared.lastid;
- argv[13] = group_last_id;
-
- alsoPropagate(dbid,argv,14,PROPAGATE_AOF|PROPAGATE_REPL);
-}
-
-/* As a result of an explicit XCLAIM or XREADGROUP command, new entries
- * are created in the pending list of the stream and consumers. We need
- * to propagate this changes in the form of XCLAIM commands. */
-static inline void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupname, robj *id, streamNACK *nack) {
- robj *consumername = createStringObject(nack->consumer->name,sdslen(nack->consumer->name));
- robj *delivery_time = createStringObjectFromLongLong(nack->delivery_time);
- robj *delivery_count = createStringObjectFromLongLong(nack->delivery_count);
- robj *group_last_id = createObjectFromStreamID(&group->last_id);
-
- streamPropagateXCLAIMCopyFree(c->db->id, key, group_last_id, groupname, id, consumername, delivery_time, delivery_count);
-
- decrRefCount(consumername);
- decrRefCount(delivery_time);
- decrRefCount(delivery_count);
- decrRefCount(group_last_id);
-}
-
-/* We need this when we want to propagate the new last-id of a consumer group
- * that was consumed by XREADGROUP with the NOACK option: in that case we can't
- * propagate the last ID just using the XCLAIM LASTID option, so we emit
- *
- * XGROUP SETID <key> <groupname> <id> ENTRIESREAD <entries_read>
- */
-void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupname) {
- robj *argv[7];
- argv[0] = shared.xgroup;
- argv[1] = shared.setid;
- argv[2] = key;
- argv[3] = groupname;
- argv[4] = createObjectFromStreamID(&group->last_id);
- argv[5] = shared.entriesread;
- argv[6] = createStringObjectFromLongLong(group->entries_read);
-
- alsoPropagate(c->db->id,argv,7,PROPAGATE_AOF|PROPAGATE_REPL);
-
- decrRefCount(argv[4]);
- decrRefCount(argv[6]);
-}
-
-/* We need this when we want to propagate creation of consumer that was created
- * by XREADGROUP with the NOACK option. In that case, the only way to create
- * the consumer at the replica is by using XGROUP CREATECONSUMER (see issue #7140)
- *
- * XGROUP CREATECONSUMER <key> <groupname> <consumername>
- */
-void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername) {
- robj *argv[5];
- argv[0] = shared.xgroup;
- argv[1] = shared.createconsumer;
- argv[2] = key;
- argv[3] = groupname;
- argv[4] = createObject(OBJ_STRING,sdsdup(consumername));
-
- alsoPropagate(c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL);
-
- decrRefCount(argv[4]);
-}
-
-/* Send the stream items in the specified range to the client 'c'. The range
- * the client will receive is between start and end inclusive, if 'count' is
- * non zero, no more than 'count' elements are sent.
- *
- * The 'end' pointer can be NULL to mean that we want all the elements from
- * 'start' till the end of the stream. If 'rev' is non zero, elements are
- * produced in reversed order from end to start.
- *
- * The function returns the number of entries emitted.
- *
- * If 'min_idle_time' is not -1 and a group is specified, the function first
- * processes pending entries (from the group's PEL) that have been idle for at
- * least 'min_idle_time' milliseconds, claiming them for the specified consumer.
- * Each claimed entry is returned as a four-element array: ID, field-value pairs,
- * idle time, and delivery count. The NACK is transferred from the previous
- * consumer to the new consumer with updated delivery metadata.
- *
- * If group and consumer are not NULL, the function performs additional work:
- * 1. It updates the last delivered ID in the group in case we are
- * sending IDs greater than the current last ID.
- * 2. If the requested IDs are already assigned to some other consumer, the
- * function will not return it to the client.
- * 3. An entry in the pending list will be created for every entry delivered
- * for the first time to this consumer.
- * 4. The group's read counter is incremented if it is already valid and there
- * are no future tombstones, or is invalidated (set to 0) otherwise. If the
- * counter is invalid to begin with, we try to obtain it for the last
- * delivered ID.
- *
- * The behavior may be modified passing non-zero flags:
- *
- * STREAM_RWR_NOACK: Do not create PEL entries, that is, the point "3" above
- * is not performed.
- * STREAM_RWR_RAWENTRIES: Do not emit array boundaries, but just the entries,
- * and return the number of entries emitted as usually.
- * This is used when the function is just used in order
- * to emit data and there is some higher level logic.
- * STREAM_RWR_HISTORY: Return entries from the consumer's own PEL history only.
- * STREAM_RWR_CLAIMED: Return only claimable entries from the PEL. New entries
- * from the stream are not returned.
- *
- * The final argument 'spi' (stream propagation info pointer) is a structure
- * filled with information needed to propagate the command execution to AOF
- * and slaves, in the case a consumer group was passed: we need to generate
- * XCLAIM commands to create the pending list into AOF/slaves in that case.
- *
- * If 'spi' is set to NULL no propagation will happen even if the group was
- * given, but currently such a feature is never used by the code base that
- * will always pass 'spi' and propagate when a group is passed.
- *
- * Note that this function is recursive in certain cases. When it's called
- * with a non NULL group and consumer argument, it may call
- * streamReplyWithRangeFromConsumerPEL() in order to get entries from the
- * consumer pending entries list. However such a function will then call
- * streamReplyWithRange() in order to emit single entries (found in the
- * PEL by ID) to the client. This is the use case for the STREAM_RWR_RAWENTRIES
- * flag. */
-#define STREAM_RWR_NOACK (1<<0) /* Do not create entries in the PEL. */
-#define STREAM_RWR_RAWENTRIES (1<<1) /* Do not emit protocol for array
- boundaries, just the entries. */
-#define STREAM_RWR_HISTORY (1<<2) /* Only serve consumer local PEL. */
-#define STREAM_RWR_CLAIMED (1<<3) /* Only serve claimed entries from PEL. */
-size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, long long min_idle_time, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi, unsigned long *propCount) {
- void *arraylen_ptr = NULL;
- size_t arraylen = 0;
- streamIterator si;
- int64_t numfields;
- streamID id;
- int propagate_last_id = 0;
- int noack = flags & STREAM_RWR_NOACK;
- const int db_id = c->db->id;
- const mstime_t cmd_time_snapshot = commandTimeSnapshot();
- /* to be used in case of stream propagation */
- robj *consumername = NULL;
- robj *delivery_time = NULL;
- robj *group_last_id = NULL;
- if (spi && consumer) {
- consumername = createStringObject(consumer->name,sdslen(consumer->name));
- delivery_time = createStringObjectFromLongLong(cmd_time_snapshot);
- group_last_id = createObjectFromStreamID(&group->last_id);
- }
- if (propCount) *propCount = 0;
-
- if (group && min_idle_time != -1) {
- arraylen_ptr = addReplyDeferredLen(c);
- /* Scan the group's pending entries list (PEL) to find messages that have been
- * idle for at least min_idle_time milliseconds. The pel_by_time radix tree
- * stores entries ordered by their last delivery timestamp, allowing us to
- * efficiently iterate from oldest to newest.
- *
- * We collect eligible entries into a temporary list rather than processing
- * them inline because:
- * 1. We cannot safely modify a radix tree while iterating over it
- * 2. The claiming process requires removing and re-inserting entries in
- * both pel_by_time and the consumer PELs
- *
- * The iteration can terminate early in two cases:
- * 1. We find an entry that hasn't been idle long enough - due to time-based
- * ordering, all subsequent entries will be even newer
- * 2. We've collected enough entries to satisfy the requested count limit */
- list *eligible_pels = listCreate();
- listSetFreeMethod(eligible_pels, zfree);
- raxIterator ri;
- raxStart(&ri, group->pel_by_time);
- raxSeek(&ri, "^", NULL, 0);
- while (raxNext(&ri)) {
- pelTimeKey pelKey;
- decodePelTimeKey(ri.key, &pelKey);
- uint64_t idle = cmd_time_snapshot - pelKey.delivery_time;
- if (idle < (uint64_t)min_idle_time)
- break;
-
- /* Store a copy of the key for later processing */
- pelTimeKey *keyCopy = zmalloc(sizeof(pelTimeKey));
- memcpy(keyCopy, &pelKey, sizeof(pelTimeKey));
- listAddNodeTail(eligible_pels, keyCopy);
-
- if (count && listLength(eligible_pels) >= count) break;
- }
- raxStop(&ri);
-
- /* Process each eligible pending entry, claiming it for the current consumer.
- * For each entry we:
- * 1. Fetch the actual message data from the stream
- * 2. Send the message to the client with metadata (idle time, delivery count)
- * 3. Transfer ownership from the previous consumer to the current consumer
- * 4. Update all relevant data structures and propagate the claim operation */
- listIter li;
- listNode *ln;
- listRewind(eligible_pels, &li);
- while ((ln = listNext(&li))) {
- pelTimeKey *pelKey = (pelTimeKey*)listNodeValue(ln);
- unsigned char buf[sizeof(streamID)];
- streamEncodeID(buf, &pelKey->id);
-
- void *result;
- streamNACK *nack = NULL;
- uint64_t delivery_count = 0;
- /* Must exist, we got the ID from pel_by_time */
- serverAssert(raxFind(group->pel,buf,sizeof(buf),&result));
-
- nack = (streamNACK*)result;
- delivery_count = nack->delivery_count;
-
- streamID pel_id;
- streamIteratorStart(&si,s,&pelKey->id,&pelKey->id,rev);
- if (streamIteratorGetID(&si,&pel_id,&numfields)) {
- /* Emit a four elements array: ID, array of field-value pairs,
- * idle time and delivery count. */
- robj *idarg = createObjectFromStreamID(&pel_id);
- addReplyArrayLen(c,4);
- addReplyBulk(c,idarg);
- addReplyArrayLen(c,numfields*2);
-
- /* Emit the field-value pairs. */
- while (numfields--) {
- unsigned char *key, *value;
- int64_t key_len, value_len;
- streamIteratorGetField(&si,&key,&value,&key_len,&value_len);
- addReplyBulkCBuffer(c,key,key_len);
- addReplyBulkCBuffer(c,value,value_len);
- }
-
- uint64_t idle = cmd_time_snapshot - pelKey->delivery_time;
- addReplyLongLong(c, idle);
- addReplyLongLong(c, delivery_count);
-
- /* Remove the NACK from old consumer and time-based PEL. */
- raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
- raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &pel_id);
-
- /* Transfer NACK to new consumer with updated metadata. */
- nack->consumer = consumer;
- nack->delivery_time = cmd_time_snapshot;
- nack->delivery_count++;
- raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
- raxInsertPelByTime(group->pel_by_time, nack->delivery_time, &pel_id);
-
- consumer->active_time = cmd_time_snapshot;
-
- /* Propagate as XCLAIM. */
- if (spi) {
- robj *delivery_count = createStringObjectFromLongLong(nack->delivery_count);
- streamPropagateXCLAIMCopyFree(db_id,spi->keyname,group_last_id,spi->groupname,idarg,consumername,delivery_time,delivery_count);
- decrRefCount(delivery_count);
- if (propCount) (*propCount)++;
- }
- decrRefCount(idarg);
- arraylen++;
- }
- streamIteratorStop(&si);
- }
- listRelease(eligible_pels);
- }
- /* If the client is asking for some history, we serve it using a
- * different function, so that we return entries *solely* from its
- * own PEL. This ensures each consumer will always and only see
- * the history of messages delivered to it and not yet confirmed
- * as delivered. */
- if (group && (flags & STREAM_RWR_HISTORY)) {
- if (spi && consumer) {
- decrRefCount(delivery_time);
- decrRefCount(consumername);
- decrRefCount(group_last_id);
- }
- return streamReplyWithRangeFromConsumerPEL(c,s,start,end,count,
- group, consumer);
- }
-
- /* Stop here if client only wants claimed entries or count is satisfied. */
- if ((group && (flags & STREAM_RWR_CLAIMED)) || (count && count == arraylen)) {
- if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen);
- if (spi && consumer) {
- decrRefCount(delivery_time);
- decrRefCount(consumername);
- decrRefCount(group_last_id);
- }
- return arraylen;
- }
-
- if (!(flags & STREAM_RWR_RAWENTRIES) && !arraylen_ptr)
- arraylen_ptr = addReplyDeferredLen(c);
- streamIteratorStart(&si,s,start,end,rev);
- while (streamIteratorGetID(&si,&id,&numfields)) {
- /* Update the group last_id if needed. */
- if (group && streamCompareID(&id,&group->last_id) > 0) {
- if (group->entries_read != SCG_INVALID_ENTRIES_READ &&
- streamCompareID(&group->last_id, &s->first_id) >= 0 &&
- !streamRangeHasTombstones(s,&group->last_id,NULL))
- {
- /* A valid counter and no tombstones between the group's last-delivered-id
- * and the stream's last-generated-id mean we can increment the read counter
- * to keep tracking the group's progress. */
- group->entries_read++;
- } else if (s->entries_added) {
- /* The group's counter may be invalid, so we try to obtain it. */
- group->entries_read = streamEstimateDistanceFromFirstEverEntry(s,&id);
- }
- streamUpdateCGroupLastId(s, group, &id);
- /* In the past, we would only set it when NOACK was specified. And in
- * #9127, XCLAIM did not propagate entries_read in ACK, which would
- * cause entries_read to be inconsistent between master and replicas,
- * so here we call streamPropagateGroupID unconditionally. */
- propagate_last_id = 1;
- }
-
- if (min_idle_time != -1) {
- /* If min-idle-time is specified, we emit a four elements
- * array: ID, array of field-value pairs, idle time and delivery count. */
- addReplyArrayLen(c,4);
- } else {
- /* Emit a two elements array for each item. The first is
- * the ID, the second is an array of field-value pairs. */
- addReplyArrayLen(c,2);
- }
- robj *idarg = createObjectFromStreamID(&id);
- addReplyBulk(c,idarg);
- addReplyArrayLen(c,numfields*2);
-
- /* Emit the field-value pairs. */
- while (numfields--) {
- unsigned char *key, *value;
- int64_t key_len, value_len;
- streamIteratorGetField(&si,&key,&value,&key_len,&value_len);
- addReplyBulkCBuffer(c,key,key_len);
- addReplyBulkCBuffer(c,value,value_len);
- }
-
- if (min_idle_time != -1) {
- /* For new entries idle time and delivery count is 0. */
- addReplyLongLong(c, 0);
- addReplyLongLong(c, 0);
- }
-
- /* If a group is passed, we need to create an entry in the
- * PEL (pending entries list) of this group *and* this consumer.
- *
- * Note that we cannot be sure about the fact the message is not
- * already owned by another consumer, because the admin is able
- * to change the consumer group last delivered ID using the
- * XGROUP SETID command. So if we find that there is already
- * a NACK for the entry, we need to associate it to the new
- * consumer. */
- if (group && !noack) {
- unsigned char buf[sizeof(streamID)];
- streamEncodeID(buf,&id);
-
- /* Try to add a new NACK. Most of the time this will work and
- * will not require extra lookups. We'll fix the problem later
- * if we find that there is already an entry for this ID. */
- streamNACK *nack = streamCreateNACK(s, consumer);
- int group_inserted =
- raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL);
- int consumer_inserted =
- raxTryInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
-
- /* Now we can check if the entry was already busy, and
- * in that case reassign the entry to the new consumer,
- * or update it if the consumer is the same as before. */
- if (group_inserted == 0) {
- streamFreeNACK(s,nack);
- void *result;
- int found = raxFind(group->pel,buf,sizeof(buf),&result);
- serverAssert(found);
- nack = result;
- raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
- /* Remove old entry from the PEL by time. */
- raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &id);
- /* Update the consumer and NACK metadata. */
- nack->consumer = consumer;
- nack->delivery_time = cmd_time_snapshot;
- nack->delivery_count = 1;
- /* Add the entry in the new consumer local PEL. */
- raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
- } else if (group_inserted == 1 && consumer_inserted == 1) {
- nack->cgroup_ref_node = streamLinkCGroupToEntry(s, group, buf);
- } else if (group_inserted == 1 && consumer_inserted == 0) {
- serverPanic("NACK half-created. Should not be possible.");
- }
-
- /* We have new NACK or updated existing one. */
- raxInsertPelByTime(group->pel_by_time, nack->delivery_time, &id);
-
- consumer->active_time = cmd_time_snapshot;
-
- /* Propagate as XCLAIM. */
- if (spi) {
- robj *delivery_count = createStringObjectFromLongLong(nack->delivery_count);
- streamPropagateXCLAIMCopyFree(db_id,spi->keyname,group_last_id,spi->groupname,idarg,consumername,delivery_time,delivery_count);
- decrRefCount(delivery_count);
- if (propCount) (*propCount)++;
- }
- }
- decrRefCount(idarg);
- arraylen++;
- if (count && count == arraylen) break;
- }
-
- if (spi && consumer) {
- decrRefCount(delivery_time);
- decrRefCount(consumername);
- decrRefCount(group_last_id);
- }
-
- if (spi && propagate_last_id) {
- streamPropagateGroupID(c,spi->keyname,group,spi->groupname);
- if (propCount) (*propCount)++;
- }
-
- streamIteratorStop(&si);
- if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen);
- return arraylen;
-}
-
-/* This is a helper function for streamReplyWithRange() when called with
- * group and consumer arguments, but with a range that is referring to already
- * delivered messages. In this case we just emit messages that are already
- * in the history of the consumer, fetching the IDs from its PEL.
- *
- * Note that this function does not have a 'rev' argument because it's not
- * possible to iterate in reverse using a group. Basically this function
- * is only called as a result of the XREADGROUP command.
- *
- * This function is more expensive because it needs to inspect the PEL and then
- * seek into the radix tree of the messages in order to emit the full message
- * to the client. However clients only reach this code path when they are
- * fetching the history of already retrieved messages, which is rare. */
-size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamCG *group, streamConsumer *consumer) {
- raxIterator ri;
- unsigned char startkey[sizeof(streamID)];
- unsigned char endkey[sizeof(streamID)];
- streamEncodeID(startkey,start);
- if (end) streamEncodeID(endkey,end);
-
- size_t arraylen = 0;
- void *arraylen_ptr = addReplyDeferredLen(c);
- raxStart(&ri,consumer->pel);
- raxSeek(&ri,">=",startkey,sizeof(startkey));
- while(raxNext(&ri) && (!count || arraylen < count)) {
- if (end && memcmp(ri.key,endkey,ri.key_len) > 0) break;
- streamID thisid;
- streamDecodeID(ri.key,&thisid);
- if (streamReplyWithRange(c,s,&thisid,&thisid,1,0,-1,NULL,NULL,
- STREAM_RWR_RAWENTRIES,NULL,NULL) == 0)
- {
- /* Note that we may have a not acknowledged entry in the PEL
- * about a message that's no longer here because was removed
- * by the user by other means. In that case we signal it emitting
- * the ID but then a NULL entry for the fields. */
- addReplyArrayLen(c,2);
- addReplyStreamID(c,&thisid);
- addReplyNullArray(c);
- } else {
- streamNACK *nack = ri.data;
- raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &thisid);
-
- nack->delivery_time = commandTimeSnapshot();
- nack->delivery_count++;
-
- raxInsertPelByTime(group->pel_by_time, nack->delivery_time, &thisid);
- }
- arraylen++;
- }
- raxStop(&ri);
- setDeferredArrayLen(c,arraylen_ptr,arraylen);
- return arraylen;
-}
-
-/* -----------------------------------------------------------------------
- * Stream commands implementation
- * ----------------------------------------------------------------------- */
-
-/* Look the stream at 'key' and return the corresponding stream object.
- * The function creates a key setting it to an empty stream if needed. */
-kvobj *streamTypeLookupWriteOrCreate(client *c, robj *key, int no_create) {
- dictEntryLink link;
- kvobj *kv = lookupKeyWriteWithLink(c->db,key, &link);
- if (checkType(c, kv, OBJ_STREAM)) return NULL;
- if (kv != NULL) return kv;
-
- if (no_create) {
- addReplyNull(c);
- return NULL;
- }
- robj *o = createStreamObject();
- dbAddByLink(c->db, key, &o, &link);
- return o;
-}
-
-/* Parse a stream ID in the format given by clients to Redis, that is
- * <ms>-<seq>, and converts it into a streamID structure. If
- * the specified ID is invalid C_ERR is returned and an error is reported
- * to the client, otherwise C_OK is returned. The ID may be in incomplete
- * form, just stating the milliseconds time part of the stream. In such a case
- * the missing part is set according to the value of 'missing_seq' parameter.
- *
- * The IDs "-" and "+" specify respectively the minimum and maximum IDs
- * that can be represented. If 'strict' is set to 1, "-" and "+" will be
- * treated as an invalid ID.
- *
- * The ID form <ms>-* specifies a milliseconds-only ID, leaving the sequence part
- * to be autogenerated. When a non-NULL 'seq_given' argument is provided, this
- * form is accepted and the argument is set to 0 unless the sequence part is
- * specified.
- *
- * If 'c' is set to NULL, no reply is sent to the client. */
-int streamGenericParseIDOrReply(client *c, const robj *o, streamID *id, uint64_t missing_seq, int strict, int *seq_given) {
- char buf[128];
- if (sdslen(o->ptr) > sizeof(buf)-1) goto invalid;
- memcpy(buf,o->ptr,sdslen(o->ptr)+1);
-
- if (strict && (buf[0] == '-' || buf[0] == '+') && buf[1] == '\0')
- goto invalid;
-
- if (seq_given != NULL) {
- *seq_given = 1;
- }
-
- /* Handle the "-" and "+" special cases. */
- if (buf[0] == '-' && buf[1] == '\0') {
- id->ms = 0;
- id->seq = 0;
- return C_OK;
- } else if (buf[0] == '+' && buf[1] == '\0') {
- id->ms = UINT64_MAX;
- id->seq = UINT64_MAX;
- return C_OK;
- }
-
- /* Parse <ms>-<seq> form. */
- unsigned long long ms, seq;
- char *dot = strchr(buf,'-');
- if (dot) *dot = '\0';
- if (string2ull(buf,&ms) == 0) goto invalid;
- if (dot) {
- size_t seqlen = strlen(dot+1);
- if (seq_given != NULL && seqlen == 1 && *(dot + 1) == '*') {
- /* Handle the <ms>-* form. */
- seq = 0;
- *seq_given = 0;
- } else if (string2ull(dot+1,&seq) == 0) {
- goto invalid;
- }
- } else {
- seq = missing_seq;
- }
- id->ms = ms;
- id->seq = seq;
- return C_OK;
-
-invalid:
- if (c) addReplyError(c,"Invalid stream ID specified as stream "
- "command argument");
- return C_ERR;
-}
-
-/* Wrapper for streamGenericParseIDOrReply() used by module API. */
-int streamParseID(const robj *o, streamID *id) {
- return streamGenericParseIDOrReply(NULL,o,id,0,0,NULL);
-}
-
-/* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to
- * 0, to be used when - and + are acceptable IDs. */
-int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) {
- return streamGenericParseIDOrReply(c,o,id,missing_seq,0,NULL);
-}
-
-/* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to
- * 1, to be used when we want to return an error if the special IDs + or -
- * are provided. */
-int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int *seq_given) {
- return streamGenericParseIDOrReply(c,o,id,missing_seq,1,seq_given);
-}
-
-/* Helper for parsing a stream ID that is a range query interval. When the
- * exclude argument is NULL, streamParseIDOrReply() is called and the interval
- * is treated as close (inclusive). Otherwise, the exclude argument is set if
- * the interval is open (the "(" prefix) and streamParseStrictIDOrReply() is
- * called in that case.
- */
-int streamParseIntervalIDOrReply(client *c, robj *o, streamID *id, int *exclude, uint64_t missing_seq) {
- char *p = o->ptr;
- size_t len = sdslen(p);
- int invalid = 0;
-
- if (exclude != NULL) *exclude = (len > 1 && p[0] == '(');
- if (exclude != NULL && *exclude) {
- robj *t = createStringObject(p+1,len-1);
- invalid = (streamParseStrictIDOrReply(c,t,id,missing_seq,NULL) == C_ERR);
- decrRefCount(t);
- } else
- invalid = (streamParseIDOrReply(c,o,id,missing_seq) == C_ERR);
- if (invalid)
- return C_ERR;
- return C_OK;
-}
-
-void streamRewriteApproxSpecifier(client *c, int idx) {
- rewriteClientCommandArgument(c,idx,shared.special_equals);
-}
-
-/* We propagate MAXLEN/MINID ~ <count> as MAXLEN/MINID = <resulting-len-of-stream>
- * otherwise trimming is no longer deterministic on replicas / AOF. */
-void streamRewriteTrimArgument(client *c, stream *s, int trim_strategy, int idx) {
- robj *arg;
- if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
- arg = createStringObjectFromLongLong(s->length);
- } else {
- streamID first_id;
- streamGetEdgeID(s,1,0,&first_id);
- arg = createObjectFromStreamID(&first_id);
- }
-
- rewriteClientCommandArgument(c,idx,arg);
- decrRefCount(arg);
-}
-
-/* XADD key [NOMKSTREAM] [KEEPREF | DELREF | ACKED] [IDMPAUTO pid | IDMP pid iid] [(MAXLEN [~|=] <count> | MINID [~|=] <id>) [LIMIT <entries>]] <ID or *> [field value] [field value] ... */
-void xaddCommand(client *c) {
- /* Parse options. */
- streamAddTrimArgs parsed_args;
- int idpos = streamParseAddOrTrimArgsOrReply(c, &parsed_args, 1);
- if (idpos < 0)
- return; /* streamParseAddOrTrimArgsOrReply already replied. */
- int field_pos = idpos+1; /* The ID is always one argument before the first field */
-
- /* Check arity. */
- if ((c->argc - field_pos) < 2 || ((c->argc-field_pos) % 2) == 1) {
- addReplyErrorArity(c);
- return;
- }
-
- /* Return ASAP if minimal ID (0-0) was given so we avoid possibly creating
- * a new stream and have streamAppendItem fail, leaving an empty key in the
- * database. */
- if (parsed_args.id_given && parsed_args.seq_given &&
- parsed_args.id.ms == 0 && parsed_args.id.seq == 0)
- {
- addReplyError(c,"The ID specified in XADD must be greater than 0-0");
- return;
- }
-
- /* Lookup the stream at key. */
- kvobj *kv;
- stream *s;
- if ((kv = streamTypeLookupWriteOrCreate(c,c->argv[1],parsed_args.no_mkstream)) == NULL) return;
- s = kv->ptr;
- size_t old_alloc = s->alloc_size;
-
- /* IDMP: Check if IID already exists, save IID for later insertion */
- XXH128_hash_t hash;
- char *iid_str = NULL;
- size_t iid_len = 0;
- idmpProducer *producer = NULL;
- idmpEntry *entry = NULL;
-
- if (parsed_args.idmp_pid != NULL) {
- /* Get or create the producer for this pid */
- char *pid_str = parsed_args.idmp_pid->ptr;
- size_t pid_len = sdslen((sds)pid_str);
- producer = idmpGetOrCreateProducer(s, pid_str, pid_len);
-
- /* Get IID string based on option */
- if (parsed_args.idmp_auto) {
- /* Auto-generate IID by hashing field-value pairs */
- int64_t numfields = (c->argc - field_pos) / 2;
- if (createIdempotencyHash(&c->argv[field_pos], numfields, &hash) == C_ERR) {
- addReplyError(c, "Failed to create idempotency hash");
- return;
- }
- iid_str = (char *)&hash;
- iid_len = sizeof(hash);
- } else {
- /* Use user-provided IID directly */
- iid_str = parsed_args.idmp_iid->ptr;
- iid_len = sdslen((sds)iid_str);
- }
-
- /* Create entry for lookup and potential insertion */
- entry = idmpEntryCreate(iid_str, iid_len, &s->alloc_size);
-
- /* Check if IID already exists and reply if found */
- if (idmpLookupAndReply(s, producer, entry, c)) {
- /* IID already exists, free the entry and return */
- idmpEntryFree(entry, &s->alloc_size);
- return;
- }
- }
-
- /* Return ASAP if the stream has reached the last possible ID */
- if (s->last_id.ms == UINT64_MAX && s->last_id.seq == UINT64_MAX) {
- addReplyError(c,"The stream has exhausted the last possible ID, "
- "unable to add more items");
- idmpEntryFree(entry, &s->alloc_size);
- return;
- }
-
- /* Append using the low level function and return the ID. */
- errno = 0;
- streamID id;
- if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2,
- &id,parsed_args.id_given ? &parsed_args.id : NULL,parsed_args.seq_given) == C_ERR)
- {
- serverAssert(errno != 0);
- if (errno == EDOM)
- addReplyError(c,"The ID specified in XADD is equal or smaller than "
- "the target stream top item");
- else
- addReplyError(c,"Elements are too large to be stored");
- if (server.memory_tracking_per_slot && old_alloc != s->alloc_size)
- updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),old_alloc,s->alloc_size);
- idmpEntryFree(entry, &s->alloc_size);
- return;
- }
- sds replyid = createStreamIDString(&id);
- addReplyBulkCBuffer(c, replyid, sdslen(replyid));
-
- /* IDMP: Insert the entry now that we have the actual ID */
- if (parsed_args.idmp_pid != NULL) {
- idmpInsertEntry(s, producer, entry, &id);
- trackStreamIdmpEntries(c, c->argv[1]);
- }
-
- notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);
- server.dirty++;
-
- /* Trim if needed. */
- if (parsed_args.trim_strategy != TRIM_STRATEGY_NONE) {
- if (streamTrim(s, &parsed_args))
- notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
- if (parsed_args.approx_trim) {
- /* In case our trimming was limited (by LIMIT or by ~) we must
- * re-write the relevant trim argument to make sure there will be
- * no inconsistencies in AOF loading or in the replica.
- * It's enough to check only args->approx because there is no
- * way LIMIT is given without the ~ option. */
- streamRewriteApproxSpecifier(c,parsed_args.trim_strategy_arg_idx-1);
- streamRewriteTrimArgument(c,s,parsed_args.trim_strategy,parsed_args.trim_strategy_arg_idx);
- }
- }
-
- if (server.memory_tracking_per_slot && old_alloc != s->alloc_size)
- updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),old_alloc,s->alloc_size);
-
- keyModified(c,c->db,c->argv[1],kv,1);
-
- /* Let's rewrite the ID argument with the one actually generated for
- * AOF/replication propagation. */
- if (!parsed_args.id_given || !parsed_args.seq_given) {
- robj *idarg = createObject(OBJ_STRING, replyid);
- rewriteClientCommandArgument(c, idpos, idarg);
- decrRefCount(idarg);
- } else {
- sdsfree(replyid);
- }
-
- /* We need to signal to blocked clients that there is new data on this
- * stream. */
- signalKeyAsReady(c->db, c->argv[1], OBJ_STREAM);
-}
-
-/* XRANGE/XREVRANGE actual implementation.
- * The 'start' and 'end' IDs are parsed as follows:
- * Incomplete 'start' has its sequence set to 0, and 'end' to UINT64_MAX.
- * "-" and "+"" mean the minimal and maximal ID values, respectively.
- * The "(" prefix means an open (exclusive) range, so XRANGE stream (1-0 (2-0
- * will match anything from 1-1 and 1-UINT64_MAX.
- */
-void xrangeGenericCommand(client *c, int rev) {
- kvobj *kv;
- stream *s;
- streamID startid, endid;
- long long count = -1;
- robj *startarg = rev ? c->argv[3] : c->argv[2];
- robj *endarg = rev ? c->argv[2] : c->argv[3];
- int startex = 0, endex = 0;
- size_t old_alloc;
-
- /* Parse start and end IDs. */
- if (streamParseIntervalIDOrReply(c,startarg,&startid,&startex,0) != C_OK)
- return;
- if (startex && streamIncrID(&startid) != C_OK) {
- addReplyError(c,"invalid start ID for the interval");
- return;
- }
- if (streamParseIntervalIDOrReply(c,endarg,&endid,&endex,UINT64_MAX) != C_OK)
- return;
- if (endex && streamDecrID(&endid) != C_OK) {
- addReplyError(c,"invalid end ID for the interval");
- return;
- }
-
- /* Parse the COUNT option if any. */
- if (c->argc > 4) {
- for (int j = 4; j < c->argc; j++) {
- int additional = c->argc-j-1;
- if (strcasecmp(c->argv[j]->ptr,"COUNT") == 0 && additional >= 1) {
- if (getLongLongFromObjectOrReply(c,c->argv[j+1],&count,NULL)
- != C_OK) return;
- if (count < 0) count = 0;
- j++; /* Consume additional arg. */
- } else {
- addReplyErrorObject(c,shared.syntaxerr);
- return;
- }
- }
- }
-
- /* Return the specified range to the user. */
- if ((kv = lookupKeyReadOrReply(c, c->argv[1], shared.emptyarray)) == NULL ||
- checkType(c, kv, OBJ_STREAM)) return;
-
- s = kv->ptr;
-
- if (count == 0) {
- addReplyNullArray(c);
- } else {
- if (count == -1) count = 0;
- old_alloc = s->alloc_size;
- streamReplyWithRange(c,s,&startid,&endid,count,rev,-1,NULL,NULL,0,NULL,NULL);
- if (server.memory_tracking_per_slot && old_alloc != s->alloc_size)
- updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),old_alloc,s->alloc_size);
- }
-}
-
-/* XRANGE key start end [COUNT <n>] */
-void xrangeCommand(client *c) {
- xrangeGenericCommand(c,0);
-}
-
-/* XREVRANGE key end start [COUNT <n>] */
-void xrevrangeCommand(client *c) {
- xrangeGenericCommand(c,1);
-}
-
-/* XLEN key*/
-void xlenCommand(client *c) {
- kvobj *kv;
- if ((kv = lookupKeyReadOrReply(c, c->argv[1], shared.czero)) == NULL
- || checkType(c, kv, OBJ_STREAM)) return;
- stream *s = kv->ptr;
- addReplyLongLong(c,s->length);
-}
-
-/* XREAD [BLOCK <milliseconds>] [COUNT <count>] STREAMS key_1 key_2 ... key_N
- * ID_1 ID_2 ... ID_N
- *
- * This function also implements the XREADGROUP command, which is like XREAD
- * but accepting the [GROUP group-name consumer-name] additional option.
- * This is useful because while XREAD is a read command and can be called
- * on slaves, XREADGROUP is not. */
-#define XREAD_BLOCKED_DEFAULT_COUNT 1000
-void xreadCommand(client *c) {
- long long min_idle_time = -1; /* -1 means, no IDLE argument given. */
- long long timeout = -1; /* -1 means, no BLOCK argument given. */
- long long count = 0;
- int streams_count = 0;
- int streams_arg = 0;
- int noack = 0; /* True if NOACK option was specified. */
- streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
- streamID *ids = static_ids;
- streamCG **groups = NULL;
- int xreadgroup = sdslen(c->argv[0]->ptr) == 10; /* XREAD or XREADGROUP? */
- robj *groupname = NULL;
- robj *consumername = NULL;
- size_t old_alloc;
-
- /* Parse arguments. */
- for (int i = 1; i < c->argc; i++) {
- int moreargs = c->argc-i-1;
- char *o = c->argv[i]->ptr;
- if (!strcasecmp(o,"CLAIM") && moreargs) {
- if (!xreadgroup) {
- addReplyError(c,"The CLAIM option is only supported by "
- "XREADGROUP. You called XREAD instead.");
- return;
- }
- i++;
- min_idle_time = -1;
- if (getLongLongFromObjectOrReply(c, c->argv[i], &min_idle_time,
- "min-idle-time is not an integer or out of range") != C_OK)
- return;
- if (min_idle_time < 0) {
- addReplyError(c,"min-idle-time must be a positive integer");
- return;
- }
- } else if (!strcasecmp(o,"BLOCK") && moreargs) {
- i++;
- if (getTimeoutFromObjectOrReply(c,c->argv[i],&timeout,
- UNIT_MILLISECONDS) != C_OK) return;
- } else if (!strcasecmp(o,"COUNT") && moreargs) {
- i++;
- if (getLongLongFromObjectOrReply(c,c->argv[i],&count,NULL) != C_OK)
- return;
- if (count < 0) count = 0;
- } else if (!strcasecmp(o,"STREAMS") && moreargs) {
- streams_arg = i+1;
- streams_count = (c->argc-streams_arg);
- if ((streams_count % 2) != 0) {
- const char *symbol = xreadgroup ? "ID or '>'" : "ID, '+', or '$'";
- addReplyErrorFormat(c,"Unbalanced '%s' list of streams: "
- "for each stream key an %s must be "
- "specified.", c->cmd->fullname,symbol);
- return;
- }
- streams_count /= 2; /* We have two arguments for each stream. */
- break;
- } else if (!strcasecmp(o,"GROUP") && moreargs >= 2) {
- if (!xreadgroup) {
- addReplyError(c,"The GROUP option is only supported by "
- "XREADGROUP. You called XREAD instead.");
- return;
- }
- groupname = c->argv[i+1];
- consumername = c->argv[i+2];
- i += 2;
- } else if (!strcasecmp(o,"NOACK")) {
- if (!xreadgroup) {
- addReplyError(c,"The NOACK option is only supported by "
- "XREADGROUP. You called XREAD instead.");
- return;
- }
- noack = 1;
- } else {
- addReplyErrorObject(c,shared.syntaxerr);
- return;
- }
- }
-
- /* STREAMS option is mandatory. */
- if (streams_arg == 0) {
- addReplyErrorObject(c,shared.syntaxerr);
- return;
- }
-
- /* If the user specified XREADGROUP then it must also
- * provide the GROUP option. */
- if (xreadgroup && groupname == NULL) {
- addReplyError(c,"Missing GROUP option for XREADGROUP");
- return;
- }
-
- /* Parse the IDs and resolve the group name. */
- if (streams_count > STREAMID_STATIC_VECTOR_LEN)
- ids = zmalloc(sizeof(streamID)*streams_count);
- if (groupname) groups = zmalloc(sizeof(streamCG*)*streams_count);
-
- for (int i = streams_arg + streams_count; i < c->argc; i++) {
- /* Specifying "$" as last-known-id means that the client wants to be
- * served with just the messages that will arrive into the stream
- * starting from now. */
- int id_idx = i - streams_arg - streams_count;
- robj *key = c->argv[i-streams_count];
- kvobj *o = lookupKeyRead(c->db, key);
- if (checkType(c,o,OBJ_STREAM)) goto cleanup;
- streamCG *group = NULL;
-
- /* If a group was specified, than we need to be sure that the
- * key and group actually exist. */
- if (groupname) {
- if (o == NULL ||
- (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL)
- {
- addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer "
- "group '%s' in XREADGROUP with GROUP "
- "option",
- (char*)key->ptr,(char*)groupname->ptr);
- goto cleanup;
- }
- groups[id_idx] = group;
- }
-
- if (strcmp(c->argv[i]->ptr,"$") == 0) {
- if (xreadgroup) {
- addReplyError(c,"The $ ID is meaningless in the context of "
- "XREADGROUP: you want to read the history of "
- "this consumer by specifying a proper ID, or "
- "use the > ID to get new messages. The $ ID would "
- "just return an empty result set.");
- goto cleanup;
- }
- if (o) {
- stream *s = o->ptr;
- ids[id_idx] = s->last_id;
- } else {
- ids[id_idx].ms = 0;
- ids[id_idx].seq = 0;
- }
- continue;
- } else if (strcmp(c->argv[i]->ptr,"+") == 0) {
- if (xreadgroup) {
- addReplyError(c,"The + ID is meaningless in the context of "
- "XREADGROUP: you want to read the history of "
- "this consumer by specifying a proper ID, or "
- "use the > ID to get new messages. The + ID would "
- "just return an empty result set.");
- goto cleanup;
- }
- if (o && ((stream *)o->ptr)->length) {
- stream *s = o->ptr;
- /* We need to get the last valid ID.
- * It is impossible to use s->last_id because
- * entry with s->last_id may have been removed. */
- streamLastValidID(s, &ids[id_idx]);
- streamDecrID(&ids[id_idx]);
- } else {
- ids[id_idx].ms = 0;
- ids[id_idx].seq = 0;
- }
- continue;
- } else if (strcmp(c->argv[i]->ptr,">") == 0) {
- if (!xreadgroup) {
- addReplyError(c,"The > ID can be specified only when calling "
- "XREADGROUP using the GROUP <group> "
- "<consumer> option.");
- goto cleanup;
- }
- /* We use just the maximum ID to signal this is a ">" ID, anyway
- * the code handling the blocking clients will have to update the
- * ID later in order to match the changing consumer group last ID. */
- ids[id_idx].ms = UINT64_MAX;
- ids[id_idx].seq = UINT64_MAX;
- continue;
- }
- if (streamParseStrictIDOrReply(c,c->argv[i],ids+id_idx,0,NULL) != C_OK)
- goto cleanup;
- }
-
- /* Try to serve the client synchronously. */
- size_t arraylen = 0;
- void *arraylen_ptr = NULL;
- uint64_t min_pel_delivery_time = UINT64_MAX;
- for (int i = 0; i < streams_count; i++) {
- kvobj *o = lookupKeyRead(c->db, c->argv[streams_arg + i]);
- if (o == NULL) continue;
- stream *s = o->ptr;
- streamID *gt = ids+i; /* ID must be greater than this. */
- int serve_claimed = 0;
- int serve_synchronously = 0;
- int serve_history = 0; /* True for XREADGROUP with ID != ">". */
- streamConsumer *consumer = NULL; /* Unused if XREAD */
- streamPropInfo spi = {c->argv[streams_arg+i],groupname}; /* Unused if XREAD */
-
- /* Check if there are the conditions to serve the client
- * synchronously. */
- if (groups) {
- /* If min_idle_time is set we need to check is there any pending
- * message in the PEL idle enough to be claimed. Also we need to
- * get the minimum delivery time in the PEL, in order to use it
- * later if block option is set. */
- if (min_idle_time != -1) {
- raxIterator ri;
- raxStart(&ri, groups[i]->pel_by_time);
- raxSeek(&ri, "^", NULL, 0);
- while(raxNext(&ri)) {
- pelTimeKey timeKey;
- decodePelTimeKey(ri.key, &timeKey);
- if (!streamEntryExists(s, &timeKey.id))
- continue;
-
- if (timeKey.delivery_time < min_pel_delivery_time) {
- min_pel_delivery_time = timeKey.delivery_time;
- }
-
- uint64_t idle = commandTimeSnapshot() - timeKey.delivery_time;
- if (idle >= (uint64_t)min_idle_time) {
- serve_claimed = 1;
- }
- break;
- }
- raxStop(&ri);
- }
-
- /* If the consumer is blocked on a group, we always serve it
- * synchronously (serving its local history) if the ID specified
- * was not the special ">" ID. */
- if (gt->ms != UINT64_MAX ||
- gt->seq != UINT64_MAX)
- {
- serve_synchronously = 1;
- serve_history = 1;
- } else if (s->length) {
- /* We also want to serve a consumer in a consumer group
- * synchronously in case the group top item delivered is smaller
- * than what the stream has inside. */
- streamID maxid, *last = &groups[i]->last_id;
- streamLastValidID(s, &maxid);
- if (streamCompareID(&maxid, last) > 0) {
- serve_synchronously = 1;
- *gt = *last;
- }
- }
- consumer = streamLookupConsumer(groups[i],consumername->ptr);
- if (consumer == NULL) {
- old_alloc = s->alloc_size;
- consumer = streamCreateConsumer(s,groups[i],consumername->ptr,
- c->argv[streams_arg+i],
- c->db->id,SCC_DEFAULT);
- if (server.memory_tracking_per_slot)
- updateSlotAllocSize(c->db,getKeySlot(c->argv[streams_arg+i]->ptr),old_alloc,s->alloc_size);
- if (noack)
- streamPropagateConsumerCreation(c,spi.keyname,
- spi.groupname,
- consumer->name);
- }
- consumer->seen_time = commandTimeSnapshot();
- keyModified(c,c->db,c->argv[streams_arg+i],o,0); /* only update LRM */
- } else if (s->length) {
- /* For consumers without a group, we serve synchronously if we can
- * actually provide at least one item from the stream. */
- streamID maxid;
- streamLastValidID(s, &maxid);
- if (streamCompareID(&maxid, gt) > 0) {
- serve_synchronously = 1;
- }
- }
-
- int flags = 0;
- if (serve_history) {
- /* CLAIM option is ignored when we server from consumer history.*/
- min_idle_time = -1;
- } else if (!serve_synchronously && serve_claimed) {
- /* We serve the client synchronously if the CLAIM option was
- * specified and there are messages in the PEL that are idle
- * enough. */
- serve_synchronously = 1;
- flags |= STREAM_RWR_CLAIMED;
- }
-
- if (serve_synchronously) {
- arraylen++;
- if (arraylen == 1) arraylen_ptr = addReplyDeferredLen(c);
- /* streamReplyWithRange() handles the 'start' ID as inclusive,
- * so start from the next ID, since we want only messages with
- * IDs greater than start. */
- streamID start = *gt;
- streamIncrID(&start);
-
- /* Emit the two elements sub-array consisting of the name
- * of the stream and the data we extracted from it. */
- if (c->resp == 2) addReplyArrayLen(c,2);
- addReplyBulk(c,c->argv[streams_arg+i]);
-
- unsigned long propCount = 0;
- if (noack) flags |= STREAM_RWR_NOACK;
- if (serve_history) flags |= STREAM_RWR_HISTORY;
- old_alloc = s->alloc_size;
- streamReplyWithRange(c,s,&start,NULL,count,0, min_idle_time,
- groups ? groups[i] : NULL,
- consumer, flags, &spi, &propCount);
- if (server.memory_tracking_per_slot && old_alloc != s->alloc_size)
- updateSlotAllocSize(c->db,getKeySlot(c->argv[streams_arg+i]->ptr),old_alloc,s->alloc_size);
- if (propCount) {
- server.dirty++;
- keyModified(c,c->db,c->argv[streams_arg+i],o,0); /* only update LRM */
- }
- }
- }
-
- /* We replied synchronously! Set the top array len and return to caller. */
- if (arraylen) {
- if (c->resp == 2)
- setDeferredArrayLen(c,arraylen_ptr,arraylen);
- else
- setDeferredMapLen(c,arraylen_ptr,arraylen);
- goto cleanup;
- }
-
- /* Block if needed. */
- if (timeout != -1) {
- /* If we are not allowed to block the client, the only thing
- * we can do is treating it as a timeout (even with timeout 0). */
- if (c->flags & CLIENT_DENY_BLOCKING) {
- addReplyNullArray(c);
- goto cleanup;
- }
- /* We change the '$' to the current last ID for this stream. this is
- * Since later on when we unblock on arriving data - we would like to
- * re-process the command and in case '$' stays we will spin-block forever.
- */
- for (int id_idx = 0; id_idx < streams_count; id_idx++) {
- int arg_idx = id_idx + streams_arg + streams_count;
- if (strcmp(c->argv[arg_idx]->ptr,"$") == 0) {
- robj *argv_streamid = createObjectFromStreamID(&ids[id_idx]);
- rewriteClientCommandArgument(c, arg_idx, argv_streamid);
- decrRefCount(argv_streamid);
- }
- }
- /* If min_idle_time is set we need to unblock client if PEL entry became claimable
- * before new messages arrive. min_pel_delivery_time is the minimum delivery time of all
- * entries in the PELs of different streams specified in the command. We add it to
- * min_idle_time to get the earliest time when an entry will be eligible for claiming.
- * If there are no entries in the PELs we will unblock the client after min_idle_time. */
- if (min_idle_time != -1) {
- uint64_t pel_expire_time = min_idle_time;
- if (min_pel_delivery_time != UINT64_MAX)
- pel_expire_time += min_pel_delivery_time;
- else
- pel_expire_time += commandTimeSnapshot();
- trackStreamClaimTimeouts(c, c->argv+streams_arg, streams_count, pel_expire_time);
- }
- blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count, timeout, xreadgroup);
- goto cleanup;
- }
-
- /* No BLOCK option, nor any stream we can serve. Reply as with a
- * timeout happened. */
- addReplyNullArray(c);
- /* Continue to cleanup... */
-
-cleanup: /* Cleanup. */
-
- /* The command is propagated (in the READGROUP form) as a side effect
- * of calling lower level APIs. So stop any implicit propagation. */
- preventCommandPropagation(c);
- if (ids != static_ids) zfree(ids);
- zfree(groups);
-}
-
-/* -----------------------------------------------------------------------
- * Low level implementation of consumer groups
- * ----------------------------------------------------------------------- */
-
-/* Update a consumer group's last_id and handle minimum last_id tracking.
- * we will recalculate the minimum last_id when needed. */
-void streamUpdateCGroupLastId(stream *s, streamCG *cg, streamID *id) {
- /* When a consumer group's last_id is updated, we need to invalidate the cached
- * minimum last_id in two cases:
- * 1. If the consumer group's previous last_id equals the minimum last_id.
- * 2. If the new ID being set is smaller than the current minimum last_id. */
- if (s->min_cgroup_last_id_valid &&
- (streamCompareID(&cg->last_id, &s->min_cgroup_last_id) == 0 ||
- streamCompareID(id, &s->min_cgroup_last_id) < 0))
- {
- s->min_cgroup_last_id_valid = 0;
- }
- cg->last_id = *id;
-}
-
-/* Link a consumer group to a stream entry in the cgroups_ref index.
- * Returns a pointer to the list node, so that it can be used for future deletion. */
-listNode *streamLinkCGroupToEntry(stream *s, streamCG *cg, unsigned char *key) {
- list *cglist;
-
- if (!s->cgroups_ref)
- s->cgroups_ref = raxNewWithMetadata(0, &s->alloc_size);
-
- /* Try to find the list for this stream ID, create it if it doesn't exist */
- if (!raxFind(s->cgroups_ref, key, sizeof(streamID), (void**)&cglist)) {
- cglist = listCreate();
- serverAssert(raxInsert(s->cgroups_ref, key, sizeof(streamID), cglist, NULL));
- }
-
- /* Add the consumer group to the list and return the list node */
- listAddNodeTail(cglist, cg);
- return listLast(cglist);
-}
-
-/* Unlink a consumer group reference from the entry index for a specific stream ID.
- * This is called when a message is acknowledged or when a consumer group is deleted. */
-void streamUnlinkEntryFromCGroupRef(stream *s, streamNACK *na, unsigned char *key) {
- list *cglist;
- if (!s->cgroups_ref) return;
- if (raxFind(s->cgroups_ref, key, sizeof(streamID), (void**)&cglist)) {
- listDelNode(cglist, na->cgroup_ref_node);
-
- /* If the list is now empty, remove it from the index. */
- if (listLength(cglist) == 0) {
- raxRemove(s->cgroups_ref, key, sizeof(streamID), NULL);
- listRelease(cglist);
- }
- }
-}
-
-/* Remove all consumer group references to a specific stream message. */
-void streamCleanupEntryCGroupRefs(stream *s, streamID *id) {
- if (!s->cgroups_ref) return;
- list *cglist;
- listIter li;
- listNode *ln;
- unsigned char buf[sizeof(streamID)];
- streamEncodeID(buf, id);
-
- /* If message is not in any consumer group, nothing to do */
- if (!raxFind(s->cgroups_ref, buf, sizeof(streamID), (void **)&cglist))
- return;
-
- listRewind(cglist, &li);
- while ((ln = listNext(&li))) {
- streamNACK *nack;
- streamCG *group = listNodeValue(ln);
-
- /* Find the message in this consumer group's PEL */
- serverAssert(raxFind(group->pel, buf, sizeof(buf), (void **)&nack));
-
- /* Remove from group and consumer PELs */
- raxRemove(group->pel, buf, sizeof(buf), NULL);
- raxRemovePelByTime(group->pel_by_time, nack->delivery_time, id);
- raxRemove(nack->consumer->pel, buf, sizeof(buf), NULL);
- /* Since we're removing all references from the cgroups_ref, we can directly
- * free the NACK without unlinking it from the cgroups_ref. */
- streamFreeNACK(s, nack);
- }
-
- raxRemove(s->cgroups_ref, buf, sizeof(streamID), NULL);
- listRelease(cglist);
-}
-
-/* Check if a stream entry is still referenced by any consumer group.
- *
- * An entry is considered referenced if:
- * 1. Its ID is smaller than the minimum last_id of all consumer groups,
- * which means at least one group hasn't read it yet.
- * 2. It exists in any consumer group's PEL.
- *
- * Returns 1 if the entry is referenced, 0 if it's fully acknowledged by all groups. */
-int streamEntryIsReferenced(stream *s, streamID *id) {
- if (!s->cgroups || !raxSize(s->cgroups)) return 0;
- if (!s->min_cgroup_last_id_valid) {
- /* If the cached minimum last_id is invalid, we need to recalculate it
- * by iterating through all consumer groups to find the minimum last_id */
- s->min_cgroup_last_id_valid = 1;
- s->min_cgroup_last_id.ms = UINT64_MAX;
- s->min_cgroup_last_id.seq = UINT64_MAX;
- raxIterator ri;
- raxStart(&ri, s->cgroups);
- raxSeek(&ri, "^", NULL, 0);
- while (raxNext(&ri)) {
- streamCG *cg = ri.data;
- if (streamCompareID(&cg->last_id, &s->min_cgroup_last_id) < 0)
- s->min_cgroup_last_id = cg->last_id;
- }
- raxStop(&ri);
- }
-
- /* The consume group doesn't read it. */
- if (streamCompareID(&s->min_cgroup_last_id, id) < 0)
- return 1;
-
- /* Check if the message is in any consumer group's PEL */
- if (!s->cgroups_ref) return 0;
- unsigned char buf[sizeof(streamID)];
- streamEncodeID(buf, id);
- return raxFind(s->cgroups_ref, buf, sizeof(streamID), NULL);
-}
-
-/* Create a NACK entry setting the delivery count to 1 and the delivery
- * time to the current time. The NACK consumer will be set to the one
- * specified as argument of the function. */
-streamNACK *streamCreateNACK(stream *s, streamConsumer *consumer) {
- size_t usable;
- streamNACK *nack = zmalloc_usable(sizeof(*nack), &usable);
- s->alloc_size += usable;
- nack->delivery_time = commandTimeSnapshot();
- nack->delivery_count = 1;
- nack->consumer = consumer;
- nack->cgroup_ref_node = NULL; /* Will be set when added to cgroups_ref */
- return nack;
-}
-
-/* Free a NACK entry. */
-void streamFreeNACK(stream *s, streamNACK *na) {
- size_t usable;
- zfree_usable(na, &usable);
- s->alloc_size -= usable;
-}
-
-/* Free a NACK entry and remove its reference from the cgroups_ref.
- * This ensures proper cleanup of the consumer group list associated with the message ID. */
-void streamDestroyNACK(stream *s, streamNACK *na, unsigned char *key) {
- size_t usable;
- streamUnlinkEntryFromCGroupRef(s, na, key);
- zfree_usable(na, &usable);
- s->alloc_size -= usable;
-}
-
-/* Generic version of streamFreeNACK. */
-void streamFreeNACKGeneric(void *na, void *s) {
- streamFreeNACK((stream *)s, (streamNACK *)na);
-}
-
-/* Free a consumer and associated data structures. Note that this function
- * will not reassign the pending messages associated with this consumer
- * nor will delete them from the stream, so when this function is called
- * to delete a consumer, and not when the whole stream is destroyed, the caller
- * should do some work before. */
-void streamFreeConsumer(stream *s, streamConsumer *sc) {
- size_t usable;
- raxFree(sc->pel); /* No value free callback: the PEL entries are shared
- between the consumer and the main stream PEL. */
- s->alloc_size -= sdsAllocSize(sc->name);
- sdsfree(sc->name);
- zfree_usable(sc, &usable);
- s->alloc_size -= usable;
-}
-
-/* Generic version of streamFreeConsumer. */
-void streamFreeConsumerGeneric(void *sc, void *s) {
- streamFreeConsumer((stream *)s, (streamConsumer *)sc);
-}
-
-/* Create a new consumer group in the context of the stream 's', having the
- * specified name, last server ID and reads counter. If a consumer group with
- * the same name already exists NULL is returned, otherwise the pointer to the
- * consumer group is returned. */
-streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read) {
- if (s->cgroups == NULL)
- s->cgroups = raxNewWithMetadata(0, &s->alloc_size);
- if (raxFind(s->cgroups,(unsigned char*)name,namelen,NULL))
- return NULL;
-
- size_t usable;
- streamCG *cg = zmalloc_usable(sizeof(*cg), &usable);
- s->alloc_size += usable;
- cg->pel = raxNewWithMetadata(0, &s->alloc_size);
- cg->pel_by_time = raxNewWithMetadata(0, &s->alloc_size);
- cg->consumers = raxNewWithMetadata(0, &s->alloc_size);
- cg->last_id.ms = 0;
- cg->last_id.seq = 0;
- streamUpdateCGroupLastId(s, cg, id);
- cg->entries_read = entries_read;
- raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL);
- return cg;
-}
-
-/* Free a consumer group and all its associated data. */
-static void streamFreeCG(stream *s, streamCG *cg) {
- raxFreeWithCbAndContext(cg->pel, streamFreeNACKGeneric, s);
- raxFree(cg->pel_by_time);
- raxFreeWithCbAndContext(cg->consumers, streamFreeConsumerGeneric, s);
- size_t usable;
- zfree_usable(cg, &usable);
- s->alloc_size -= usable;
-}
-
-/* Destroy a consumer group and clean up all associated references. */
-void streamDestroyCG(stream *s, streamCG *cg) {
- /* Remove all references from the cgroups_ref. */
- raxIterator it;
- raxStart(&it, cg->pel);
- raxSeek(&it, "^", NULL, 0);
- while (raxNext(&it)) {
- streamNACK *nack = it.data;
- streamUnlinkEntryFromCGroupRef(s, nack, it.key);
- }
- raxStop(&it);
-
- /* If we're destroying the group with the minimum last_id, the cached
- * minimum is no longer valid and needs to be recalculated from the
- * remaining groups. */
- if (s->min_cgroup_last_id_valid && streamCompareID(&s->min_cgroup_last_id, &cg->last_id) == 0)
- s->min_cgroup_last_id_valid = 0;
-
- streamFreeCG(s, cg);
-}
-
-/* Generic version of streamFreeCG. */
-void streamFreeCGGeneric(void *cg, void *s) {
- streamFreeCG((stream *)s, (streamCG *)cg);
-}
-
-/* Lookup the consumer group in the specified stream and returns its
- * pointer, otherwise if there is no such group, NULL is returned. */
-streamCG *streamLookupCG(stream *s, sds groupname) {
- if (s->cgroups == NULL) return NULL;
- void *cg = NULL;
- raxFind(s->cgroups,(unsigned char*)groupname,sdslen(groupname),&cg);
- return cg;
-}
-
-/* Create a consumer with the specified name in the group 'cg' and return.
- * If the consumer exists, return NULL. As a side effect, when the consumer
- * is successfully created, the key space will be notified and dirty++ unless
- * the SCC_NO_NOTIFY or SCC_NO_DIRTIFY flags is specified. */
-streamConsumer *streamCreateConsumer(stream *s, streamCG *cg, sds name, robj *key, int dbid, int flags) {
- if (cg == NULL) return NULL;
- int notify = !(flags & SCC_NO_NOTIFY);
- int dirty = !(flags & SCC_NO_DIRTIFY);
- size_t usable;
- streamConsumer *consumer = zmalloc_usable(sizeof(*consumer), &usable);
- int success = raxTryInsert(cg->consumers,(unsigned char*)name,
- sdslen(name),consumer,NULL);
- if (!success) {
- zfree(consumer);
- return NULL;
- }
- s->alloc_size += usable;
- consumer->name = sdsdup(name);
- s->alloc_size += sdsAllocSize(consumer->name);
- consumer->pel = raxNewWithMetadata(0, &s->alloc_size);
- consumer->active_time = -1;
- consumer->seen_time = commandTimeSnapshot();
- if (dirty) server.dirty++;
- if (notify) notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-createconsumer",key,dbid);
- return consumer;
-}
-
-/* Lookup the consumer with the specified name in the group 'cg'. */
-streamConsumer *streamLookupConsumer(streamCG *cg, sds name) {
- if (cg == NULL) return NULL;
- void *consumer = NULL;
- raxFind(cg->consumers,(unsigned char*)name,sdslen(name),&consumer);
- return consumer;
-}
-
-/* Delete the consumer specified in the consumer group 'cg'. */
-void streamDelConsumer(stream *s, streamCG *cg, streamConsumer *consumer) {
- /* Iterate all the consumer pending messages, deleting every corresponding
- * entry from the global entry. */
- raxIterator ri;
- raxStart(&ri,consumer->pel);
- raxSeek(&ri,"^",NULL,0);
- while(raxNext(&ri)) {
- streamNACK *nack = ri.data;
- streamUnlinkEntryFromCGroupRef(s, nack, ri.key);
-
- streamID id;
- streamDecodeID(ri.key, &id);
-
- raxRemovePelByTime(cg->pel_by_time, nack->delivery_time, &id);
- raxRemove(cg->pel,ri.key,ri.key_len,NULL);
-
- streamFreeNACK(s, nack);
- }
- raxStop(&ri);
-
- /* Deallocate the consumer. */
- raxRemove(cg->consumers,(unsigned char*)consumer->name,
- sdslen(consumer->name),NULL);
- streamFreeConsumer(s,consumer);
-}
-
-/* -----------------------------------------------------------------------
- * Consumer groups commands
- * ----------------------------------------------------------------------- */
-
-/* XGROUP CREATE <key> <groupname> <id or $> [MKSTREAM] [ENTRIESREAD entries_read]
- * XGROUP SETID <key> <groupname> <id or $> [ENTRIESREAD entries_read]
- * XGROUP DESTROY <key> <groupname>
- * XGROUP CREATECONSUMER <key> <groupname> <consumer>
- * XGROUP DELCONSUMER <key> <groupname> <consumername> */
-void xgroupCommand(client *c) {
- stream *s = NULL;
- sds grpname = NULL;
- streamCG *cg = NULL;
- char *opt = c->argv[1]->ptr; /* Subcommand name. */
- int mkstream = 0;
- long long entries_read = SCG_INVALID_ENTRIES_READ;
- robj *o;
- size_t old_alloc;
-
- /* Everything but the "HELP" option requires a key and group name. */
- if (c->argc >= 4) {
- /* Parse optional arguments for CREATE and SETID */
- int i = 5;
- int create_subcmd = !strcasecmp(opt,"CREATE");
- int setid_subcmd = !strcasecmp(opt,"SETID");
- while (i < c->argc) {
- if (create_subcmd && !strcasecmp(c->argv[i]->ptr,"MKSTREAM")) {
- mkstream = 1;
- i++;
- } else if ((create_subcmd || setid_subcmd) && !strcasecmp(c->argv[i]->ptr,"ENTRIESREAD") && i + 1 < c->argc) {
- if (getLongLongFromObjectOrReply(c,c->argv[i+1],&entries_read,NULL) != C_OK)
- return;
- if (entries_read < 0 && entries_read != SCG_INVALID_ENTRIES_READ) {
- addReplyError(c,"value for ENTRIESREAD must be positive or -1");
- return;
- }
- i += 2;
- } else {
- addReplySubcommandSyntaxError(c);
- return;
- }
- }
-
- o = lookupKeyWrite(c->db,c->argv[2]);
- if (o) {
- if (checkType(c,o,OBJ_STREAM)) return;
- s = o->ptr;
- }
- grpname = c->argv[3]->ptr;
- }
-
- /* Check for missing key/group. */
- if (c->argc >= 4 && !mkstream) {
- /* At this point key must exist, or there is an error. */
- if (s == NULL) {
- addReplyError(c,
- "The XGROUP subcommand requires the key to exist. "
- "Note that for CREATE you may want to use the MKSTREAM "
- "option to create an empty stream automatically.");
- return;
- }
-
- /* Certain subcommands require the group to exist. */
- if ((cg = streamLookupCG(s,grpname)) == NULL &&
- (!strcasecmp(opt,"SETID") ||
- !strcasecmp(opt,"CREATECONSUMER") ||
- !strcasecmp(opt,"DELCONSUMER")))
- {
- addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' "
- "for key name '%s'",
- (char*)grpname, (char*)c->argv[2]->ptr);
- return;
- }
- }
-
- /* Dispatch the different subcommands. */
- if (c->argc == 2 && !strcasecmp(opt,"HELP")) {
- const char *help[] = {
-"CREATE <key> <groupname> <id|$> [option]",
-" Create a new consumer group. Options are:",
-" * MKSTREAM",
-" Create the empty stream if it does not exist.",
-" * ENTRIESREAD entries_read",
-" Set the group's entries_read counter (internal use).",
-"CREATECONSUMER <key> <groupname> <consumer>",
-" Create a new consumer in the specified group.",
-"DELCONSUMER <key> <groupname> <consumer>",
-" Remove the specified consumer.",
-"DESTROY <key> <groupname>",
-" Remove the specified group.",
-"SETID <key> <groupname> <id|$> [ENTRIESREAD entries_read]",
-" Set the current group ID and entries_read counter.",
-NULL
- };
- addReplyHelp(c, help);
- } else if (!strcasecmp(opt,"CREATE") && (c->argc >= 5 && c->argc <= 8)) {
- streamID id;
- if (!strcmp(c->argv[4]->ptr,"$")) {
- if (s) {
- id = s->last_id;
- } else {
- id.ms = 0;
- id.seq = 0;
- }
- } else if (streamParseStrictIDOrReply(c,c->argv[4],&id,0,NULL) != C_OK) {
- return;
- }
-
- /* Handle the MKSTREAM option now that the command can no longer fail. */
- if (s == NULL) {
- serverAssert(mkstream);
- o = createStreamObject();
- dbAdd(c->db, c->argv[2], &o);
- s = o->ptr;
- keyModified(c,c->db,c->argv[2],o,1);
- }
-
- if (entries_read != SCG_INVALID_ENTRIES_READ && (uint64_t)entries_read > s->entries_added) {
- entries_read = s->entries_added;
- }
-
- old_alloc = s->alloc_size;
- streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id,entries_read);
- if (cg) {
- if (server.memory_tracking_per_slot)
- updateSlotAllocSize(c->db,getKeySlot(c->argv[2]->ptr),old_alloc,s->alloc_size);
- addReply(c,shared.ok);
- server.dirty++;
- notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-create",
- c->argv[2],c->db->id);
- keyModified(c,c->db,c->argv[2],o,0);
- } else {
- addReplyError(c,"-BUSYGROUP Consumer Group name already exists");
- }
- } else if (!strcasecmp(opt,"SETID") && (c->argc == 5 || c->argc == 7)) {
- streamID id;
- if (!strcmp(c->argv[4]->ptr,"$")) {
- id = s->last_id;
- } else if (streamParseIDOrReply(c,c->argv[4],&id,0) != C_OK) {
- return;
- }
-
- if (entries_read != SCG_INVALID_ENTRIES_READ && (uint64_t)entries_read > s->entries_added) {
- entries_read = s->entries_added;
- }
-
- streamUpdateCGroupLastId(s, cg, &id);
- cg->entries_read = entries_read;
- addReply(c,shared.ok);
- server.dirty++;
- notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-setid",c->argv[2],c->db->id);
- keyModified(c,c->db,c->argv[2],o,0);
- } else if (!strcasecmp(opt,"DESTROY") && c->argc == 4) {
- if (cg) {
- old_alloc = s->alloc_size;
- raxRemove(s->cgroups,(unsigned char*)grpname,sdslen(grpname),NULL);
- streamDestroyCG(s, cg);
- if (server.memory_tracking_per_slot)
- updateSlotAllocSize(c->db,getKeySlot(c->argv[2]->ptr),old_alloc,s->alloc_size);
- addReply(c,shared.cone);
- server.dirty++;
- notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-destroy",
- c->argv[2],c->db->id);
- keyModified(c,c->db,c->argv[2],o,0);
- /* We want to unblock any XREADGROUP consumers with -NOGROUP. */
- signalKeyAsReady(c->db,c->argv[2],OBJ_STREAM);
- } else {
- addReply(c,shared.czero);
- }
- } else if (!strcasecmp(opt,"CREATECONSUMER") && c->argc == 5) {
- old_alloc = s->alloc_size;
- streamConsumer *created = streamCreateConsumer(s,cg,c->argv[4]->ptr,c->argv[2],
- c->db->id,SCC_DEFAULT);
- keyModified(c,c->db,c->argv[2],o,0);
- if (server.memory_tracking_per_slot)
- updateSlotAllocSize(c->db,getKeySlot(c->argv[2]->ptr),old_alloc,s->alloc_size);
- addReplyLongLong(c,created ? 1 : 0);
- } else if (!strcasecmp(opt,"DELCONSUMER") && c->argc == 5) {
- long long pending = 0;
- streamConsumer *consumer = streamLookupConsumer(cg,c->argv[4]->ptr);
- if (consumer) {
- /* Delete the consumer and returns the number of pending messages
- * that were yet associated with such a consumer. */
- old_alloc = s->alloc_size;
- pending = raxSize(consumer->pel);
- streamDelConsumer(s,cg,consumer);
- if (server.memory_tracking_per_slot)
- updateSlotAllocSize(c->db,getKeySlot(c->argv[2]->ptr),old_alloc,s->alloc_size);
- server.dirty++;
- notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-delconsumer",
- c->argv[2],c->db->id);
- keyModified(c,c->db,c->argv[2],o,0);
- }
- addReplyLongLong(c,pending);
- } else {
- addReplySubcommandSyntaxError(c);
- }
-}
-
-/* XSETID <stream> <id> [ENTRIESADDED entries_added] [MAXDELETEDID max_deleted_entry_id]
- *
- * Set the internal "last ID", "added entries" and "maximal deleted entry ID"
- * of a stream. */
-void xsetidCommand(client *c) {
- streamID id, max_xdel_id = {0, 0};
- long long entries_added = -1;
-
- if (streamParseStrictIDOrReply(c,c->argv[2],&id,0,NULL) != C_OK)
- return;
-
- int i = 3;
- while (i < c->argc) {
- int moreargs = (c->argc-1) - i; /* Number of additional arguments. */
- char *opt = c->argv[i]->ptr;
- if (!strcasecmp(opt,"ENTRIESADDED") && moreargs) {
- if (getLongLongFromObjectOrReply(c,c->argv[i+1],&entries_added,NULL) != C_OK) {
- return;
- } else if (entries_added < 0) {
- addReplyError(c,"entries_added must be positive");
- return;
- }
- i += 2;
- } else if (!strcasecmp(opt,"MAXDELETEDID") && moreargs) {
- if (streamParseStrictIDOrReply(c,c->argv[i+1],&max_xdel_id,0,NULL) != C_OK) {
- return;
- } else if (streamCompareID(&id,&max_xdel_id) < 0) {
- addReplyError(c,"The ID specified in XSETID is smaller than the provided max_deleted_entry_id");
- return;
- }
- i += 2;
- } else {
- addReplyErrorObject(c,shared.syntaxerr);
- return;
- }
- }
-
- kvobj *kv = lookupKeyWriteOrReply(c, c->argv[1], shared.nokeyerr);
- if (kv == NULL || checkType(c, kv, OBJ_STREAM)) return;
- stream *s = kv->ptr;
-
- if (streamCompareID(&id,&s->max_deleted_entry_id) < 0) {
- addReplyError(c,"The ID specified in XSETID is smaller than current max_deleted_entry_id");
- return;
- }
-
- /* If the stream has at least one item, we want to check that the user
- * is setting a last ID that is equal or greater than the current top
- * item, otherwise the fundamental ID monotonicity assumption is violated. */
- if (s->length > 0) {
- streamID maxid;
- streamLastValidID(s,&maxid);
-
- if (streamCompareID(&id,&maxid) < 0) {
- addReplyError(c,"The ID specified in XSETID is smaller than the target stream top item");
- return;
- }
-
- /* If an entries_added was provided, it can't be lower than the length. */
- if (entries_added != -1 && s->length > (uint64_t)entries_added) {
- addReplyError(c,"The entries_added specified in XSETID is smaller than the target stream length");
- return;
- }
- }
-
- s->last_id = id;
- if (entries_added != -1)
- s->entries_added = entries_added;
- if (!streamIDEqZero(&max_xdel_id))
- s->max_deleted_entry_id = max_xdel_id;
- addReply(c,shared.ok);
- server.dirty++;
- notifyKeyspaceEvent(NOTIFY_STREAM,"xsetid",c->argv[1],c->db->id);
- keyModified(c,c->db,c->argv[1],kv,0);
-}
-
-/* XACK <key> <group> <id> <id> ... <id>
- * Acknowledge a message as processed. In practical terms we just check the
- * pending entries list (PEL) of the group, and delete the PEL entry both from
- * the group and the consumer (pending messages are referenced in both places).
- *
- * Return value of the command is the number of messages successfully
- * acknowledged, that is, the IDs we were actually able to resolve in the PEL.
- */
-void xackCommand(client *c) {
- streamCG *group = NULL;
- kvobj *kv = lookupKeyRead(c->db, c->argv[1]);
- if (kv) {
- if (checkType(c, kv, OBJ_STREAM)) return; /* Type error. */
- group = streamLookupCG(kv->ptr, c->argv[2]->ptr);
- }
-
- /* No key or group? Nothing to ack. */
- if (kv == NULL || group == NULL) {
- addReply(c,shared.czero);
- return;
- }
-
- /* Start parsing the IDs, so that we abort ASAP if there is a syntax
- * error: the return value of this command cannot be an error in case
- * the client successfully acknowledged some messages, so it should be
- * executed in a "all or nothing" fashion. */
- streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
- streamID *ids = static_ids;
- int id_count = c->argc-3;
- if (id_count > STREAMID_STATIC_VECTOR_LEN)
- ids = zmalloc(sizeof(streamID)*id_count);
- for (int j = 3; j < c->argc; j++) {
- if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-3],0,NULL) != C_OK) goto cleanup;
- }
-
- int acknowledged = 0;
- stream *s = kv->ptr;
- size_t old_alloc = s->alloc_size;
- for (int j = 3; j < c->argc; j++) {
- unsigned char buf[sizeof(streamID)];
- streamEncodeID(buf,&ids[j-3]);
-
- /* Lookup the ID in the group PEL: it will have a reference to the
- * NACK structure that will have a reference to the consumer, so that
- * we are able to remove the entry from both PELs. */
- void *result;
- if (raxFind(group->pel,buf,sizeof(buf),&result)) {
- streamNACK *nack = result;
- raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &ids[j-3]);
- raxRemove(group->pel,buf,sizeof(buf),NULL);
- raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
- streamDestroyNACK(kv->ptr, nack, buf);
- acknowledged++;
- server.dirty++;
- keyModified(c,c->db,c->argv[1],kv,0);
- }
- }
- if (server.memory_tracking_per_slot && old_alloc != s->alloc_size)
- updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),old_alloc,s->alloc_size);
- addReplyLongLong(c,acknowledged);
-cleanup:
- if (ids != static_ids) zfree(ids);
-}
-
-/* Used by xackdelCommand() */
-typedef enum XAckDelRes {
- XACKDEL_NO_ID = -1, /* ID not found in PEL. */
- XACKDEL_DELETED = 1, /* Message acknowledged and deleted. */
- XACKDEL_STILL_REFERENCED = 2, /* Message acknowledged but not deleted (still referenced). */
-} XAckDelRes;
-
-/* XACKDEL <key> <group> [KEEPREF|DELREF|ACKED] [IDS <numids> <id ...>]
- * Acknowledges messages as processed and deletes them from the stream.
- *
- * Returns an array of status codes for each ID, indicating whether it
- * was deleted, still referenced, or not found. */
-void xackdelCommand(client *c) {
- stream *s = NULL;
- streamCG *group = NULL;
- kvobj *kv = lookupKeyRead(c->db, c->argv[1]);
- if (checkType(c, kv, OBJ_STREAM)) return; /* Type error. */
-
- /* Parse command options */
- streamAckDelArgs args;
- if (!streamParseAckDelArgsOrReply(c, 3, &args)) return;
-
- /* Reply null if the key doesn't exist or the group doesn't exist.*/
- if (!kv || !(group = streamLookupCG(kv->ptr, c->argv[2]->ptr))) {
- addReplyArrayLen(c, args.numids);
- for (int i = 0; i < args.numids; i++)
- addReplyLongLong(c, XACKDEL_NO_ID);
- return;
- }
-
- /* Start parsing the IDs, so that we abort ASAP if there is a syntax
- * error: the return value of this command cannot be an error in case
- * the client successfully acknowledged some messages, so it should be
- * executed in a "all or nothing" fashion. */
- streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
- streamID *ids = static_ids;
- if (args.numids > STREAMID_STATIC_VECTOR_LEN)
- ids = zmalloc(sizeof(streamID)*args.numids);
- for (int j = 0; j < args.numids; j++) {
- if (streamParseStrictIDOrReply(c,c->argv[j+args.startidx],&ids[j],0,NULL) != C_OK)
- goto cleanup;
- }
-
- s = kv->ptr;
- size_t old_alloc = s->alloc_size;
- int first_entry = 0;
- int deleted = 0, dirty = server.dirty;
- addReplyArrayLen(c, args.numids);
- for (int j = 0; j < args.numids; j++) {
- int res = XACKDEL_NO_ID;
- streamID *id = &ids[j];
- unsigned char buf[sizeof(streamID)];
- streamEncodeID(buf,id);
-
- /* Lookup the ID in the group PEL: it will have a reference to the
- * NACK structure that will have a reference to the consumer, so that
- * we are able to remove the entry from both PELs. */
- void *result;
- if (raxFind(group->pel,buf,sizeof(buf),&result)) {
- streamNACK *nack = result;
- raxRemovePelByTime(group->pel_by_time, nack->delivery_time, id);
- raxRemove(group->pel,buf,sizeof(buf),NULL);
- raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
- streamDestroyNACK(s, nack, buf);
- server.dirty++;
-
- int can_delete = 1;
- if (args.delete_strategy == DELETE_STRATEGY_ACKED) {
- /* Only delete if acknowledged by all consumer groups */
- if (streamEntryIsReferenced(s, id))
- can_delete = 0;
- } else if (args.delete_strategy == DELETE_STRATEGY_DELREF) {
- streamCleanupEntryCGroupRefs(s, id);
- }
-
- if (can_delete && streamDeleteItem(s,id)) {
- /* We want to know if the first entry in the stream was deleted
- * so we can later set the new one. */
- if (streamCompareID(id,&s->first_id) == 0) {
- first_entry = 1;
- }
- /* Update the stream's maximal tombstone if needed. */
- if (streamCompareID(id,&s->max_deleted_entry_id) > 0) {
- s->max_deleted_entry_id = *id;
- }
- deleted++;
- }
-
- /* If the entry was in the PEL but not found in the stream,
- * we still consider it successfully deleted. */
- res = can_delete ? XACKDEL_DELETED : XACKDEL_STILL_REFERENCED;
- }
- addReplyLongLong(c, res);
- }
-
- if (server.memory_tracking_per_slot && old_alloc != s->alloc_size)
- updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),old_alloc,s->alloc_size);
-
- /* Update the stream's first ID. */
- if (deleted) {
- if (s->length == 0) {
- s->first_id.ms = 0;
- s->first_id.seq = 0;
- } else if (first_entry) {
- streamGetEdgeID(s,1,1,&s->first_id);
- }
-
- /* Propagate the write. */
- keyModified(c,c->db,c->argv[1],kv,1);
- notifyKeyspaceEvent(NOTIFY_STREAM,"xdel",c->argv[1],c->db->id);
- } else if (server.dirty > dirty) {
- /* Only ACK succeeded without deleting elements, just update LRM without signaling */
- keyModified(c,c->db,c->argv[1],kv,0);
- }
-
-cleanup:
- if (ids != static_ids) zfree(ids);
-}
-
-/* XPENDING <key> <group> [[IDLE <idle>] <start> <stop> <count> [<consumer>]]
- *
- * If start and stop are omitted, the command just outputs information about
- * the amount of pending messages for the key/group pair, together with
- * the minimum and maximum ID of pending messages.
- *
- * If start and stop are provided instead, the pending messages are returned
- * with information about the current owner, number of deliveries and last
- * delivery time and so forth. */
-void xpendingCommand(client *c) {
- int justinfo = c->argc == 3; /* Without the range just outputs general
- information about the PEL. */
- robj *key = c->argv[1];
- robj *groupname = c->argv[2];
- robj *consumername = NULL;
- streamID startid, endid;
- long long count = 0;
- long long minidle = 0;
- int startex = 0, endex = 0;
-
- /* Start and stop, and the consumer, can be omitted. Also the IDLE modifier. */
- if (c->argc != 3 && (c->argc < 6 || c->argc > 9)) {
- addReplyErrorObject(c,shared.syntaxerr);
- return;
- }
-
- /* Parse start/end/count arguments ASAP if needed, in order to report
- * syntax errors before any other error. */
- if (c->argc >= 6) {
- int startidx = 3; /* Without IDLE */
-
- if (!strcasecmp(c->argv[3]->ptr, "IDLE")) {
- if (getLongLongFromObjectOrReply(c, c->argv[4], &minidle, NULL) == C_ERR)
- return;
- if (c->argc < 8) {
- /* If IDLE was provided we must have at least 'start end count' */
- addReplyErrorObject(c,shared.syntaxerr);
- return;
- }
- /* Search for rest of arguments after 'IDLE <idle>' */
- startidx += 2;
- }
-
- /* count argument. */
- if (getLongLongFromObjectOrReply(c,c->argv[startidx+2],&count,NULL) == C_ERR)
- return;
- if (count < 0) count = 0;
-
- /* start and end arguments. */
- if (streamParseIntervalIDOrReply(c,c->argv[startidx],&startid,&startex,0) != C_OK)
- return;
- if (startex && streamIncrID(&startid) != C_OK) {
- addReplyError(c,"invalid start ID for the interval");
- return;
- }
- if (streamParseIntervalIDOrReply(c,c->argv[startidx+1],&endid,&endex,UINT64_MAX) != C_OK)
- return;
- if (endex && streamDecrID(&endid) != C_OK) {
- addReplyError(c,"invalid end ID for the interval");
- return;
- }
-
- if (startidx+3 < c->argc) {
- /* 'consumer' was provided */
- consumername = c->argv[startidx+3];
- }
- }
-
- /* Lookup the key and the group inside the stream. */
- kvobj *kv = lookupKeyRead(c->db, c->argv[1]);
- streamCG *group;
-
- if (checkType(c, kv, OBJ_STREAM)) return;
- if (kv == NULL ||
- (group = streamLookupCG(kv->ptr, groupname->ptr)) == NULL)
- {
- addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer "
- "group '%s'",
- (char*)key->ptr,(char*)groupname->ptr);
- return;
- }
-
- /* XPENDING <key> <group> variant. */
- if (justinfo) {
- addReplyArrayLen(c,4);
- /* Total number of messages in the PEL. */
- addReplyLongLong(c,raxSize(group->pel));
- /* First and last IDs. */
- if (raxSize(group->pel) == 0) {
- addReplyNull(c); /* Start. */
- addReplyNull(c); /* End. */
- addReplyNullArray(c); /* Clients. */
- } else {
- /* Start. */
- raxIterator ri;
- raxStart(&ri,group->pel);
- raxSeek(&ri,"^",NULL,0);
- raxNext(&ri);
- streamDecodeID(ri.key,&startid);
- addReplyStreamID(c,&startid);
-
- /* End. */
- raxSeek(&ri,"$",NULL,0);
- raxNext(&ri);
- streamDecodeID(ri.key,&endid);
- addReplyStreamID(c,&endid);
- raxStop(&ri);
-
- /* Consumers with pending messages. */
- raxStart(&ri,group->consumers);
- raxSeek(&ri,"^",NULL,0);
- void *arraylen_ptr = addReplyDeferredLen(c);
- size_t arraylen = 0;
- while(raxNext(&ri)) {
- streamConsumer *consumer = ri.data;
- if (raxSize(consumer->pel) == 0) continue;
- addReplyArrayLen(c,2);
- addReplyBulkCBuffer(c,ri.key,ri.key_len);
- addReplyBulkLongLong(c,raxSize(consumer->pel));
- arraylen++;
- }
- setDeferredArrayLen(c,arraylen_ptr,arraylen);
- raxStop(&ri);
- }
- } else { /* <start>, <stop> and <count> provided, return actual pending entries (not just info) */
- streamConsumer *consumer = NULL;
- if (consumername) {
- consumer = streamLookupConsumer(group,consumername->ptr);
-
- /* If a consumer name was mentioned but it does not exist, we can
- * just return an empty array. */
- if (consumer == NULL) {
- addReplyArrayLen(c,0);
- return;
- }
- }
-
- rax *pel = consumer ? consumer->pel : group->pel;
- unsigned char startkey[sizeof(streamID)];
- unsigned char endkey[sizeof(streamID)];
- raxIterator ri;
- mstime_t now = commandTimeSnapshot();
-
- streamEncodeID(startkey,&startid);
- streamEncodeID(endkey,&endid);
- raxStart(&ri,pel);
- raxSeek(&ri,">=",startkey,sizeof(startkey));
- void *arraylen_ptr = addReplyDeferredLen(c);
- size_t arraylen = 0;
-
- while(count && raxNext(&ri) && memcmp(ri.key,endkey,ri.key_len) <= 0) {
- streamNACK *nack = ri.data;
-
- if (minidle) {
- mstime_t this_idle = now - nack->delivery_time;
- if (this_idle < minidle) continue;
- }
-
- arraylen++;
- count--;
- addReplyArrayLen(c,4);
-
- /* Entry ID. */
- streamID id;
- streamDecodeID(ri.key,&id);
- addReplyStreamID(c,&id);
-
- /* Consumer name. */
- addReplyBulkCBuffer(c,nack->consumer->name,
- sdslen(nack->consumer->name));
-
- /* Milliseconds elapsed since last delivery. */
- mstime_t elapsed = now - nack->delivery_time;
- if (elapsed < 0) elapsed = 0;
- addReplyLongLong(c,elapsed);
-
- /* Number of deliveries. */
- addReplyLongLong(c,nack->delivery_count);
- }
- raxStop(&ri);
- setDeferredArrayLen(c,arraylen_ptr,arraylen);
- }
-}
-
-/* XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2>
- * [IDLE <milliseconds>] [TIME <mstime>] [RETRYCOUNT <count>]
- * [FORCE] [JUSTID]
- *
- * Changes ownership of one or multiple messages in the Pending Entries List
- * of a given stream consumer group.
- *
- * If the message ID (among the specified ones) exists, and its idle
- * time greater or equal to <min-idle-time>, then the message new owner
- * becomes the specified <consumer>. If the minimum idle time specified
- * is zero, messages are claimed regardless of their idle time.
- *
- * All the messages that cannot be found inside the pending entries list
- * are ignored, but in case the FORCE option is used. In that case we
- * create the NACK (representing a not yet acknowledged message) entry in
- * the consumer group PEL.
- *
- * This command creates the consumer as side effect if it does not yet
- * exists. Moreover the command reset the idle time of the message to 0,
- * even if by using the IDLE or TIME options, the user can control the
- * new idle time.
- *
- * The options at the end can be used in order to specify more attributes
- * to set in the representation of the pending message:
- *
- * 1. IDLE <ms>:
- * Set the idle time (last time it was delivered) of the message.
- * If IDLE is not specified, an IDLE of 0 is assumed, that is,
- * the time count is reset because the message has now a new
- * owner trying to process it.
- *
- * 2. TIME <ms-unix-time>:
- * This is the same as IDLE but instead of a relative amount of
- * milliseconds, it sets the idle time to a specific unix time
- * (in milliseconds). This is useful in order to rewrite the AOF
- * file generating XCLAIM commands.
- *
- * 3. RETRYCOUNT <count>:
- * Set the retry counter to the specified value. This counter is
- * incremented every time a message is delivered again. Normally
- * XCLAIM does not alter this counter, which is just served to clients
- * when the XPENDING command is called: this way clients can detect
- * anomalies, like messages that are never processed for some reason
- * after a big number of delivery attempts.
- *
- * 4. FORCE:
- * Creates the pending message entry in the PEL even if certain
- * specified IDs are not already in the PEL assigned to a different
- * client. However the message must be exist in the stream, otherwise
- * the IDs of non existing messages are ignored.
- *
- * 5. JUSTID:
- * Return just an array of IDs of messages successfully claimed,
- * without returning the actual message.
- *
- * 6. LASTID <id>:
- * Update the consumer group last ID with the specified ID if the
- * current last ID is smaller than the provided one.
- * This is used for replication / AOF, so that when we read from a
- * consumer group, the XCLAIM that gets propagated to give ownership
- * to the consumer, is also used in order to update the group current
- * ID.
- *
- * The command returns an array of messages that the user
- * successfully claimed, so that the caller is able to understand
- * what messages it is now in charge of. */
-void xclaimCommand(client *c) {
- streamCG *group = NULL;
- kvobj *o = lookupKeyRead(c->db,c->argv[1]);
- long long minidle; /* Minimum idle time argument. */
- long long retrycount = -1; /* -1 means RETRYCOUNT option not given. */
- mstime_t deliverytime = -1; /* -1 means IDLE/TIME options not given. */
- int force = 0;
- int justid = 0;
-
- if (o) {
- if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */
- group = streamLookupCG(o->ptr,c->argv[2]->ptr);
- }
-
- /* No key or group? Send an error given that the group creation
- * is mandatory. */
- if (o == NULL || group == NULL) {
- addReplyErrorFormat(c,"-NOGROUP No such key '%s' or "
- "consumer group '%s'", (char*)c->argv[1]->ptr,
- (char*)c->argv[2]->ptr);
- return;
- }
-
- if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle,
- "Invalid min-idle-time argument for XCLAIM")
- != C_OK) return;
- if (minidle < 0) minidle = 0;
-
- /* Start parsing the IDs, so that we abort ASAP if there is a syntax
- * error: the return value of this command cannot be an error in case
- * the client successfully claimed some message, so it should be
- * executed in a "all or nothing" fashion. */
- int j;
- streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
- streamID *ids = static_ids;
- int id_count = c->argc-5;
- if (id_count > STREAMID_STATIC_VECTOR_LEN)
- ids = zmalloc(sizeof(streamID)*id_count);
- for (j = 5; j < c->argc; j++) {
- if (streamParseStrictIDOrReply(NULL,c->argv[j],&ids[j-5],0,NULL) != C_OK) break;
- }
- int last_id_arg = j-1; /* Next time we iterate the IDs we now the range. */
-
- /* If we stopped because some IDs cannot be parsed, perhaps they
- * are trailing options. */
- mstime_t now = commandTimeSnapshot();
- streamID last_id = {0,0};
- int propagate_last_id = 0;
- for (; j < c->argc; j++) {
- int moreargs = (c->argc-1) - j; /* Number of additional arguments. */
- char *opt = c->argv[j]->ptr;
- if (!strcasecmp(opt,"FORCE")) {
- force = 1;
- } else if (!strcasecmp(opt,"JUSTID")) {
- justid = 1;
- } else if (!strcasecmp(opt,"IDLE") && moreargs) {
- j++;
- if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime,
- "Invalid IDLE option argument for XCLAIM")
- != C_OK) goto cleanup;
- deliverytime = now - deliverytime;
- } else if (!strcasecmp(opt,"TIME") && moreargs) {
- j++;
- if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime,
- "Invalid TIME option argument for XCLAIM")
- != C_OK) goto cleanup;
- } else if (!strcasecmp(opt,"RETRYCOUNT") && moreargs) {
- j++;
- if (getLongLongFromObjectOrReply(c,c->argv[j],&retrycount,
- "Invalid RETRYCOUNT option argument for XCLAIM")
- != C_OK) goto cleanup;
- } else if (!strcasecmp(opt,"LASTID") && moreargs) {
- j++;
- if (streamParseStrictIDOrReply(c,c->argv[j],&last_id,0,NULL) != C_OK) goto cleanup;
- } else {
- addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt);
- goto cleanup;
- }
- }
-
- if (streamCompareID(&last_id,&group->last_id) > 0) {
- streamUpdateCGroupLastId(o->ptr, group, &last_id);
- propagate_last_id = 1;
- }
-
- if (deliverytime != -1) {
- /* If a delivery time was passed, either with IDLE or TIME, we
- * do some sanity check on it, and set the deliverytime to now
- * (which is a sane choice usually) if the value is bogus.
- * To raise an error here is not wise because clients may compute
- * the idle time doing some math starting from their local time,
- * and this is not a good excuse to fail in case, for instance,
- * the computer time is a bit in the future from our POV. */
- if (deliverytime < 0 || deliverytime > now) deliverytime = now;
- } else {
- /* If no IDLE/TIME option was passed, we want the last delivery
- * time to be now, so that the idle time of the message will be
- * zero. */
- deliverytime = now;
- }
-
- /* Do the actual claiming. */
- stream *s = o->ptr;
- size_t old_alloc = s->alloc_size;
- streamConsumer *consumer = streamLookupConsumer(group,c->argv[3]->ptr);
- if (consumer == NULL) {
- consumer = streamCreateConsumer(o->ptr,group,c->argv[3]->ptr,c->argv[1],c->db->id,SCC_DEFAULT);
- }
- consumer->seen_time = commandTimeSnapshot();
-
- void *arraylenptr = addReplyDeferredLen(c);
- size_t arraylen = 0;
- for (int j = 5; j <= last_id_arg; j++) {
- streamID id = ids[j-5];
- unsigned char buf[sizeof(streamID)];
- streamEncodeID(buf,&id);
-
- /* Lookup the ID in the group PEL. */
- void *result = NULL;
- raxFind(group->pel,buf,sizeof(buf),&result);
- streamNACK *nack = result;
-
- /* Item must exist for us to transfer it to another consumer. */
- if (!streamEntryExists(s,&id)) {
- /* Clear this entry from the PEL, it no longer exists */
- if (nack != NULL) {
- /* Propagate this change (we are going to delete the NACK). */
- streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack);
- propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */
- server.dirty++;
- /* Release the NACK */
- raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &id);
- raxRemove(group->pel,buf,sizeof(buf),NULL);
- raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
- streamDestroyNACK(s, nack, buf);
- }
- continue;
- }
-
- /* If FORCE is passed, let's check if at least the entry
- * exists in the Stream. In such case, we'll create a new
- * entry in the PEL from scratch, so that XCLAIM can also
- * be used to create entries in the PEL. Useful for AOF
- * and replication of consumer groups. */
- if (force && nack == NULL) {
- /* Create the NACK. */
- nack = streamCreateNACK(s,NULL);
- raxInsert(group->pel,buf,sizeof(buf),nack,NULL);
- raxInsertPelByTime(group->pel_by_time, nack->delivery_time, &id);
- nack->cgroup_ref_node = streamLinkCGroupToEntry(s, group, buf);
- }
-
- if (nack != NULL) {
- /* We need to check if the minimum idle time requested
- * by the caller is satisfied by this entry.
- *
- * Note that the nack could be created by FORCE, in this
- * case there was no pre-existing entry and minidle should
- * be ignored, but in that case nack->consumer is NULL. */
- if (nack->consumer && minidle) {
- mstime_t this_idle = now - nack->delivery_time;
- if (this_idle < minidle) continue;
- }
-
- if (nack->consumer != consumer) {
- /* Remove the entry from the old consumer.
- * Note that nack->consumer is NULL if we created the
- * NACK above because of the FORCE option. */
- if (nack->consumer) {
- raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
- }
- }
-
- raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &id);
- nack->delivery_time = deliverytime;
- raxInsertPelByTime(group->pel_by_time, nack->delivery_time, &id);
-
- /* Set the delivery attempts counter if given, otherwise
- * autoincrement unless JUSTID option provided */
- if (retrycount >= 0) {
- nack->delivery_count = retrycount;
- } else if (!justid) {
- nack->delivery_count++;
- }
- if (nack->consumer != consumer) {
- /* Add the entry in the new consumer local PEL. */
- raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
- nack->consumer = consumer;
- }
- /* Send the reply for this entry. */
- if (justid) {
- addReplyStreamID(c,&id);
- } else {
- serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,-1,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL,NULL) == 1);
- }
- arraylen++;
-
- consumer->active_time = commandTimeSnapshot();
-
- /* Propagate this change. */
- streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack);
- propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */
- server.dirty++;
- }
- }
- if (server.memory_tracking_per_slot && old_alloc != s->alloc_size)
- updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),old_alloc,s->alloc_size);
- if (propagate_last_id) {
- streamPropagateGroupID(c,c->argv[1],group,c->argv[2]);
- server.dirty++;
- }
- setDeferredArrayLen(c,arraylenptr,arraylen);
- preventCommandPropagation(c);
- keyModified(c,c->db,c->argv[1],o,0);
-cleanup:
- if (ids != static_ids) zfree(ids);
-}
-
-/* XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT <count>] [JUSTID]
- *
- * Changes ownership of one or multiple messages in the Pending Entries List
- * of a given stream consumer group.
- *
- * For each PEL entry, if its idle time greater or equal to <min-idle-time>,
- * then the message new owner becomes the specified <consumer>.
- * If the minimum idle time specified is zero, messages are claimed
- * regardless of their idle time.
- *
- * This command creates the consumer as side effect if it does not yet
- * exists. Moreover the command reset the idle time of the message to 0.
- *
- * The command returns an array of messages that the user
- * successfully claimed, so that the caller is able to understand
- * what messages it is now in charge of. */
-void xautoclaimCommand(client *c) {
- streamCG *group = NULL;
- kvobj *o = lookupKeyRead(c->db,c->argv[1]);
- long long minidle; /* Minimum idle time argument, in milliseconds. */
- long count = 100; /* Maximum entries to claim. */
- const unsigned attempts_factor = 10;
- streamID startid;
- int startex;
- int justid = 0;
-
- /* Parse idle/start/end/count arguments ASAP if needed, in order to report
- * syntax errors before any other error. */
- if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle,"Invalid min-idle-time argument for XAUTOCLAIM") != C_OK)
- return;
- if (minidle < 0) minidle = 0;
-
- if (streamParseIntervalIDOrReply(c,c->argv[5],&startid,&startex,0) != C_OK)
- return;
- if (startex && streamIncrID(&startid) != C_OK) {
- addReplyError(c,"invalid start ID for the interval");
- return;
- }
-
- int j = 6; /* options start at argv[6] */
- while(j < c->argc) {
- int moreargs = (c->argc-1) - j; /* Number of additional arguments. */
- char *opt = c->argv[j]->ptr;
- if (!strcasecmp(opt,"COUNT") && moreargs) {
- long max_count = LONG_MAX / (max(sizeof(streamID), attempts_factor));
- if (getRangeLongFromObjectOrReply(c,c->argv[j+1],1,max_count,&count,"COUNT must be > 0") != C_OK)
- return;
- j++;
- } else if (!strcasecmp(opt,"JUSTID")) {
- justid = 1;
- } else {
- addReplyErrorObject(c,shared.syntaxerr);
- return;
- }
- j++;
- }
-
- if (o) {
- if (checkType(c,o,OBJ_STREAM))
- return; /* Type error. */
- group = streamLookupCG(o->ptr,c->argv[2]->ptr);
- }
-
- /* No key or group? Send an error given that the group creation
- * is mandatory. */
- if (o == NULL || group == NULL) {
- addReplyErrorFormat(c,"-NOGROUP No such key '%s' or consumer group '%s'",
- (char*)c->argv[1]->ptr,
- (char*)c->argv[2]->ptr);
- return;
- }
-
- streamID *deleted_ids = ztrymalloc(count * sizeof(streamID));
- if (!deleted_ids) {
- addReplyError(c, "Insufficient memory, failed allocating transient memory, COUNT too high.");
- return;
- }
-
- /* Do the actual claiming. */
- stream *s = o->ptr;
- size_t old_alloc = s->alloc_size;
- streamConsumer *consumer = streamLookupConsumer(group,c->argv[3]->ptr);
- if (consumer == NULL) {
- consumer = streamCreateConsumer(o->ptr,group,c->argv[3]->ptr,c->argv[1],c->db->id,SCC_DEFAULT);
- }
- consumer->seen_time = commandTimeSnapshot();
-
- long long attempts = count * attempts_factor;
-
- addReplyArrayLen(c, 3); /* We add another reply later */
- void *endidptr = addReplyDeferredLen(c); /* reply[0] */
- void *arraylenptr = addReplyDeferredLen(c); /* reply[1] */
-
- unsigned char startkey[sizeof(streamID)];
- streamEncodeID(startkey,&startid);
- raxIterator ri;
- raxStart(&ri,group->pel);
- raxSeek(&ri,">=",startkey,sizeof(startkey));
- size_t arraylen = 0;
- mstime_t now = commandTimeSnapshot();
- int deleted_id_num = 0;
- while (attempts-- && count && raxNext(&ri)) {
- streamNACK *nack = ri.data;
-
- streamID id;
- streamDecodeID(ri.key, &id);
-
- /* Item must exist for us to transfer it to another consumer. */
- if (!streamEntryExists(s,&id)) {
- /* Propagate this change (we are going to delete the NACK). */
- robj *idstr = createObjectFromStreamID(&id);
- streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],idstr,nack);
- decrRefCount(idstr);
- server.dirty++;
- /* Clear this entry from the PEL, it no longer exists */
- raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &id);
- raxRemove(group->pel,ri.key,ri.key_len,NULL);
- raxRemove(nack->consumer->pel,ri.key,ri.key_len,NULL);
- streamDestroyNACK(s, nack, ri.key);
- /* Remember the ID for later */
- deleted_ids[deleted_id_num++] = id;
- raxSeek(&ri,">=",ri.key,ri.key_len);
- count--; /* Count is a limit of the command response size. */
- continue;
- }
-
- if (minidle) {
- mstime_t this_idle = now - nack->delivery_time;
- if (this_idle < minidle)
- continue;
- }
-
- if (nack->consumer != consumer) {
- /* Remove the entry from the old consumer.
- * Note that nack->consumer is NULL if we created the
- * NACK above because of the FORCE option. */
- if (nack->consumer) {
- raxRemove(nack->consumer->pel,ri.key,ri.key_len,NULL);
- }
- }
-
- /* Update the consumer and idle time. */
- raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &id);
- nack->delivery_time = now;
- raxInsertPelByTime(group->pel_by_time, nack->delivery_time, &id);
-
- /* Increment the delivery attempts counter unless JUSTID option provided */
- if (!justid)
- nack->delivery_count++;
-
- if (nack->consumer != consumer) {
- /* Add the entry in the new consumer local PEL. */
- raxInsert(consumer->pel,ri.key,ri.key_len,nack,NULL);
- nack->consumer = consumer;
- }
-
- /* Send the reply for this entry. */
- if (justid) {
- addReplyStreamID(c,&id);
- } else {
- serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,-1,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL,NULL) == 1);
- }
- arraylen++;
- count--;
-
- consumer->active_time = commandTimeSnapshot();
-
- /* Propagate this change. */
- robj *idstr = createObjectFromStreamID(&id);
- streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],idstr,nack);
- decrRefCount(idstr);
- server.dirty++;
- }
-
- /* We need to return the next entry as a cursor for the next XAUTOCLAIM call */
- raxNext(&ri);
-
- if (server.memory_tracking_per_slot && old_alloc != s->alloc_size)
- updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),old_alloc,s->alloc_size);
-
- streamID endid;
- if (raxEOF(&ri)) {
- endid.ms = endid.seq = 0;
- } else {
- streamDecodeID(ri.key, &endid);
- }
- raxStop(&ri);
-
- setDeferredArrayLen(c,arraylenptr,arraylen);
- setDeferredReplyStreamID(c,endidptr,&endid);
-
- addReplyArrayLen(c, deleted_id_num); /* reply[2] */
- for (int i = 0; i < deleted_id_num; i++) {
- addReplyStreamID(c, &deleted_ids[i]);
- }
- zfree(deleted_ids);
-
- preventCommandPropagation(c);
- /* Update LRM but don't signal. */
- keyModified(c,c->db,c->argv[1],o,0);
-}
-
-/* XDEL <key> [<ID1> <ID2> ... <IDN>]
- *
- * Removes the specified entries from the stream. Returns the number
- * of items actually deleted, that may be different from the number
- * of IDs passed in case certain IDs do not exist. */
-void xdelCommand(client *c) {
- kvobj *kv = lookupKeyWriteOrReply(c, c->argv[1], shared.czero);
- if (kv == NULL || checkType(c, kv, OBJ_STREAM)) return;
- stream *s = kv->ptr;
- size_t old_alloc = s->alloc_size;
-
- /* We need to sanity check the IDs passed to start. Even if not
- * a big issue, it is not great that the command is only partially
- * executed because at some point an invalid ID is parsed. */
- streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
- streamID *ids = static_ids;
- int id_count = c->argc-2;
- if (id_count > STREAMID_STATIC_VECTOR_LEN)
- ids = zmalloc(sizeof(streamID)*id_count);
- for (int j = 2; j < c->argc; j++) {
- if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-2],0,NULL) != C_OK) goto cleanup;
- }
-
- /* Actually apply the command. */
- int deleted = 0;
- int first_entry = 0;
- for (int j = 2; j < c->argc; j++) {
- streamID *id = &ids[j-2];
- if (streamDeleteItem(s,id)) {
- /* We want to know if the first entry in the stream was deleted
- * so we can later set the new one. */
- if (streamCompareID(id,&s->first_id) == 0) {
- first_entry = 1;
- }
- /* Update the stream's maximal tombstone if needed. */
- if (streamCompareID(id,&s->max_deleted_entry_id) > 0) {
- s->max_deleted_entry_id = *id;
- }
- deleted++;
- };
- }
-
- if (server.memory_tracking_per_slot && old_alloc != s->alloc_size)
- updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),old_alloc,s->alloc_size);
-
- /* Update the stream's first ID. */
- if (deleted) {
- if (s->length == 0) {
- s->first_id.ms = 0;
- s->first_id.seq = 0;
- } else if (first_entry) {
- streamGetEdgeID(s,1,1,&s->first_id);
- }
- }
-
- /* Propagate the write if needed. */
- if (deleted) {
- keyModified(c,c->db,c->argv[1],kv,1);
- notifyKeyspaceEvent(NOTIFY_STREAM,"xdel",c->argv[1],c->db->id);
- server.dirty += deleted;
- }
- addReplyLongLong(c,deleted);
-cleanup:
- if (ids != static_ids) zfree(ids);
-}
-
-/* Used by xdelexCommand() */
-typedef enum XDelexRes {
- XDELEX_NO_ID = -1, /* ID not found in the stream. */
- XDELEX_DELETED = 1, /* Message deleted. */
- XDELEX_STILL_REFERENCED = 2, /* Message not deleted (still referenced). */
-} XDelexRes;
-
-/* XDELEX <key> [KEEPREF|DELREF|ACKED] [IDS <numids> <id ...>]
- *
- * Removes specified entries from the stream. Returns an array of status codes for
- * each ID, indicating whether it was deleted, still referenced, or not found. */
-void xdelexCommand(client *c) {
- kvobj *kv = lookupKeyWrite(c->db, c->argv[1]);
- if (checkType(c, kv, OBJ_STREAM)) return;
-
- /* Parse command options */
- streamAckDelArgs args;
- if (!streamParseAckDelArgsOrReply(c, 2, &args)) return;
-
- /* Non-existing keys and empty stream are the same thing. Reply null if the
- * key does not exist.*/
- if (!kv) {
- addReplyArrayLen(c, args.numids);
- for (int i = 0; i < args.numids; i++)
- addReplyLongLong(c, XDELEX_NO_ID);
- return;
- }
-
- /* We need to sanity check the IDs passed to start. Even if not
- * a big issue, it is not great that the command is only partially
- * executed because at some point an invalid ID is parsed. */
- streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
- streamID *ids = static_ids;
- if (args.numids > STREAMID_STATIC_VECTOR_LEN)
- ids = zmalloc(sizeof(streamID)*args.numids);
- for (int j = 0; j < args.numids; j++) {
- if (streamParseStrictIDOrReply(c,c->argv[j+args.startidx],&ids[j],0,NULL) != C_OK)
- goto cleanup;
- }
-
- stream *s = kv->ptr;
- size_t old_alloc = s->alloc_size;
- int first_entry = 0;
- int deleted = 0;
- addReplyArrayLen(c, args.numids);
- for (int j = 0; j < args.numids; j++) {
- int res = XDELEX_NO_ID;
- streamID *id = &ids[j];
- unsigned char buf[sizeof(streamID)];
- streamEncodeID(buf,id);
-
- int can_delete = 1;
- if (args.delete_strategy == DELETE_STRATEGY_ACKED) {
- /* Only delete if acknowledged by all consumer groups */
- if (streamEntryIsReferenced(s, id))
- can_delete = 0;
- } else if (args.delete_strategy == DELETE_STRATEGY_DELREF) {
- streamCleanupEntryCGroupRefs(s, id);
- }
-
- if (can_delete) { /* can_delete being true doesn't guarantee the ID exists */
- if (streamDeleteItem(s,id)) {
- /* We want to know if the first entry in the stream was deleted
- * so we can later set the new one. */
- if (streamCompareID(id,&s->first_id) == 0) {
- first_entry = 1;
- }
- /* Update the stream's maximal tombstone if needed. */
- if (streamCompareID(id,&s->max_deleted_entry_id) > 0) {
- s->max_deleted_entry_id = *id;
- }
- deleted++;
- res = XDELEX_DELETED;
- } else {
- /* This id doesn't exist. */
- }
- } else {
- res = XDELEX_STILL_REFERENCED;
- }
-
- addReplyLongLong(c, res);
- }
-
- /* Update the stream's first ID. */
- if (deleted) {
- if (server.memory_tracking_per_slot)
- updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),old_alloc,s->alloc_size);
- if (s->length == 0) {
- s->first_id.ms = 0;
- s->first_id.seq = 0;
- } else if (first_entry) {
- streamGetEdgeID(s,1,1,&s->first_id);
- }
-
- /* Propagate the write. */
- keyModified(c,c->db,c->argv[1],kv,1);
- notifyKeyspaceEvent(NOTIFY_STREAM,"xdel",c->argv[1],c->db->id);
- server.dirty += deleted;
- }
-
-cleanup:
- if (ids != static_ids) zfree(ids);
-}
-
-/* General form: XTRIM <key> [... options ...]
- *
- * List of options:
- *
- * Trim strategies:
- *
- * MAXLEN [~|=] <count> -- Trim so that the stream will be capped at
- * the specified length. Use ~ before the
- * count in order to demand approximated trimming
- * (like XADD MAXLEN option).
- * MINID [~|=] <id> -- Trim so that the stream will not contain entries
- * with IDs smaller than 'id'. Use ~ before the
- * count in order to demand approximated trimming
- * (like XADD MINID option).
- *
- * Consumer group reference handling (optional, defaults to KEEPREF):
- *
- * KEEPREF -- Keeps existing consumer group references
- * DELREF -- Clean up all consumer group references
- * ACKED -- Only delete messages that are acknowledged
- *
- * Other options:
- *
- * LIMIT <entries> -- The maximum number of entries to trim.
- * 0 means unlimited. Unless specified, it is set
- * to a default of 100*server.stream_node_max_entries,
- * and that's in order to keep the trimming time sane.
- * Has meaning only if `~` was provided.
- */
-void xtrimCommand(client *c) {
- /* Argument parsing. */
- streamAddTrimArgs parsed_args;
- if (streamParseAddOrTrimArgsOrReply(c, &parsed_args, 0) < 0)
- return; /* streamParseAddOrTrimArgsOrReply already replied. */
-
- /* If the key does not exist, we are ok returning zero, that is, the
- * number of elements removed from the stream. */
- kvobj *kv = lookupKeyWriteOrReply(c, c->argv[1], shared.czero);
- if (kv == NULL || checkType(c, kv, OBJ_STREAM)) return;
- stream *s = kv->ptr;
-
- /* Perform the trimming. */
- size_t old_alloc = s->alloc_size;
- int64_t deleted = streamTrim(s, &parsed_args);
- if (server.memory_tracking_per_slot && old_alloc != s->alloc_size)
- updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),old_alloc,s->alloc_size);
- if (deleted) {
- notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
- if (parsed_args.approx_trim) {
- /* In case our trimming was limited (by LIMIT or by ~) we must
- * re-write the relevant trim argument to make sure there will be
- * no inconsistencies in AOF loading or in the replica.
- * It's enough to check only args->approx because there is no
- * way LIMIT is given without the ~ option. */
- streamRewriteApproxSpecifier(c,parsed_args.trim_strategy_arg_idx-1);
- streamRewriteTrimArgument(c,s,parsed_args.trim_strategy,parsed_args.trim_strategy_arg_idx);
- }
-
- /* Propagate the write. */
- keyModified(c, c->db,c->argv[1], kv, 1);
- server.dirty += deleted;
- }
- addReplyLongLong(c,deleted);
-}
-
-/* Helper function for xinfoCommand.
- * Handles the variants of XINFO STREAM */
-void xinfoReplyWithStreamInfo(client *c, stream *s) {
- int full = 1;
- long long count = 10; /* Default COUNT is 10 so we don't block the server */
- robj **optv = c->argv + 3; /* Options start after XINFO STREAM <key> */
- int optc = c->argc - 3;
-
- /* Parse options. */
- if (optc == 0) {
- full = 0;
- } else {
- /* Valid options are [FULL] or [FULL COUNT <count>] */
- if (optc != 1 && optc != 3) {
- addReplySubcommandSyntaxError(c);
- return;
- }
-
- /* First option must be "FULL" */
- if (strcasecmp(optv[0]->ptr,"full")) {
- addReplySubcommandSyntaxError(c);
- return;
- }
-
- if (optc == 3) {
- /* First option must be "FULL" */
- if (strcasecmp(optv[1]->ptr,"count")) {
- addReplySubcommandSyntaxError(c);
- return;
- }
- if (getLongLongFromObjectOrReply(c,optv[2],&count,NULL) == C_ERR)
- return;
- if (count < 0) count = 10;
- }
- }
-
- addReplyMapLen(c,full ? 15 : 16);
- addReplyBulkCString(c,"length");
- addReplyLongLong(c,s->length);
- addReplyBulkCString(c,"radix-tree-keys");
- addReplyLongLong(c,raxSize(s->rax));
- addReplyBulkCString(c,"radix-tree-nodes");
- addReplyLongLong(c,s->rax->numnodes);
- addReplyBulkCString(c,"last-generated-id");
- addReplyStreamID(c,&s->last_id);
- addReplyBulkCString(c,"max-deleted-entry-id");
- addReplyStreamID(c,&s->max_deleted_entry_id);
- addReplyBulkCString(c,"entries-added");
- addReplyLongLong(c,s->entries_added);
- addReplyBulkCString(c,"recorded-first-entry-id");
- addReplyStreamID(c,&s->first_id);
- addReplyBulkCString(c,"idmp-duration");
- addReplyLongLong(c,s->idmp_duration);
- addReplyBulkCString(c,"idmp-maxsize");
- addReplyLongLong(c,s->idmp_max_entries);
- addReplyBulkCString(c,"pids-tracked");
- addReplyLongLong(c, s->idmp_producers ? raxSize(s->idmp_producers) : 0);
- addReplyBulkCString(c,"iids-tracked");
- /* Count total IIDs across all producers */
- size_t total_iids = 0;
- if (s->idmp_producers) {
- raxIterator ri;
- raxStart(&ri, s->idmp_producers);
- raxSeek(&ri, "^", NULL, 0);
- while (raxNext(&ri)) {
- idmpProducer *producer = ri.data;
- total_iids += dictSize(producer->idmp_dict);
- }
- raxStop(&ri);
- }
- addReplyLongLong(c, total_iids);
- addReplyBulkCString(c,"iids-added");
- addReplyLongLong(c,s->iids_added);
- addReplyBulkCString(c,"iids-duplicates");
- addReplyLongLong(c,s->iids_duplicates);
-
- size_t old_alloc = s->alloc_size;
- if (!full) {
- /* XINFO STREAM <key> */
-
- addReplyBulkCString(c,"groups");
- addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0);
-
- /* To emit the first/last entry we use streamReplyWithRange(). */
- int emitted;
- streamID start, end;
- start.ms = start.seq = 0;
- end.ms = end.seq = UINT64_MAX;
- addReplyBulkCString(c,"first-entry");
- emitted = streamReplyWithRange(c,s,&start,&end,1,0,-1,NULL,NULL,
- STREAM_RWR_RAWENTRIES,NULL,NULL);
- if (!emitted) addReplyNull(c);
- addReplyBulkCString(c,"last-entry");
- emitted = streamReplyWithRange(c,s,&start,&end,1,1,-1,NULL,NULL,
- STREAM_RWR_RAWENTRIES,NULL,NULL);
- if (!emitted) addReplyNull(c);
- } else {
- /* XINFO STREAM <key> FULL [COUNT <count>] */
-
- /* Stream entries */
- addReplyBulkCString(c,"entries");
- streamReplyWithRange(c,s,NULL,NULL,count,0,-1,NULL,NULL,0,NULL,NULL);
-
- /* Consumer groups */
- addReplyBulkCString(c,"groups");
- if (s->cgroups == NULL) {
- addReplyArrayLen(c,0);
- } else {
- addReplyArrayLen(c,raxSize(s->cgroups));
- raxIterator ri_cgroups;
- raxStart(&ri_cgroups,s->cgroups);
- raxSeek(&ri_cgroups,"^",NULL,0);
- while(raxNext(&ri_cgroups)) {
- streamCG *cg = ri_cgroups.data;
- addReplyMapLen(c,7);
-
- /* Name */
- addReplyBulkCString(c,"name");
- addReplyBulkCBuffer(c,ri_cgroups.key,ri_cgroups.key_len);
-
- /* Last delivered ID */
- addReplyBulkCString(c,"last-delivered-id");
- addReplyStreamID(c,&cg->last_id);
-
- /* Read counter of the last delivered ID */
- addReplyBulkCString(c,"entries-read");
- if (cg->entries_read != SCG_INVALID_ENTRIES_READ) {
- addReplyLongLong(c,cg->entries_read);
- } else {
- addReplyNull(c);
- }
-
- /* Group lag */
- addReplyBulkCString(c,"lag");
- streamReplyWithCGLag(c,s,cg);
-
- /* Group PEL count */
- addReplyBulkCString(c,"pel-count");
- addReplyLongLong(c,raxSize(cg->pel));
-
- /* Group PEL */
- addReplyBulkCString(c,"pending");
- long long arraylen_cg_pel = 0;
- void *arrayptr_cg_pel = addReplyDeferredLen(c);
- raxIterator ri_cg_pel;
- raxStart(&ri_cg_pel,cg->pel);
- raxSeek(&ri_cg_pel,"^",NULL,0);
- while(raxNext(&ri_cg_pel) && (!count || arraylen_cg_pel < count)) {
- streamNACK *nack = ri_cg_pel.data;
- addReplyArrayLen(c,4);
-
- /* Entry ID. */
- streamID id;
- streamDecodeID(ri_cg_pel.key,&id);
- addReplyStreamID(c,&id);
-
- /* Consumer name. */
- serverAssert(nack->consumer); /* assertion for valgrind (avoid NPD) */
- addReplyBulkCBuffer(c,nack->consumer->name,
- sdslen(nack->consumer->name));
-
- /* Last delivery. */
- addReplyLongLong(c,nack->delivery_time);
-
- /* Number of deliveries. */
- addReplyLongLong(c,nack->delivery_count);
-
- arraylen_cg_pel++;
- }
- setDeferredArrayLen(c,arrayptr_cg_pel,arraylen_cg_pel);
- raxStop(&ri_cg_pel);
-
- /* Consumers */
- addReplyBulkCString(c,"consumers");
- addReplyArrayLen(c,raxSize(cg->consumers));
- raxIterator ri_consumers;
- raxStart(&ri_consumers,cg->consumers);
- raxSeek(&ri_consumers,"^",NULL,0);
- while(raxNext(&ri_consumers)) {
- streamConsumer *consumer = ri_consumers.data;
- addReplyMapLen(c,5);
-
- /* Consumer name */
- addReplyBulkCString(c,"name");
- addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name));
-
- /* Seen-time */
- addReplyBulkCString(c,"seen-time");
- addReplyLongLong(c,consumer->seen_time);
-
- /* Active-time */
- addReplyBulkCString(c,"active-time");
- addReplyLongLong(c,consumer->active_time);
-
- /* Consumer PEL count */
- addReplyBulkCString(c,"pel-count");
- addReplyLongLong(c,raxSize(consumer->pel));
-
- /* Consumer PEL */
- addReplyBulkCString(c,"pending");
- long long arraylen_cpel = 0;
- void *arrayptr_cpel = addReplyDeferredLen(c);
- raxIterator ri_cpel;
- raxStart(&ri_cpel,consumer->pel);
- raxSeek(&ri_cpel,"^",NULL,0);
- while(raxNext(&ri_cpel) && (!count || arraylen_cpel < count)) {
- streamNACK *nack = ri_cpel.data;
- addReplyArrayLen(c,3);
-
- /* Entry ID. */
- streamID id;
- streamDecodeID(ri_cpel.key,&id);
- addReplyStreamID(c,&id);
-
- /* Last delivery. */
- addReplyLongLong(c,nack->delivery_time);
-
- /* Number of deliveries. */
- addReplyLongLong(c,nack->delivery_count);
-
- arraylen_cpel++;
- }
- setDeferredArrayLen(c,arrayptr_cpel,arraylen_cpel);
- raxStop(&ri_cpel);
- }
- raxStop(&ri_consumers);
- }
- raxStop(&ri_cgroups);
- }
- }
- if (server.memory_tracking_per_slot && old_alloc != s->alloc_size)
- updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),old_alloc,s->alloc_size);
-}
-
-/* XINFO CONSUMERS <key> <group>
- * XINFO GROUPS <key>
- * XINFO STREAM <key> [FULL [COUNT <count>]]
- * XINFO HELP. */
-void xinfoCommand(client *c) {
- stream *s = NULL;
- char *opt;
- robj *key;
-
- /* HELP is special. Handle it ASAP. */
- if (!strcasecmp(c->argv[1]->ptr,"HELP")) {
- const char *help[] = {
-"CONSUMERS <key> <groupname>",
-" Show consumers of <groupname>.",
-"GROUPS <key>",
-" Show the stream consumer groups.",
-"STREAM <key> [FULL [COUNT <count>]",
-" Show information about the stream.",
-NULL
- };
- addReplyHelp(c, help);
- return;
- }
-
- /* With the exception of HELP handled before any other sub commands, all
- * the ones are in the form of "<subcommand> <key>". */
- opt = c->argv[1]->ptr;
- key = c->argv[2];
-
- /* Lookup the key now, this is common for all the subcommands but HELP. */
- kvobj *kv = lookupKeyReadOrReply(c, key, shared.nokeyerr);
- if (kv == NULL || checkType(c, kv, OBJ_STREAM)) return;
- s = kv->ptr;
-
- /* Dispatch the different subcommands. */
- if (!strcasecmp(opt,"CONSUMERS") && c->argc == 4) {
- /* XINFO CONSUMERS <key> <group>. */
- streamCG *cg = streamLookupCG(s,c->argv[3]->ptr);
- if (cg == NULL) {
- addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' "
- "for key name '%s'",
- (char*)c->argv[3]->ptr, (char*)key->ptr);
- return;
- }
-
- addReplyArrayLen(c,raxSize(cg->consumers));
- raxIterator ri;
- raxStart(&ri,cg->consumers);
- raxSeek(&ri,"^",NULL,0);
- mstime_t now = commandTimeSnapshot();
- while(raxNext(&ri)) {
- streamConsumer *consumer = ri.data;
- mstime_t inactive = consumer->active_time != -1 ? now - consumer->active_time : consumer->active_time;
- mstime_t idle = now - consumer->seen_time;
- if (idle < 0) idle = 0;
-
- addReplyMapLen(c,4);
- addReplyBulkCString(c,"name");
- addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name));
- addReplyBulkCString(c,"pending");
- addReplyLongLong(c,raxSize(consumer->pel));
- addReplyBulkCString(c,"idle");
- addReplyLongLong(c,idle);
- addReplyBulkCString(c,"inactive");
- addReplyLongLong(c,inactive);
- }
- raxStop(&ri);
- } else if (!strcasecmp(opt,"GROUPS") && c->argc == 3) {
- /* XINFO GROUPS <key>. */
- if (s->cgroups == NULL) {
- addReplyArrayLen(c,0);
- return;
- }
-
- addReplyArrayLen(c,raxSize(s->cgroups));
- raxIterator ri;
- raxStart(&ri,s->cgroups);
- raxSeek(&ri,"^",NULL,0);
- while(raxNext(&ri)) {
- streamCG *cg = ri.data;
- addReplyMapLen(c,6);
- addReplyBulkCString(c,"name");
- addReplyBulkCBuffer(c,ri.key,ri.key_len);
- addReplyBulkCString(c,"consumers");
- addReplyLongLong(c,raxSize(cg->consumers));
- addReplyBulkCString(c,"pending");
- addReplyLongLong(c,raxSize(cg->pel));
- addReplyBulkCString(c,"last-delivered-id");
- addReplyStreamID(c,&cg->last_id);
- addReplyBulkCString(c,"entries-read");
- if (cg->entries_read != SCG_INVALID_ENTRIES_READ) {
- addReplyLongLong(c,cg->entries_read);
- } else {
- addReplyNull(c);
- }
- addReplyBulkCString(c,"lag");
- streamReplyWithCGLag(c,s,cg);
- }
- raxStop(&ri);
- } else if (!strcasecmp(opt,"STREAM")) {
- /* XINFO STREAM <key> [FULL [COUNT <count>]]. */
- xinfoReplyWithStreamInfo(c,s);
- } else {
- addReplySubcommandSyntaxError(c);
- }
-}
-
-/* XCFGSET <key> [IDMP-DURATION <duration>] [IDMP-MAXSIZE <maxsize>] */
-void xcfgsetCommand(client *c) {
- robj *key = c->argv[1];
-
- /* Lookup the stream key */
- kvobj *kv = lookupKeyWriteOrReply(c,key,shared.nokeyerr);
- if (kv == NULL || checkType(c,kv,OBJ_STREAM)) return;
- stream *s = kv->ptr;
-
- /* XCFGSET <key> [IDMP-DURATION <duration>] [IDMP-MAXSIZE <maxsize>] */
- long long duration = -1;
- long long maxsize = -1;
-
- /* Parse parameters */
- for (int i = 2; i < c->argc; i++) {
- int moreargs = c->argc - i - 1;
- char *param = c->argv[i]->ptr;
- if (!strcasecmp(param,"IDMP-DURATION") && moreargs) {
- if (duration != -1) {
- addReplyError(c,"IDMP-DURATION specified multiple times");
- return;
- }
- i++;
- if (getLongLongFromObjectOrReply(c,c->argv[i],&duration,NULL) != C_OK)
- return;
- if (duration < CONFIG_STREAM_IDMP_MIN_DURATION ||
- duration > CONFIG_STREAM_IDMP_MAX_DURATION) {
- addReplyErrorFormat(c,"IDMP-DURATION must be between %d and %d seconds",
- CONFIG_STREAM_IDMP_MIN_DURATION,CONFIG_STREAM_IDMP_MAX_DURATION);
- return;
- }
- } else if (!strcasecmp(param,"IDMP-MAXSIZE") && moreargs) {
- if (maxsize != -1) {
- addReplyError(c,"IDMP-MAXSIZE specified multiple times");
- return;
- }
- i++;
- if (getLongLongFromObjectOrReply(c,c->argv[i],&maxsize,NULL) != C_OK)
- return;
- if (maxsize < CONFIG_STREAM_IDMP_MIN_MAXSIZE ||
- maxsize > CONFIG_STREAM_IDMP_MAX_MAXSIZE) {
- addReplyErrorFormat(c,"IDMP-MAXSIZE must be between %d and %d entries",
- CONFIG_STREAM_IDMP_MIN_MAXSIZE,CONFIG_STREAM_IDMP_MAX_MAXSIZE);
- return;
- }
- } else {
- addReplyErrorObject(c,shared.syntaxerr);
- return;
- }
- }
-
- /* At least one parameter must be specified */
- if (duration == -1 && maxsize == -1) {
- addReplyError(c,"At least one parameter must be specified");
- return;
- }
-
- /* Track if we made any changes */
- int changed = 0;
-
- /* Update the stream configuration. When we set IDMP-DURATION or IDMP-MAXSIZE to a
- * different value, we clear all existing producer IDMP maps for the stream.
- * If the value is the same, we don't clear to allow multiple publishers
- * to call this before starting to publish without clearing each time. */
- if (duration != -1 && s->idmp_duration != (uint64_t)duration) {
- s->idmp_duration = duration;
- streamClearIdmpEntries(s);
- changed = 1;
- }
- if (maxsize != -1 && s->idmp_max_entries != (uint64_t)maxsize) {
- s->idmp_max_entries = maxsize;
- streamClearIdmpEntries(s);
- changed = 1;
- }
-
- /* Mark the key as dirty for replication only if we changed something */
- if (changed) {
- keyModified(c,c->db,key,kv,0);
- server.dirty++;
- }
- addReply(c,shared.ok);
-}
-
-/* Validate the integrity stream listpack entries structure. Both in term of a
- * valid listpack, but also that the structure of the entries matches a valid
- * stream. return 1 if valid 0 if not valid. */
-int streamValidateListpackIntegrity(unsigned char *lp, size_t size, int deep) {
- int valid_record;
- unsigned char *p, *next;
-
- /* Since we don't want to run validation of all records twice, we'll
- * run the listpack validation of just the header and do the rest here. */
- if (!lpValidateIntegrity(lp, size, 0, NULL, NULL))
- return 0;
-
- /* In non-deep mode we just validated the listpack header (encoded size) */
- if (!deep) return 1;
-
- next = p = lpValidateFirst(lp);
- if (!lpValidateNext(lp, &next, size)) return 0;
- if (!p) return 0;
-
- /* entry count */
- int64_t entry_count = lpGetIntegerIfValid(p, &valid_record);
- if (!valid_record) return 0;
- p = next; if (!lpValidateNext(lp, &next, size)) return 0;
-
- /* deleted */
- int64_t deleted_count = lpGetIntegerIfValid(p, &valid_record);
- if (!valid_record) return 0;
- p = next; if (!lpValidateNext(lp, &next, size)) return 0;
-
- /* num-of-fields */
- int64_t master_fields = lpGetIntegerIfValid(p, &valid_record);
- if (!valid_record) return 0;
- p = next; if (!lpValidateNext(lp, &next, size)) return 0;
-
- /* the field names */
- for (int64_t j = 0; j < master_fields; j++) {
- p = next; if (!lpValidateNext(lp, &next, size)) return 0;
- }
-
- /* the zero master entry terminator. */
- int64_t zero = lpGetIntegerIfValid(p, &valid_record);
- if (!valid_record || zero != 0) return 0;
- p = next; if (!lpValidateNext(lp, &next, size)) return 0;
-
- entry_count += deleted_count;
- while (entry_count--) {
- if (!p) return 0;
- int64_t fields = master_fields, extra_fields = 3;
- int64_t flags = lpGetIntegerIfValid(p, &valid_record);
- if (!valid_record) return 0;
- p = next; if (!lpValidateNext(lp, &next, size)) return 0;
-
- /* entry id */
- lpGetIntegerIfValid(p, &valid_record);
- if (!valid_record) return 0;
- p = next; if (!lpValidateNext(lp, &next, size)) return 0;
- lpGetIntegerIfValid(p, &valid_record);
- if (!valid_record) return 0;
- p = next; if (!lpValidateNext(lp, &next, size)) return 0;
-
- if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) {
- /* num-of-fields */
- fields = lpGetIntegerIfValid(p, &valid_record);
- if (!valid_record) return 0;
- p = next; if (!lpValidateNext(lp, &next, size)) return 0;
-
- /* the field names */
- for (int64_t j = 0; j < fields; j++) {
- p = next; if (!lpValidateNext(lp, &next, size)) return 0;
- }
-
- extra_fields += fields + 1;
- }
-
- /* the values */
- for (int64_t j = 0; j < fields; j++) {
- p = next; if (!lpValidateNext(lp, &next, size)) return 0;
- }
-
- /* lp-count */
- int64_t lp_count = lpGetIntegerIfValid(p, &valid_record);
- if (!valid_record) return 0;
- if (lp_count != fields + extra_fields) return 0;
- p = next; if (!lpValidateNext(lp, &next, size)) return 0;
- }
-
- if (next)
- return 0;
-
- return 1;
-}
-
-/* Convert the specified pelTimeKey as a 192 bit big endian number, so
- * that the key can be sorted lexicographically. */
-void encodePelTimeKey(void *buf, pelTimeKey *key) {
- uint64_t e[3];
- e[0] = htonu64(key->delivery_time);
- e[1] = htonu64(key->id.ms);
- e[2] = htonu64(key->id.seq);
- memcpy(buf,e,sizeof(e));
-}
-
-/* This is the reverse of encodePelTimeKey(): the decoded key will be stored
- * in the 'key' structure passed by reference. The buffer 'buf' must point
- * to a 192 bit big-endian encoded key. */
-void decodePelTimeKey(void *buf, pelTimeKey *key) {
- uint64_t e[3];
- memcpy(e,buf,sizeof(e));
- key->delivery_time = ntohu64(e[0]);
- key->id.ms = ntohu64(e[1]);
- key->id.seq = ntohu64(e[2]);
-}
-
-/* Helper function to prepare an encoded PEL time key.
- * This encapsulates the creation and encoding of a pelTimeKey structure. */
-static inline void preparePelTimeKey(unsigned char *keyBuf, uint64_t delivery_time, streamID *id) {
- pelTimeKey timeKey;
- timeKey.delivery_time = delivery_time;
- timeKey.id = *id;
- encodePelTimeKey(keyBuf, &timeKey);
-}
-
-/* Helper function to insert a NACK into the PEL by time index.
- * This encapsulates the encoding and insertion into the pel_by_time rax tree. */
-void raxInsertPelByTime(rax *pel_by_time, uint64_t delivery_time, streamID *id) {
- unsigned char keyBuf[sizeof(pelTimeKey)];
- preparePelTimeKey(keyBuf, delivery_time, id);
- raxInsert(pel_by_time, keyBuf, sizeof(keyBuf), NULL, NULL);
-}
-
-/* Helper function to remove a NACK from the PEL by time index.
- * This encapsulates the encoding and removal from the pel_by_time rax tree. */
-void raxRemovePelByTime(rax *pel_by_time, uint64_t delivery_time, streamID *id) {
- unsigned char keyBuf[sizeof(pelTimeKey)];
- preparePelTimeKey(keyBuf, delivery_time, id);
- raxRemove(pel_by_time, keyBuf, sizeof(keyBuf), NULL);
-}
-
-/* Register stream keys for monitoring of expired pending entries to enable
- * reactive blocking behavior for XREADGROUP commands with CLAIM. When a client
- * blocks waiting for either new messages or expired pending entries, this
- * function records the earliest timestamp when pending entries will expire
- * (satisfy the min-idle-time requirement).
- *
- * For multi-client coordination, when multiple clients are blocked on the same
- * stream with different min-idle-time values, the dictionary stores the minimum
- * (earliest) expire_time across all clients to ensure the earliest possible
- * wakeup when any pending entry expires and becomes available for claiming.
- *
- * 'c' is the client that is blocking on the stream(s).
- * 'keys' is an array of stream key objects to monitor.
- * 'numkeys' is the number of keys in the array.
- * 'expire_time' is the absolute timestamp (in milliseconds) when the next
- * pending entry will expire for this client, calculated as
- * next_delivery_time + min_idle_time, where next_delivery_time is the
- * delivery timestamp of the oldest pending entry in the stream.
- *
- * For new entries, the key is added with the given expire_time and the
- * reference count is incremented. For existing entries, the expire_time
- * is updated to the minimum value if the new expire_time is earlier,
- * ensuring the earliest wakeup time is preserved for multi-client scenarios.
- * Note that the reference count is only incremented for newly added keys,
- * not for updates to existing entries. */
-void trackStreamClaimTimeouts(client *c, robj **keys, int numkeys, uint64_t expire_time) {
- dictEntry *db_watch_entry, *db_watch_existing_entry;
- uint64_t old_expire_time;
- int j;
-
- for (j = 0; j < numkeys; j++) {
- db_watch_entry = dictAddRaw(c->db->stream_claim_pending_keys, keys[j], &db_watch_existing_entry);
- if (db_watch_entry != NULL) {
- dictSetUnsignedIntegerVal(db_watch_entry, expire_time);
- incrRefCount(keys[j]);
- } else {
- old_expire_time = dictGetUnsignedIntegerVal(db_watch_existing_entry);
- if (expire_time < old_expire_time) {
- dictSetUnsignedIntegerVal(db_watch_existing_entry, expire_time);
- }
- }
- }
-}
-
-/* Check and wake clients waiting for expired pending entries. This function
- * is invoked regularly from blockedBeforeSleep() to monitor streams being
- * watched for expired pending entries and wake up blocked clients when
- * entries expire and become available for claiming.
- *
- * The function processes up to CRON_DBS_PER_CALL databases per call in a
- * round-robin fashion, cycling through all databases over multiple invocations.
- * For each database, it iterates through the stream_claim_pending_keys dictionary.
- * For each watched stream, it compares the registered expire_time against the
- * current server time. When expire_time is less than the current server time,
- * the pending entry has expired and the stream is signaled as ready via
- * signalKeyAsReady(), which wakes all blocked clients waiting on that stream.
- * The entry is then removed from stream_claim_pending_keys. */
-void handleClaimableStreamEntries(void) {
- static unsigned int current_db = 0;
- int dbs_per_call = CRON_DBS_PER_CALL;
- int j;
-
- if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;
-
- for (j = 0; j < dbs_per_call; j++) {
- redisDb *db = &server.db[current_db % server.dbnum];
- current_db++;
-
- if (dictIsEmpty(db->stream_claim_pending_keys))
- continue;
-
- dictEntry *de;
- dictIterator di;
- dictInitSafeIterator(&di, db->stream_claim_pending_keys);
- while ((de = dictNext(&di)) != NULL) {
- robj *key = dictGetKey(de);
- uint64_t expire_time = dictGetUnsignedIntegerVal(de);
- kvobj *kv = dbFind(db, key->ptr);
-
- if (!kv || kv->type != OBJ_STREAM) {
- dictDelete(db->stream_claim_pending_keys, key);
- continue;
- }
-
- if (expire_time < (uint64_t)server.mstime) {
- signalKeyAsReady(db, key, kv->type);
- dictDelete(db->stream_claim_pending_keys, key);
- }
- }
- dictResetIterator(&di);
- }
-}
-
-/* -----------------------------------------------------------------------
- * IDMP (Idempotent Message Producer) Functions
- * ----------------------------------------------------------------------- */
-
-/* Hash function for idmpEntry - hashes the embedded iid buffer */
-static uint64_t idmpDictHashFunction(const void *key) {
- const idmpEntry *entry = (const idmpEntry *)key;
- return dictGenHashFunction((const char *)entry->iid, entry->iid_len);
-}
-
-/* Key comparison function for idmpEntry - compares embedded iid buffers */
-static int idmpDictKeyCompare(dictCmpCache *cache, const void *key1, const void *key2) {
- UNUSED(cache);
- const idmpEntry *e1 = (const idmpEntry *)key1;
- const idmpEntry *e2 = (const idmpEntry *)key2;
- if (e1->iid_len != e2->iid_len) return 0;
- return memcmp((const char *)e1->iid, (const char *)e2->iid, e1->iid_len) == 0;
-}
-
-/* Dictionary type for IDMP entries - keys are idmpEntry pointers, values are NULL */
-dictType idmpDictType = {
- idmpDictHashFunction, /* hash function */
- NULL, /* key dup */
- NULL, /* val dup */
- idmpDictKeyCompare, /* key compare */
- NULL, /* key destructor - handled manually with linked list */
- NULL, /* val destructor */
- NULL, /* resize allowed */
- NULL, /* rehashing started */
- NULL, /* rehashing completed */
- NULL, /* bucket changed */
- NULL, /* dict metadata bytes */
- NULL, /* userdata */
- .no_value = 0, /* Use regular dict entries with NULL values to support defrag */
- .keys_are_odd = 0, /* keys are not odd */
- .force_full_rehash = 0, /* no force full rehash */
- NULL, /* key from stored key */
- NULL, /* on dict release */
-};
-
-/* Create a new idmpEntry with the given IID string.
- * The entry and IID are allocated together using flexible array member.
- * alloc_size must not be NULL and will be updated with the allocation size. */
-idmpEntry *idmpEntryCreate(const char *iid, size_t iid_len, size_t *alloc_size) {
- size_t usable;
- idmpEntry *entry = zmalloc_usable(sizeof(idmpEntry) + iid_len, &usable);
-
- entry->next = NULL;
- entry->iid_len = iid_len;
- memcpy(entry->iid, iid, iid_len);
-
- *alloc_size += usable;
-
- return entry;
-}
-
-/* Free an idmpEntry (iid is embedded via flexible array member).
- * alloc_size must not be NULL and will be updated with the freed size. */
-void idmpEntryFree(idmpEntry *entry, size_t *alloc_size) {
- if (entry == NULL) return;
-
- size_t usable;
- zfree_usable(entry, &usable);
- *alloc_size -= usable;
-}
-
-/* Create a new idmpProducer with an empty dict and linked list.
- * alloc_size must not be NULL and will be updated with the allocation size. */
-idmpProducer *idmpProducerCreate(size_t *alloc_size) {
- size_t usable;
- idmpProducer *producer = zmalloc_usable(sizeof(idmpProducer), &usable);
- producer->idmp_dict = dictCreate(&idmpDictType);
- producer->idmp_head = NULL;
- producer->idmp_tail = NULL;
-
- *alloc_size += usable;
-
- return producer;
-}
-
-/* Free an idmpProducer including its dict and all linked list entries.
- * alloc_size must not be NULL and will be updated with the freed size. */
-void idmpProducerFree(idmpProducer *producer, size_t *alloc_size) {
- if (producer == NULL) return;
-
- /* Release the dict */
- if (producer->idmp_dict)
- dictRelease(producer->idmp_dict);
-
- /* Free IDMP linked list entries */
- idmpEntry *entry = producer->idmp_head;
- while (entry) {
- idmpEntry *next = entry->next;
- idmpEntryFree(entry, alloc_size);
- entry = next;
- }
-
- size_t usable;
- zfree_usable(producer, &usable);
- *alloc_size -= usable;
-}
-
-/* Check if an IID already exists in the producer's idmp_dict.
- * If found, sends the existing stream ID as a reply and returns 1.
- * Returns 0 if the IID was not found.
- *
- * The 'entry' parameter should be an idmpEntry with the IID already set
- * (iid and iid_len fields must be initialized). */
-static int idmpLookupAndReply(stream *s, idmpProducer *producer, idmpEntry *entry, client *c) {
- dictEntry *de = dictFind(producer->idmp_dict, entry);
- if (de != NULL) {
- /* IID already exists, return the existing stream ID */
- idmpEntry *existing = (idmpEntry *)dictGetKey(de);
- addReplyStreamID(c, &existing->id);
- s->iids_duplicates++;
- return 1;
- }
- return 0;
-}
-
-/* Insert an idmpEntry into the producer's dict and linked list with the given stream ID. */
-static void idmpInsertEntry(stream *s, idmpProducer *producer, idmpEntry *entry, const streamID *id) {
- /* Set the stream ID and initialize next pointer */
- entry->next = NULL;
- entry->id = *id;
-
- /* Insert into dict (should always succeed since we already checked with lookup) */
- serverAssert(dictAdd(producer->idmp_dict, entry, NULL) == DICT_OK);
-
- /* Add to linked list tail */
- if (producer->idmp_tail == NULL) {
- producer->idmp_head = producer->idmp_tail = entry;
- } else {
- producer->idmp_tail->next = entry;
- producer->idmp_tail = entry;
- }
-
- s->iids_added++;
-
- /* Remove oldest entry if exceeding max entries */
- idmpEvictOldestEntry(s, producer);
-}
-
-/* Get or create an idmpProducer for the given producer ID.
- * Returns the producer, or NULL on allocation failure. */
-static idmpProducer *idmpGetOrCreateProducer(stream *s, const char *pid, size_t pid_len) {
- /* Create the producers rax tree if it doesn't exist */
- if (s->idmp_producers == NULL) {
- s->idmp_producers = raxNew();
- }
-
- /* Look up the producer */
- idmpProducer *producer = NULL;
- int found = raxFind(s->idmp_producers, (unsigned char *)pid, pid_len, (void **)&producer);
- if (!found) {
- /* Create a new producer */
- producer = idmpProducerCreate(&s->alloc_size);
- /* Insert into the rax tree - must succeed since we checked it doesn't exist */
- serverAssert(raxInsert(s->idmp_producers, (unsigned char *)pid, pid_len, producer, NULL));
- }
-
- return producer;
-}
-
-/* Register a stream key for IDMP entry tracking.
- * This registers a stream key in the database's stream_idmp_keys dictionary,
- * allowing the cron job handleExpiredIdmpEntries() to periodically check
- * and clean up expired idempotency entries from the stream's idmp_dict.
- *
- * 'c' is the client that is performing the XADD operation with IDMP.
- * 'key' is the stream key object to track.
- *
- * If the key is not already tracked, it is added to stream_idmp_keys and its
- * reference count is incremented. If the key is already being tracked (added
- * by a previous XADD operation), this function does nothing, as the stream
- * is already registered for periodic cleanup. */
-static void trackStreamIdmpEntries(client *c, robj *key) {
- if (dictAddRaw(c->db->stream_idmp_keys, key, NULL)) {
- incrRefCount(key);
- }
-}
-
-/* Clean up expired idempotency entries from tracked streams. This function
- * is invoked regularly from serverCron() to remove expired entries
- * from the idmp_dict of streams that have idempotency tracking enabled,
- * keeping memory usage under control.
- *
- * The function processes up to CRON_DBS_PER_CALL databases per call in a
- * round-robin fashion, cycling through all databases over multiple invocations.
- * For each database, it iterates through the stream_idmp_keys dictionary.
- * For each tracked stream, it compares the timestamp of entries in the stream's
- * idmp linked list against the expiration threshold (current time - idmp_duration).
- * Entries with timestamps older than the threshold are removed from the head
- * of the linked list. When all entries have been removed and the list becomes empty,
- * the stream key is removed from stream_idmp_keys to stop tracking it. */
-void handleExpiredIdmpEntries(void) {
- static unsigned int current_db = 0;
- int dbs_per_call = CRON_DBS_PER_CALL;
- int j;
-
- if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;
-
- for (j = 0; j < dbs_per_call; j++) {
- redisDb *db = &server.db[current_db % server.dbnum];
- current_db++;
-
- if (dictIsEmpty(db->stream_idmp_keys))
- continue;
-
- dictEntry *de;
- dictIterator di;
- dictInitSafeIterator(&di, db->stream_idmp_keys);
- while ((de = dictNext(&di)) != NULL) {
- robj *key = dictGetKey(de);
- kvobj *kv = dbFind(db, key->ptr);
-
- if (!kv || kv->type != OBJ_STREAM) {
- dictDelete(db->stream_idmp_keys, key);
- continue;
- }
-
- stream *s = kv->ptr;
- uint64_t expire_time = server.mstime - (s->idmp_duration * 1000);
-
- /* Skip if no producers */
- if (s->idmp_producers == NULL) {
- dictDelete(db->stream_idmp_keys, key);
- continue;
- }
-
- /* Iterate through all producers and remove expired entries */
- raxIterator ri;
- raxStart(&ri, s->idmp_producers);
- raxSeek(&ri, "^", NULL, 0);
- while (raxNext(&ri)) {
- idmpProducer *producer = ri.data;
-
- /* Remove expired entries from the head of this producer's linked list */
- while (producer->idmp_head != NULL) {
- idmpEntry *entry = producer->idmp_head;
- if (entry->id.ms <= expire_time) {
- /* Remove from dict */
- dictDelete(producer->idmp_dict, entry);
- /* Remove from linked list head */
- producer->idmp_head = entry->next;
- if (producer->idmp_head == NULL) {
- producer->idmp_tail = NULL;
- }
- /* Free the entry */
- idmpEntryFree(entry, &s->alloc_size);
- } else {
- break;
- }
- }
-
- /* If this producer has no entries left, remove it from the rax tree */
- if (producer->idmp_head == NULL) {
- raxRemove(s->idmp_producers, ri.key, ri.key_len, NULL);
- idmpProducerFree(producer, &s->alloc_size);
- raxSeek(&ri, ">=", ri.key, ri.key_len);
- }
- }
- raxStop(&ri);
-
- /* If no producers remain, free the entire rax tree */
- if (raxSize(s->idmp_producers) == 0) {
- raxFree(s->idmp_producers);
- s->idmp_producers = NULL;
- dictDelete(db->stream_idmp_keys, key);
- continue;
- }
- }
- dictResetIterator(&di);
- }
-}
-
-/* 64-bit left rotation helper for hash combination */
-static inline uint64_t rotl64(uint64_t x, int r) {
- return (x << r) | (x >> (64 - r));
-}
-
-/* Hash field-value pairs using XXH3_128bits for AUTOIDMP. The function takes
- * an array of robj pointers in 'argv' representing field-value pairs (field1,
- * value1, field2, value2, ...) and 'numfields' indicating the number of pairs
- * (not the array length). Each field-value pair is hashed using streaming
- * XXH3_128bits with the field length included as a separator to prevent hash
- * collisions from ambiguous concatenations. The resulting pair hashes are
- * combined using an order-independent Sum + XOR approach with rotation to
- * produce a final 128-bit hash stored in 'out_hash'. Returns C_OK on success,
- * C_ERR on error. XXH128 is a non-cryptographic hash function: fast and
- * well-distributed, but does NOT prevent intentional collision attacks. */
-static int createIdempotencyHash(robj **argv, int64_t numfields, XXH128_hash_t *out_hash) {
- uint64_t sum_lo = 0, sum_hi = 0;
- uint64_t xor_lo = 0, xor_hi = 0;
- XXH3_state_t* state = XXH3_createState();
- if (state == NULL) return C_ERR;
-
- char llbuf[LONG_STR_SIZE];
- XXH_errorcode err;
-
- /* Process each field-value pair */
- for (int64_t i = 0; i < numfields; i++) {
- robj *field = argv[i * 2];
- robj *value = argv[i * 2 + 1];
-
- /* Initialize hash state for this pair */
- err = XXH3_128bits_reset(state);
- if (err != XXH_OK) goto cleanup;
-
- /* Hash the field */
- long field_len;
- unsigned char *field_data = getObjectReadOnlyString(field, &field_len, llbuf);
- err = XXH3_128bits_update(state, field_data, field_len);
- if (err != XXH_OK) goto cleanup;
-
- /* Hash the field length as separator to prevent collisions */
- err = XXH3_128bits_update(state, &field_len, sizeof(field_len));
- if (err != XXH_OK) goto cleanup;
-
- /* Hash the value */
- long value_len;
- unsigned char *value_data = getObjectReadOnlyString(value, &value_len, llbuf);
- err = XXH3_128bits_update(state, value_data, value_len);
- if (err != XXH_OK) goto cleanup;
-
- /* Get the hash for this pair */
- XXH128_hash_t pair_hash = XXH3_128bits_digest(state);
-
- /* Accumulate with both sum and xor for order-independent combination */
- sum_lo += pair_hash.low64;
- sum_hi += pair_hash.high64;
- xor_lo ^= pair_hash.low64;
- xor_hi ^= pair_hash.high64;
- }
-
- /* Combine sum and xor with rotation for better distribution */
- XXH128_hash_t hash_result;
- hash_result.low64 = sum_lo ^ rotl64(xor_hi, 1);
- hash_result.high64 = sum_hi ^ rotl64(xor_lo, 1);
-
- XXH3_freeState(state);
- *out_hash = hash_result;
- return C_OK;
-
-cleanup:
- XXH3_freeState(state);
- return C_ERR;
-}
-
-/* Clear all IDMP entries from a stream - free all producers and their entries */
-static void streamClearIdmpEntries(stream *s) {
- if (s->idmp_producers == NULL) return;
-
- /* Iterate through all producers and free them */
- raxIterator ri;
- raxStart(&ri, s->idmp_producers);
- raxSeek(&ri, "^", NULL, 0);
- while (raxNext(&ri)) {
- idmpProducerFree(ri.data, &s->alloc_size);
- }
- raxStop(&ri);
-
- /* Free the producers rax tree and reset */
- raxFree(s->idmp_producers);
- s->idmp_producers = NULL;
-}
-
-/* Evict the oldest entry from the IDMP producer when max entries is exceeded.
- * This function checks if the number of entries exceeds the stream's max limit,
- * and if so, removes the oldest entry from the producer's linked list and
- * dictionary, maintaining the integrity of both data structures. If the list
- * becomes empty after removal, both head and tail pointers are set to NULL. */
-static void idmpEvictOldestEntry(stream *s, idmpProducer *producer) {
- if (dictSize(producer->idmp_dict) <= s->idmp_max_entries) {
- return;
- }
-
- idmpEntry *oldest = producer->idmp_head;
- producer->idmp_head = oldest->next;
- if (producer->idmp_head == NULL) {
- producer->idmp_tail = NULL;
- }
- dictDelete(producer->idmp_dict, oldest);
- idmpEntryFree(oldest, &s->alloc_size);
-}