diff options
Diffstat (limited to 'examples/redis-unstable/src/stream.h')
| -rw-r--r-- | examples/redis-unstable/src/stream.h | 208 |
1 files changed, 0 insertions, 208 deletions
diff --git a/examples/redis-unstable/src/stream.h b/examples/redis-unstable/src/stream.h deleted file mode 100644 index 8112e38..0000000 --- a/examples/redis-unstable/src/stream.h +++ /dev/null @@ -1,208 +0,0 @@ -#ifndef STREAM_H -#define STREAM_H - -#include "rax.h" -#include "listpack.h" -#include "dict.h" -#include "xxhash.h" - -/* Stream item ID: a 128 bit number composed of a milliseconds time and - * a sequence counter. IDs generated in the same millisecond (or in a past - * millisecond if the clock jumped backward) will use the millisecond time - * of the latest generated ID and an incremented sequence. */ -typedef struct streamID { - uint64_t ms; /* Unix time in milliseconds. */ - uint64_t seq; /* Sequence number. */ -} streamID; - -/* Structure to hold IID and stream ID for IDMP deduplication */ -typedef struct idmpEntry { - struct idmpEntry *next; /* Pointer to next entry in insertion order (linked list) */ - streamID id; /* Associated stream ID */ - size_t iid_len; /* Length of the IID */ - char iid[]; /* Flexible array member for inline IID storage */ -} idmpEntry; - -/* IDMP Producer structure for per-producer deduplication tracking */ -typedef struct idmpProducer { - dict *idmp_dict; /* IDMP IID tracking tree. */ - idmpEntry *idmp_head; /* Head of the IDMP entries linked list. */ - idmpEntry *idmp_tail; /* Tail of the IDMP entries linked list. */ -} idmpProducer; - -/* Dictionary type for IDMP entries - uses IID as key */ -extern dictType idmpDictType; - -typedef struct stream { - rax *rax; /* The radix tree holding the stream. */ - uint64_t length; /* Current number of elements inside this stream. */ - streamID last_id; /* Zero if there are yet no items. */ - streamID first_id; /* The first non-tombstone entry, zero if empty. */ - streamID max_deleted_entry_id; /* The maximal ID that was deleted. */ - uint64_t entries_added; /* All time count of elements added. */ - size_t alloc_size; /* Total allocated memory (in bytes) by this stream. */ - rax *cgroups; /* Consumer groups dictionary: name -> streamCG */ - rax *cgroups_ref; /* Index mapping message IDs to their consumer groups. */ - streamID min_cgroup_last_id; /* The minimum ID of consume group. */ - unsigned int min_cgroup_last_id_valid: 1; - uint64_t idmp_duration; /* IDMP duration in seconds. */ - uint64_t idmp_max_entries; /* Max number of IID for tracking. */ - rax *idmp_producers; /* IDMP producers radix tree: pid -> idmpProducer */ - uint64_t iids_added; /* All time count of entries with IID added. */ - uint64_t iids_duplicates; /* All time count of duplicate IIDs detected. */ -} stream; - -/* We define an iterator to iterate stream items in an abstract way, without - * caring about the radix tree + listpack representation. Technically speaking - * the iterator is only used inside streamReplyWithRange(), so could just - * be implemented inside the function, but practically there is the AOF - * rewriting code that also needs to iterate the stream to emit the XADD - * commands. */ -typedef struct streamIterator { - stream *stream; /* The stream we are iterating. */ - streamID master_id; /* ID of the master entry at listpack head. */ - uint64_t master_fields_count; /* Master entries # of fields. */ - unsigned char *master_fields_start; /* Master entries start in listpack. */ - unsigned char *master_fields_ptr; /* Master field to emit next. */ - int entry_flags; /* Flags of entry we are emitting. */ - int rev; /* True if iterating end to start (reverse). */ - int skip_tombstones; /* True if not emitting tombstone entries. */ - uint64_t start_key[2]; /* Start key as 128 bit big endian. */ - uint64_t end_key[2]; /* End key as 128 bit big endian. */ - /* Decoded native-endian fields for fast numeric comparison */ - uint64_t start_ms; - uint64_t start_seq; - uint64_t end_ms; - uint64_t end_seq; - raxIterator ri; /* Rax iterator. */ - unsigned char *lp; /* Current listpack. */ - unsigned char *lp_last_ele; /* Previous listpack element position for corruption detection. */ - unsigned char *lp_ele; /* Current listpack cursor. */ - unsigned char *lp_flags; /* Current entry flags pointer. */ - /* Buffers used to hold the string of lpGet() when the element is - * integer encoded, so that there is no string representation of the - * element inside the listpack itself. */ - unsigned char field_buf[LP_INTBUF_SIZE]; - unsigned char value_buf[LP_INTBUF_SIZE]; -} streamIterator; - -/* Consumer group. */ -typedef struct streamCG { - streamID last_id; /* Last delivered (not acknowledged) ID for this - group. Consumers that will just ask for more - messages will served with IDs > than this. */ - long long entries_read; /* In a perfect world (CG starts at 0-0, no dels, no - XGROUP SETID, ...), this is the total number of - group reads. In the real world, the reasoning behind - this value is detailed at the top comment of - streamEstimateDistanceFromFirstEverEntry(). */ - rax *pel; /* Pending entries list. This is a radix tree that - has every message delivered to consumers (without - the NOACK option) that was yet not acknowledged - as processed. The key of the radix tree is the - ID as a 64 bit big endian number, while the - associated value is a streamNACK structure.*/ - rax *pel_by_time; /* A radix tree mapping delivery time to pending - entries, so that we can query faster PEL entries - by time. The key is a pelTimeKey structure containing - both delivery_time and stream ID. All information is - in the key; no value is stored. */ - rax *consumers; /* A radix tree representing the consumers by name - and their associated representation in the form - of streamConsumer structures. */ -} streamCG; - -/* A specific consumer in a consumer group. */ -typedef struct streamConsumer { - mstime_t seen_time; /* Last time this consumer tried to perform an action (attempted reading/claiming). */ - mstime_t active_time; /* Last time this consumer was active (successful reading/claiming). */ - sds name; /* Consumer name. This is how the consumer - will be identified in the consumer group - protocol. Case sensitive. */ - rax *pel; /* Consumer specific pending entries list: all - the pending messages delivered to this - consumer not yet acknowledged. Keys are - big endian message IDs, while values are - the same streamNACK structure referenced - in the "pel" of the consumer group structure - itself, so the value is shared. */ -} streamConsumer; - -/* Pending (yet not acknowledged) message in a consumer group. */ -typedef struct streamNACK { - mstime_t delivery_time; /* Last time this message was delivered. */ - uint64_t delivery_count; /* Number of times this message was delivered.*/ - streamConsumer *consumer; /* The consumer this message was delivered to - in the last delivery. */ - listNode *cgroup_ref_node; /* Reference to this NACK in the cgroups_ref list. */ -} streamNACK; - -/* Stream propagation information, passed to functions in order to propagate - * XCLAIM commands to AOF and slaves. */ -typedef struct streamPropInfo { - robj *keyname; - robj *groupname; -} streamPropInfo; - -/* Pending entry in the consumer group's PEL, indexed by delivery time. */ -typedef struct pelTimeKey { - uint64_t delivery_time; - streamID id; -} pelTimeKey; - -/* Prototypes of exported APIs. */ -struct client; - -/* Flags for streamCreateConsumer */ -#define SCC_DEFAULT 0 -#define SCC_NO_NOTIFY (1<<0) /* Do not notify key space if consumer created */ -#define SCC_NO_DIRTIFY (1<<1) /* Do not dirty++ if consumer created */ - -#define SCG_INVALID_ENTRIES_READ -1 - -stream *streamNew(void); -void freeStream(stream *s); -unsigned long streamLength(const robj *subject); -size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, long long min_idle_time, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi, unsigned long *propCount); -void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev); -int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields); -void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen); -void streamIteratorRemoveEntry(streamIterator *si, streamID *current); -void streamIteratorStop(streamIterator *si); -streamCG *streamLookupCG(stream *s, sds groupname); -streamConsumer *streamLookupConsumer(streamCG *cg, sds name); -streamConsumer *streamCreateConsumer(stream *s, streamCG *cg, sds name, robj *key, int dbid, int flags); -streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read); -streamNACK *streamCreateNACK(stream *s, streamConsumer *consumer); -void streamDecodeID(void *buf, streamID *id); -int streamCompareID(streamID *a, streamID *b); -void streamFreeNACK(stream *s, streamNACK *na); -int streamIncrID(streamID *id); -int streamDecrID(streamID *id); -void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername); -robj *streamDup(robj *o); -int streamValidateListpackIntegrity(unsigned char *lp, size_t size, int deep); -int streamParseID(const robj *o, streamID *id); -robj *createObjectFromStreamID(streamID *id); -int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id, int seq_given); -int streamDeleteItem(stream *s, streamID *id); -void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id); -long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id); -int64_t streamTrimByLength(stream *s, long long maxlen, int approx); -int64_t streamTrimByID(stream *s, streamID minid, int approx); - -listNode *streamLinkCGroupToEntry(stream *s, streamCG *cg, unsigned char *key); - -void encodePelTimeKey(void* buf, pelTimeKey *timeKey); -void decodePelTimeKey(void *buf, pelTimeKey *timeKey); -void raxInsertPelByTime(rax *pel_by_time, uint64_t delivery_time, streamID *id); -void raxRemovePelByTime(rax *pel_by_time, uint64_t delivery_time, streamID *id); - -/* IDMP functions */ -idmpEntry *idmpEntryCreate(const char *iid, size_t iid_len, size_t *alloc_size); -void idmpEntryFree(idmpEntry *entry, size_t *alloc_size); -idmpProducer *idmpProducerCreate(size_t *alloc_size); -void idmpProducerFree(idmpProducer *producer, size_t *alloc_size); -void streamFreeIdmpProducerGeneric(void *producer, void *strm); - -#endif |
