summaryrefslogtreecommitdiff
path: root/examples/redis-unstable/src/stream.h
diff options
context:
space:
mode:
authorMitja Felicijan <mitja.felicijan@gmail.com>2026-01-21 22:52:54 +0100
committerMitja Felicijan <mitja.felicijan@gmail.com>2026-01-21 22:52:54 +0100
commitdcacc00e3750300617ba6e16eb346713f91a783a (patch)
tree38e2d4fb5ed9d119711d4295c6eda4b014af73fd /examples/redis-unstable/src/stream.h
parent58dac10aeb8f5a041c46bddbeaf4c7966a99b998 (diff)
downloadcrep-dcacc00e3750300617ba6e16eb346713f91a783a.tar.gz
Remove testing data
Diffstat (limited to 'examples/redis-unstable/src/stream.h')
-rw-r--r--examples/redis-unstable/src/stream.h208
1 files changed, 0 insertions, 208 deletions
diff --git a/examples/redis-unstable/src/stream.h b/examples/redis-unstable/src/stream.h
deleted file mode 100644
index 8112e38..0000000
--- a/examples/redis-unstable/src/stream.h
+++ /dev/null
@@ -1,208 +0,0 @@
-#ifndef STREAM_H
-#define STREAM_H
-
-#include "rax.h"
-#include "listpack.h"
-#include "dict.h"
-#include "xxhash.h"
-
-/* Stream item ID: a 128 bit number composed of a milliseconds time and
- * a sequence counter. IDs generated in the same millisecond (or in a past
- * millisecond if the clock jumped backward) will use the millisecond time
- * of the latest generated ID and an incremented sequence. */
-typedef struct streamID {
- uint64_t ms; /* Unix time in milliseconds. */
- uint64_t seq; /* Sequence number. */
-} streamID;
-
-/* Structure to hold IID and stream ID for IDMP deduplication */
-typedef struct idmpEntry {
- struct idmpEntry *next; /* Pointer to next entry in insertion order (linked list) */
- streamID id; /* Associated stream ID */
- size_t iid_len; /* Length of the IID */
- char iid[]; /* Flexible array member for inline IID storage */
-} idmpEntry;
-
-/* IDMP Producer structure for per-producer deduplication tracking */
-typedef struct idmpProducer {
- dict *idmp_dict; /* IDMP IID tracking tree. */
- idmpEntry *idmp_head; /* Head of the IDMP entries linked list. */
- idmpEntry *idmp_tail; /* Tail of the IDMP entries linked list. */
-} idmpProducer;
-
-/* Dictionary type for IDMP entries - uses IID as key */
-extern dictType idmpDictType;
-
-typedef struct stream {
- rax *rax; /* The radix tree holding the stream. */
- uint64_t length; /* Current number of elements inside this stream. */
- streamID last_id; /* Zero if there are yet no items. */
- streamID first_id; /* The first non-tombstone entry, zero if empty. */
- streamID max_deleted_entry_id; /* The maximal ID that was deleted. */
- uint64_t entries_added; /* All time count of elements added. */
- size_t alloc_size; /* Total allocated memory (in bytes) by this stream. */
- rax *cgroups; /* Consumer groups dictionary: name -> streamCG */
- rax *cgroups_ref; /* Index mapping message IDs to their consumer groups. */
- streamID min_cgroup_last_id; /* The minimum ID of consume group. */
- unsigned int min_cgroup_last_id_valid: 1;
- uint64_t idmp_duration; /* IDMP duration in seconds. */
- uint64_t idmp_max_entries; /* Max number of IID for tracking. */
- rax *idmp_producers; /* IDMP producers radix tree: pid -> idmpProducer */
- uint64_t iids_added; /* All time count of entries with IID added. */
- uint64_t iids_duplicates; /* All time count of duplicate IIDs detected. */
-} stream;
-
-/* We define an iterator to iterate stream items in an abstract way, without
- * caring about the radix tree + listpack representation. Technically speaking
- * the iterator is only used inside streamReplyWithRange(), so could just
- * be implemented inside the function, but practically there is the AOF
- * rewriting code that also needs to iterate the stream to emit the XADD
- * commands. */
-typedef struct streamIterator {
- stream *stream; /* The stream we are iterating. */
- streamID master_id; /* ID of the master entry at listpack head. */
- uint64_t master_fields_count; /* Master entries # of fields. */
- unsigned char *master_fields_start; /* Master entries start in listpack. */
- unsigned char *master_fields_ptr; /* Master field to emit next. */
- int entry_flags; /* Flags of entry we are emitting. */
- int rev; /* True if iterating end to start (reverse). */
- int skip_tombstones; /* True if not emitting tombstone entries. */
- uint64_t start_key[2]; /* Start key as 128 bit big endian. */
- uint64_t end_key[2]; /* End key as 128 bit big endian. */
- /* Decoded native-endian fields for fast numeric comparison */
- uint64_t start_ms;
- uint64_t start_seq;
- uint64_t end_ms;
- uint64_t end_seq;
- raxIterator ri; /* Rax iterator. */
- unsigned char *lp; /* Current listpack. */
- unsigned char *lp_last_ele; /* Previous listpack element position for corruption detection. */
- unsigned char *lp_ele; /* Current listpack cursor. */
- unsigned char *lp_flags; /* Current entry flags pointer. */
- /* Buffers used to hold the string of lpGet() when the element is
- * integer encoded, so that there is no string representation of the
- * element inside the listpack itself. */
- unsigned char field_buf[LP_INTBUF_SIZE];
- unsigned char value_buf[LP_INTBUF_SIZE];
-} streamIterator;
-
-/* Consumer group. */
-typedef struct streamCG {
- streamID last_id; /* Last delivered (not acknowledged) ID for this
- group. Consumers that will just ask for more
- messages will served with IDs > than this. */
- long long entries_read; /* In a perfect world (CG starts at 0-0, no dels, no
- XGROUP SETID, ...), this is the total number of
- group reads. In the real world, the reasoning behind
- this value is detailed at the top comment of
- streamEstimateDistanceFromFirstEverEntry(). */
- rax *pel; /* Pending entries list. This is a radix tree that
- has every message delivered to consumers (without
- the NOACK option) that was yet not acknowledged
- as processed. The key of the radix tree is the
- ID as a 64 bit big endian number, while the
- associated value is a streamNACK structure.*/
- rax *pel_by_time; /* A radix tree mapping delivery time to pending
- entries, so that we can query faster PEL entries
- by time. The key is a pelTimeKey structure containing
- both delivery_time and stream ID. All information is
- in the key; no value is stored. */
- rax *consumers; /* A radix tree representing the consumers by name
- and their associated representation in the form
- of streamConsumer structures. */
-} streamCG;
-
-/* A specific consumer in a consumer group. */
-typedef struct streamConsumer {
- mstime_t seen_time; /* Last time this consumer tried to perform an action (attempted reading/claiming). */
- mstime_t active_time; /* Last time this consumer was active (successful reading/claiming). */
- sds name; /* Consumer name. This is how the consumer
- will be identified in the consumer group
- protocol. Case sensitive. */
- rax *pel; /* Consumer specific pending entries list: all
- the pending messages delivered to this
- consumer not yet acknowledged. Keys are
- big endian message IDs, while values are
- the same streamNACK structure referenced
- in the "pel" of the consumer group structure
- itself, so the value is shared. */
-} streamConsumer;
-
-/* Pending (yet not acknowledged) message in a consumer group. */
-typedef struct streamNACK {
- mstime_t delivery_time; /* Last time this message was delivered. */
- uint64_t delivery_count; /* Number of times this message was delivered.*/
- streamConsumer *consumer; /* The consumer this message was delivered to
- in the last delivery. */
- listNode *cgroup_ref_node; /* Reference to this NACK in the cgroups_ref list. */
-} streamNACK;
-
-/* Stream propagation information, passed to functions in order to propagate
- * XCLAIM commands to AOF and slaves. */
-typedef struct streamPropInfo {
- robj *keyname;
- robj *groupname;
-} streamPropInfo;
-
-/* Pending entry in the consumer group's PEL, indexed by delivery time. */
-typedef struct pelTimeKey {
- uint64_t delivery_time;
- streamID id;
-} pelTimeKey;
-
-/* Prototypes of exported APIs. */
-struct client;
-
-/* Flags for streamCreateConsumer */
-#define SCC_DEFAULT 0
-#define SCC_NO_NOTIFY (1<<0) /* Do not notify key space if consumer created */
-#define SCC_NO_DIRTIFY (1<<1) /* Do not dirty++ if consumer created */
-
-#define SCG_INVALID_ENTRIES_READ -1
-
-stream *streamNew(void);
-void freeStream(stream *s);
-unsigned long streamLength(const robj *subject);
-size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, long long min_idle_time, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi, unsigned long *propCount);
-void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev);
-int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
-void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
-void streamIteratorRemoveEntry(streamIterator *si, streamID *current);
-void streamIteratorStop(streamIterator *si);
-streamCG *streamLookupCG(stream *s, sds groupname);
-streamConsumer *streamLookupConsumer(streamCG *cg, sds name);
-streamConsumer *streamCreateConsumer(stream *s, streamCG *cg, sds name, robj *key, int dbid, int flags);
-streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read);
-streamNACK *streamCreateNACK(stream *s, streamConsumer *consumer);
-void streamDecodeID(void *buf, streamID *id);
-int streamCompareID(streamID *a, streamID *b);
-void streamFreeNACK(stream *s, streamNACK *na);
-int streamIncrID(streamID *id);
-int streamDecrID(streamID *id);
-void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername);
-robj *streamDup(robj *o);
-int streamValidateListpackIntegrity(unsigned char *lp, size_t size, int deep);
-int streamParseID(const robj *o, streamID *id);
-robj *createObjectFromStreamID(streamID *id);
-int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id, int seq_given);
-int streamDeleteItem(stream *s, streamID *id);
-void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id);
-long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id);
-int64_t streamTrimByLength(stream *s, long long maxlen, int approx);
-int64_t streamTrimByID(stream *s, streamID minid, int approx);
-
-listNode *streamLinkCGroupToEntry(stream *s, streamCG *cg, unsigned char *key);
-
-void encodePelTimeKey(void* buf, pelTimeKey *timeKey);
-void decodePelTimeKey(void *buf, pelTimeKey *timeKey);
-void raxInsertPelByTime(rax *pel_by_time, uint64_t delivery_time, streamID *id);
-void raxRemovePelByTime(rax *pel_by_time, uint64_t delivery_time, streamID *id);
-
-/* IDMP functions */
-idmpEntry *idmpEntryCreate(const char *iid, size_t iid_len, size_t *alloc_size);
-void idmpEntryFree(idmpEntry *entry, size_t *alloc_size);
-idmpProducer *idmpProducerCreate(size_t *alloc_size);
-void idmpProducerFree(idmpProducer *producer, size_t *alloc_size);
-void streamFreeIdmpProducerGeneric(void *producer, void *strm);
-
-#endif