diff options
| author | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:40:55 +0100 |
|---|---|---|
| committer | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:40:55 +0100 |
| commit | 5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda (patch) | |
| tree | 1acdfa5220cd13b7be43a2a01368e80d306473ca /examples/redis-unstable/src/t_hash.c | |
| parent | c7ab12bba64d9c20ccd79b132dac475f7bc3923e (diff) | |
| download | crep-5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda.tar.gz | |
Add Redis source code for testing
Diffstat (limited to 'examples/redis-unstable/src/t_hash.c')
| -rw-r--r-- | examples/redis-unstable/src/t_hash.c | 4068 |
1 files changed, 4068 insertions, 0 deletions
diff --git a/examples/redis-unstable/src/t_hash.c b/examples/redis-unstable/src/t_hash.c new file mode 100644 index 0000000..48fc1e7 --- /dev/null +++ b/examples/redis-unstable/src/t_hash.c @@ -0,0 +1,4068 @@ +/* + * Copyright (c) 2009-Present, Redis Ltd. + * All rights reserved. + * + * Licensed under your choice of (a) the Redis Source Available License 2.0 + * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the + * GNU Affero General Public License v3 (AGPLv3). + */ + +#include "server.h" +#include "redisassert.h" +#include "ebuckets.h" +#include "entry.h" +#include "cluster_asm.h" +#include <math.h> + +/* Threshold for HEXPIRE and HPERSIST to be considered whether it is worth to + * update the expiration time of the hash object in global HFE DS. */ +#define HASH_NEW_EXPIRE_DIFF_THRESHOLD max(4000, 1<<EB_BUCKET_KEY_PRECISION) + +/* Reserve 2 bits out of hash-field expiration time for possible future lightweight + * indexing/categorizing of fields. It can be achieved by hacking HFE as follows: + * + * HPEXPIREAT key [ 2^47 + USER_INDEX ] FIELDS numfields field [field …] + * + * Redis will also need to expose kind of HEXPIRESCAN and HEXPIRECOUNT for this + * idea. Yet to be better defined. + * + * HFE_MAX_ABS_TIME_MSEC constraint must be enforced only at API level. Internally, + * the expiration time can be up to EB_EXPIRE_TIME_MAX for future readiness. + */ +#define HFE_MAX_ABS_TIME_MSEC (EB_EXPIRE_TIME_MAX >> 2) + +typedef enum GetFieldRes { + /* common (Used by hashTypeGet* value family) */ + GETF_OK = 0, /* The field was found. */ + GETF_NOT_FOUND, /* The field was not found. */ + GETF_EXPIRED, /* Logically expired (Might be lazy deleted or not) */ + GETF_EXPIRED_HASH, /* Delete hash since retrieved field was expired and + * it was the last field in the hash. */ +} GetFieldRes; + +typedef listpackEntry CommonEntry; /* extend usage beyond lp */ + +/* hash field expiration (HFE) funcs */ +static ExpireAction onFieldExpire(eItem item, void *ctx); +static ExpireMeta* hentryGetExpireMeta(const eItem field); +static void hexpireGenericCommand(client *c, long long basetime, int unit); +static void hfieldPersist(robj *hashObj, Entry *entry); +static void propagateHashFieldDeletion(redisDb *db, sds key, char *field, size_t fieldLen); + +/* hash dictType funcs */ +static void dictEntryDestructor(dict *d, void *entry); +static size_t hashDictMetadataBytes(dict *d); +static size_t hashDictWithExpireMetadataBytes(dict *d); +static void hashDictWithExpireOnRelease(dict *d); +static kvobj* hashTypeLookupWriteOrCreate(client *c, robj *key); + +/*----------------------------------------------------------------------------- + * Define dictType of hash + * + * - Stores fields as entry objects (field-value pairs) with optional expiration + * - Note that small hashes are represented with listpacks + * - Once expiration is set for a field, the dict instance and corresponding + * dictType are replaced with a dict containing metadata for Hash Field + * Expiration (HFE) and using dictType `entryHashDictTypeWithHFE` + * - Dict uses no_value=1 since entry contains both field and value + *----------------------------------------------------------------------------*/ +dictType entryHashDictType = { + dictSdsHash, /* lookup hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCompare, /* lookup key compare */ + dictEntryDestructor, /* key destructor */ + NULL, /* val destructor (value is in entry) */ + .dictMetadataBytes = hashDictMetadataBytes, + .no_value = 1, /* entry contains both field and value */ + .keys_are_odd = 1, /* entry pointers (SDS) are always odd */ +}; + +/* Define alternative dictType of hash with hash-field expiration (HFE) support */ +dictType entryHashDictTypeWithHFE = { + dictSdsHash, /* lookup hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCompare, /* lookup key compare */ + dictEntryDestructor, /* key destructor */ + NULL, /* val destructor (value is in entry) */ + .dictMetadataBytes = hashDictWithExpireMetadataBytes, + .onDictRelease = hashDictWithExpireOnRelease, + .no_value = 1, /* entry contains both field and value */ + .keys_are_odd = 1, /* entry pointers (SDS) are always odd */ +}; + +/*----------------------------------------------------------------------------- + * Hash Field Expiration (HFE) Feature + * + * Each hash instance maintains its own set of hash field expiration within its + * private ebuckets DS. In order to support HFE active expire cycle across hash + * instances, hashes with associated HFE will be also registered in a global + * ebuckets DS with expiration time value that reflects their next minimum + * time to expire (db->subexpires). The global HFE Active expiration will be + * triggered from activeExpireCycle() function and in turn will invoke "local" + * HFE Active sub-expiration for each hash instance that has expired fields. + *----------------------------------------------------------------------------*/ +EbucketsType subexpiresBucketsType = { + .onDeleteItem = NULL, + .getExpireMeta = hashGetExpireMeta, /* get ExpireMeta attached to each hash */ + .itemsAddrAreOdd = 0, /* Addresses of dict are even */ +}; + +/* htExpireMetadata - ebuckets-type for hash fields with time-Expiration. ebuckets + * instance Will be attached to each hash that has at least one field with expiry + * time. */ +EbucketsType hashFieldExpireBucketsType = { + .onDeleteItem = NULL, + .getExpireMeta = hentryGetExpireMeta, /* get ExpireMeta attached to each field */ + .itemsAddrAreOdd = 1, /* Addresses of hfield (entry/sds) are odd!! */ +}; + +/* OnFieldExpireCtx passed to OnFieldExpire() */ +typedef struct OnFieldExpireCtx { + robj *hashObj; + redisDb *db; +} OnFieldExpireCtx; + +/* The implementation of hashes by dict was modified from storing fields as sds + * strings to store "entry" objects (field-value pairs with optional expiration). + * The entry structure unifies field and value into a single allocation, with + * optional expiration metadata. This is simpler than the previous mstr approach + * and provides better memory locality. + */ + +/* Used by hpersistCommand() */ +typedef enum SetPersistRes { + HFE_PERSIST_NO_FIELD = -2, /* No such hash-field */ + HFE_PERSIST_NO_TTL = -1, /* No TTL attached to the field */ + HFE_PERSIST_OK = 1 +} SetPersistRes; + +static inline int isDictWithMetaHFE(dict *d) { + return d->type == &entryHashDictTypeWithHFE; +} + +/*----------------------------------------------------------------------------- + * setex* - Set field's expiration + * + * Setting expiration time to fields might be time-consuming and complex since + * each update of expiration time, not only updates `ebuckets` of corresponding + * hash, but also might update `ebuckets` of global HFE DS. It is required to opt + * sequence of field updates with expirartion for a given hash, such that only + * once done, the global HFE DS will get updated. + * + * To do so, follow the scheme: + * 1. Call hashTypeSetExInit() to initialize the HashTypeSetEx struct. + * 2. Call hashTypeSetEx() one time or more, for each field/expiration update. + * 3. Call hashTypeSetExDone() for notification and update of global HFE. + *----------------------------------------------------------------------------*/ + +/* Returned value of hashTypeSetEx() */ +typedef enum SetExRes { + HSETEX_OK = 1, /* Expiration time set/updated as expected */ + HSETEX_NO_FIELD = -2, /* No such hash-field */ + HSETEX_NO_CONDITION_MET = 0, /* Specified NX | XX | GT | LT condition not met */ + HSETEX_DELETED = 2, /* Field deleted because the specified time is in the past */ +} SetExRes; + +/* Used by httlGenericCommand() */ +typedef enum GetExpireTimeRes { + HFE_GET_NO_FIELD = -2, /* No such hash-field */ + HFE_GET_NO_TTL = -1, /* No TTL attached to the field */ +} GetExpireTimeRes; + +typedef enum ExpireSetCond { + HFE_NX = 1<<0, + HFE_XX = 1<<1, + HFE_GT = 1<<2, + HFE_LT = 1<<3 +} ExpireSetCond; + +/* Used by hashTypeSetEx() for setting fields or their expiry */ +typedef struct HashTypeSetEx { + + /*** config ***/ + ExpireSetCond expireSetCond; /* [XX | NX | GT | LT] */ + + /*** metadata ***/ + uint64_t minExpire; /* if uninit EB_EXPIRE_TIME_INVALID */ + redisDb *db; + robj *key, *hashObj; + uint64_t minExpireFields; /* Trace updated fields and their previous/new + * minimum expiration time. If minimum recorded + * is above minExpire of the hash, then we don't + * have to update global HFE DS */ + + /* Optionally provide client for notification */ + client *c; + const char *cmd; +} HashTypeSetEx; + +int hashTypeSetExInit(robj *key, kvobj *kvo, client *c, redisDb *db, + ExpireSetCond expireSetCond, HashTypeSetEx *ex); + +SetExRes hashTypeSetEx(robj *o, sds field, uint64_t expireAt, HashTypeSetEx *exInfo); + +void hashTypeSetExDone(HashTypeSetEx *e); + +/*----------------------------------------------------------------------------- + * Accessor functions for dictType of hash + *----------------------------------------------------------------------------*/ + +static void dictEntryDestructor(dict *d, void *entry) { + size_t usable; + size_t *alloc_size = htGetMetadataSize(d); + + /* If attached TTL to the field, then remove it from hash's private ebuckets. */ + if (entryGetExpiry(entry) != EB_EXPIRE_TIME_INVALID) { + htMetadataEx *dictExpireMeta = htGetMetadataEx(d); + ebRemove(&dictExpireMeta->hfe, &hashFieldExpireBucketsType, entry); + } + + entryFree(entry, &usable); + *alloc_size -= usable; + + /* Don't have to update global HFE DS. It's unnecessary. Implementing this + * would introduce significant complexity and overhead for an operation that + * isn't critical. In the worst case scenario, the hash will be efficiently + * updated later by an active-expire operation, or it will be removed by the + * hash's dbGenericDelete() function. */ +} + +static size_t hashDictMetadataBytes(dict *d) { + UNUSED(d); + return sizeof(size_t); +} + +static size_t hashDictWithExpireMetadataBytes(dict *d) { + UNUSED(d); + /* expireMeta of the hash, ref to ebuckets and pointer to hash's key */ + return sizeof(htMetadataEx); +} + +static void hashDictWithExpireOnRelease(dict *d) { + /* for sure allocated with metadata. Otherwise, this func won't be registered */ + htMetadataEx *dictExpireMeta = htGetMetadataEx(d); + ebDestroy(&dictExpireMeta->hfe, &hashFieldExpireBucketsType, NULL); +} + +/*----------------------------------------------------------------------------- + * listpackEx functions + *----------------------------------------------------------------------------*/ +/* + * If any of hash field expiration command is called on a listpack hash object + * for the first time, we convert it to OBJ_ENCODING_LISTPACK_EX encoding. + * We allocate "struct listpackEx" which holds listpack pointer and expiry + * metadata. In the listpack string, we append another TTL entry for each field + * value pair. From now on, listpack will have triplets in it: field-value-ttl. + * If TTL is not set for a field, we store 'zero' as the TTL value. 'zero' is + * encoded as two bytes in the listpack. Memory overhead of a non-existing TTL + * will be two bytes per field. + * + * Fields in the listpack will be ordered by TTL. Field with the smallest expiry + * time will be the first item. Fields without TTL will be at the end of the + * listpack. This way, it is easier/faster to find expired items. + */ + +#define HASH_LP_NO_TTL 0 + +struct listpackEx *listpackExCreate(void) { + listpackEx *lpt = zcalloc(sizeof(*lpt)); + lpt->meta.trash = 1; + lpt->lp = NULL; + return lpt; +} + +static void listpackExFree(listpackEx *lpt) { + lpFree(lpt->lp); + zfree(lpt); +} + +struct lpFingArgs { + uint64_t max_to_search; /* [in] Max number of tuples to search */ + uint64_t expire_time; /* [in] Find the tuple that has a TTL larger than expire_time */ + unsigned char *p; /* [out] First item of the tuple that has a TTL larger than expire_time */ + int expired; /* [out] Number of tuples that have TTLs less than expire_time */ + int index; /* Internally used */ + unsigned char *fptr; /* Internally used, temp ptr */ +}; + +/* Callback for lpFindCb(). Used to find number of expired fields as part of + * active expiry or when trying to find the position for the new field according + * to its expiry time.*/ +static int cbFindInListpack(const unsigned char *lp, unsigned char *p, + void *user, unsigned char *s, long long slen) +{ + (void) lp; + struct lpFingArgs *r = user; + + r->index++; + + if (r->max_to_search == 0) + return 0; /* Break the loop and return */ + + if (r->index % 3 == 1) { + r->fptr = p; /* First item of the tuple. */ + } else if (r->index % 3 == 0) { + serverAssert(!s); + + /* Third item of a tuple is expiry time */ + if (slen == HASH_LP_NO_TTL || (uint64_t) slen >= r->expire_time) { + r->p = r->fptr; + return 0; /* Break the loop and return */ + } + r->expired++; + r->max_to_search--; + } + + return 1; +} + +/* Returns number of expired fields. */ +static uint64_t listpackExExpireDryRun(const robj *o) { + serverAssert(o->encoding == OBJ_ENCODING_LISTPACK_EX); + + listpackEx *lpt = o->ptr; + + struct lpFingArgs r = { + .max_to_search = UINT64_MAX, + .expire_time = commandTimeSnapshot(), + }; + + lpFindCb(lpt->lp, NULL, &r, cbFindInListpack, 0); + return r.expired; +} + +/* Returns the expiration time of the item with the nearest expiration. */ +static uint64_t listpackExGetMinExpire(robj *o) { + serverAssert(o->encoding == OBJ_ENCODING_LISTPACK_EX); + + long long expireAt; + unsigned char *fptr; + listpackEx *lpt = o->ptr; + + /* As fields are ordered by expire time, first field will have the smallest + * expiry time. Third element is the expiry time of the first field */ + fptr = lpSeek(lpt->lp, 2); + if (fptr != NULL) { + serverAssert(lpGetIntegerValue(fptr, &expireAt)); + + /* Check if this is a non-volatile field. */ + if (expireAt != HASH_LP_NO_TTL) + return expireAt; + } + + return EB_EXPIRE_TIME_INVALID; +} + +/* Walk over fields and delete the expired ones. */ +void listpackExExpire(redisDb *db, kvobj *kv, ExpireInfo *info) { + serverAssert(kv->encoding == OBJ_ENCODING_LISTPACK_EX); + uint64_t expired = 0, min = EB_EXPIRE_TIME_INVALID; + unsigned char *ptr; + listpackEx *lpt = kv->ptr; + + ptr = lpFirst(lpt->lp); + + sds key = kvobjGetKey(kv); + + while (ptr != NULL && (info->itemsExpired < info->maxToExpire)) { + long long val; + int64_t flen; + unsigned char intbuf[LP_INTBUF_SIZE], *fref; + + fref = lpGet(ptr, &flen, intbuf); + + ptr = lpNext(lpt->lp, ptr); + serverAssert(ptr); + ptr = lpNext(lpt->lp, ptr); + serverAssert(ptr && lpGetIntegerValue(ptr, &val)); + + /* Fields are ordered by expiry time. If we reached to a non-expired + * or a non-volatile field, we know rest is not yet expired. */ + if (val == HASH_LP_NO_TTL || (uint64_t) val > info->now) + break; + + propagateHashFieldDeletion(db, key, (char *)((fref) ? fref : intbuf), flen); + server.stat_expired_subkeys++; + + ptr = lpNext(lpt->lp, ptr); + + info->itemsExpired++; + expired++; + } + + if (expired) { + size_t oldsize = 0; + if (server.memory_tracking_per_slot) + oldsize = lpBytes(lpt->lp); + lpt->lp = lpDeleteRange(lpt->lp, 0, expired * 3); + if (server.memory_tracking_per_slot) + updateSlotAllocSize(db, getKeySlot(key), oldsize, lpBytes(lpt->lp)); + + /* update keysizes */ + unsigned long l = lpLength(lpt->lp) / 3; + updateKeysizesHist(db, getKeySlot(key), OBJ_HASH, l + expired, l); + } + + min = hashTypeGetMinExpire(kv, 1 /*accurate*/); + info->nextExpireTime = min; +} + +static void listpackExAddInternal(robj *o, listpackEntry ent[3]) { + listpackEx *lpt = o->ptr; + + /* Shortcut, just append at the end if this is a non-volatile field. */ + if (ent[2].lval == HASH_LP_NO_TTL) { + lpt->lp = lpBatchAppend(lpt->lp, ent, 3); + return; + } + + struct lpFingArgs r = { + .max_to_search = UINT64_MAX, + .expire_time = ent[2].lval, + }; + + /* Check if there is a field with a larger TTL. */ + lpFindCb(lpt->lp, NULL, &r, cbFindInListpack, 0); + + /* If list is empty or there is no field with a larger TTL, result will be + * NULL. Otherwise, just insert before the found item.*/ + if (r.p) + lpt->lp = lpBatchInsert(lpt->lp, r.p, LP_BEFORE, ent, 3, NULL); + else + lpt->lp = lpBatchAppend(lpt->lp, ent, 3); +} + +/* Add new field ordered by expire time. */ +void listpackExAddNew(robj *o, char *field, size_t flen, + char *value, size_t vlen, uint64_t expireAt) { + listpackEntry ent[3] = { + {.sval = (unsigned char*) field, .slen = flen}, + {.sval = (unsigned char*) value, .slen = vlen}, + {.lval = expireAt} + }; + + listpackExAddInternal(o, ent); +} + +/* If expiry time is changed, this function will place field into the correct + * position. First, it deletes the field and re-inserts to the listpack ordered + * by expiry time. */ +static void listpackExUpdateExpiry(robj *o, sds field, + unsigned char *fptr, + unsigned char *vptr, + uint64_t expire_at) { + unsigned int slen = 0; + long long val = 0; + unsigned char tmp[512] = {0}; + unsigned char *valstr; + sds tmpval = NULL; + listpackEx *lpt = o->ptr; + + /* Copy value */ + valstr = lpGetValue(vptr, &slen, &val); + if (valstr) { + /* Normally, item length in the listpack is limited by + * 'hash-max-listpack-value' config. It is unlikely, but it might be + * larger than sizeof(tmp). */ + if (slen > sizeof(tmp)) + tmpval = sdsnewlen(valstr, slen); + else + memcpy(tmp, valstr, slen); + } + + /* Delete field name, value and expiry time */ + lpt->lp = lpDeleteRangeWithEntry(lpt->lp, &fptr, 3); + + listpackEntry ent[3] = {{0}}; + + ent[0].sval = (unsigned char*) field; + ent[0].slen = sdslen(field); + + if (valstr) { + ent[1].sval = tmpval ? (unsigned char *) tmpval : tmp; + ent[1].slen = slen; + } else { + ent[1].lval = val; + } + ent[2].lval = expire_at; + + listpackExAddInternal(o, ent); + sdsfree(tmpval); +} + +/* Update field expire time. */ +SetExRes hashTypeSetExpiryListpack(HashTypeSetEx *ex, sds field, + unsigned char *fptr, unsigned char *vptr, + unsigned char *tptr, uint64_t expireAt) +{ + long long expireTime; + uint64_t prevExpire = EB_EXPIRE_TIME_INVALID; + + serverAssert(lpGetIntegerValue(tptr, &expireTime)); + + if (expireTime != HASH_LP_NO_TTL) { + prevExpire = (uint64_t) expireTime; + } + + /* Special value of EXPIRE_TIME_INVALID indicates field should be persisted.*/ + if (expireAt == EB_EXPIRE_TIME_INVALID) { + /* Return error if already there is no ttl. */ + if (prevExpire == EB_EXPIRE_TIME_INVALID) + return HSETEX_NO_CONDITION_MET; + listpackExUpdateExpiry(ex->hashObj, field, fptr, vptr, HASH_LP_NO_TTL); + return HSETEX_OK; + } + + if (prevExpire == EB_EXPIRE_TIME_INVALID) { + /* For fields without expiry, LT condition is considered valid */ + if (ex->expireSetCond & (HFE_XX | HFE_GT)) + return HSETEX_NO_CONDITION_MET; + } else { + if (((ex->expireSetCond == HFE_GT) && (prevExpire >= expireAt)) || + ((ex->expireSetCond == HFE_LT) && (prevExpire <= expireAt)) || + (ex->expireSetCond == HFE_NX) ) + return HSETEX_NO_CONDITION_MET; + + /* Track of minimum expiration time (only later update global HFE DS) */ + if (ex->minExpireFields > prevExpire) + ex->minExpireFields = prevExpire; + } + + /* If expired, then delete the field and propagate the deletion. + * If replica, continue like the field is valid */ + if (unlikely(checkAlreadyExpired(expireAt))) { + propagateHashFieldDeletion(ex->db, ex->key->ptr, field, sdslen(field)); + hashTypeDelete(ex->hashObj, field); + server.stat_expired_subkeys++; + return HSETEX_DELETED; + } + + if (ex->minExpireFields > expireAt) + ex->minExpireFields = expireAt; + + listpackExUpdateExpiry(ex->hashObj, field, fptr, vptr, expireAt); + return HSETEX_OK; +} + +/* Returns 1 if expired */ +int hashTypeIsExpired(const robj *o, uint64_t expireAt) { + if (server.allow_access_expired) return 0; + + if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { + if (expireAt == HASH_LP_NO_TTL) + return 0; + } else if (o->encoding == OBJ_ENCODING_HT) { + if (expireAt == EB_EXPIRE_TIME_INVALID) + return 0; + } else { + serverPanic("Unknown encoding: %d", o->encoding); + } + + return (mstime_t) expireAt < commandTimeSnapshot(); +} + +/* Returns listpack pointer of the object. */ +unsigned char *hashTypeListpackGetLp(robj *o) { + if (o->encoding == OBJ_ENCODING_LISTPACK) + return o->ptr; + else if (o->encoding == OBJ_ENCODING_LISTPACK_EX) + return ((listpackEx*)o->ptr)->lp; + + serverPanic("Unknown encoding: %d", o->encoding); +} + +/*----------------------------------------------------------------------------- + * Hash type API + *----------------------------------------------------------------------------*/ + +/* Check the length of a number of objects to see if we need to convert a + * listpack to a real hash. Note that we only check string encoded objects + * as their string length can be queried in constant time. */ +void hashTypeTryConversion(redisDb *db, kvobj *o, robj **argv, int start, int end) { + int i; + size_t sum = 0; + + if (o->encoding != OBJ_ENCODING_LISTPACK && o->encoding != OBJ_ENCODING_LISTPACK_EX) + return; + + /* We guess that most of the values in the input are unique, so + * if there are enough arguments we create a pre-sized hash, which + * might over allocate memory if there are duplicates. */ + size_t new_fields = (end - start + 1) / 2; + if (new_fields > server.hash_max_listpack_entries) { + hashTypeConvert(db, o, OBJ_ENCODING_HT); + dictExpand(o->ptr, new_fields); + return; + } + + for (i = start; i <= end; i++) { + if (!sdsEncodedObject(argv[i])) + continue; + size_t len = sdslen(argv[i]->ptr); + if (len > server.hash_max_listpack_value) { + hashTypeConvert(db, o, OBJ_ENCODING_HT); + return; + } + sum += len; + } + if (!lpSafeToAdd(hashTypeListpackGetLp(o), sum)) { + hashTypeConvert(db, o, OBJ_ENCODING_HT); + } +} + +/* Get the value from a listpack encoded hash, identified by field. */ +GetFieldRes hashTypeGetFromListpack(robj *o, sds field, + unsigned char **vstr, + unsigned int *vlen, + long long *vll, + uint64_t *expiredAt) +{ + *expiredAt = EB_EXPIRE_TIME_INVALID; + unsigned char *zl, *fptr = NULL, *vptr = NULL; + + if (o->encoding == OBJ_ENCODING_LISTPACK) { + zl = o->ptr; + fptr = lpFirst(zl); + if (fptr != NULL) { + fptr = lpFind(zl, fptr, (unsigned char*)field, sdslen(field), 1); + if (fptr != NULL) { + /* Grab pointer to the value (fptr points to the field) */ + vptr = lpNext(zl, fptr); + serverAssert(vptr != NULL); + } + } + } else if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { + long long expire; + unsigned char *h; + listpackEx *lpt = o->ptr; + + fptr = lpFirst(lpt->lp); + if (fptr != NULL) { + fptr = lpFind(lpt->lp, fptr, (unsigned char*)field, sdslen(field), 2); + if (fptr != NULL) { + vptr = lpNext(lpt->lp, fptr); + serverAssert(vptr != NULL); + + h = lpNext(lpt->lp, vptr); + serverAssert(h && lpGetIntegerValue(h, &expire)); + if (expire != HASH_LP_NO_TTL) + *expiredAt = expire; + } + } + } else { + serverPanic("Unknown hash encoding: %d", o->encoding); + } + + if (vptr != NULL) { + *vstr = lpGetValue(vptr, vlen, vll); + return GETF_OK; + } + + return GETF_NOT_FOUND; +} + +/* Get the value from a hash table encoded hash, identified by field. + * Returns NULL when the field cannot be found, otherwise the SDS value + * is returned. */ +GetFieldRes hashTypeGetFromHashTable(robj *o, sds field, sds *value, uint64_t *expiredAt) { + dictEntry *de; + + *expiredAt = EB_EXPIRE_TIME_INVALID; + + serverAssert(o->encoding == OBJ_ENCODING_HT); + + de = dictFind(o->ptr, field); + + if (de == NULL) + return GETF_NOT_FOUND; + + Entry *entry = dictGetKey(de); + *expiredAt = entryGetExpiry(entry); + *value = entryGetValue(entry); + return GETF_OK; +} + +/* Higher level function of hashTypeGet*() that returns the hash value + * associated with the specified field. + * Arguments: + * hfeFlags - Lookup for HFE_LAZY_* flags + * + * Returned: + * GetFieldRes - Result of get operation + * vstr, vlen - if string, ref in either *vstr and *vlen if it's + * returned in string form, + * vll - or stored in *vll if it's returned as a number. + * If *vll is populated *vstr is set to NULL, so the caller can + * always check the function return by checking the return value + * for GETF_OK and checking if vll (or vstr) is NULL. + * expiredAt - if the field has an expiration time, it will be set to the expiration + * time of the field. Otherwise, will be set to EB_EXPIRE_TIME_INVALID. + */ +GetFieldRes hashTypeGetValue(redisDb *db, kvobj *o, sds field, unsigned char **vstr, + unsigned int *vlen, long long *vll, + int hfeFlags, uint64_t *expiredAt) +{ + sds key = kvobjGetKey(o); + GetFieldRes res; + uint64_t dummy; + size_t oldsize = 0; + if (expiredAt == NULL) expiredAt = &dummy; + if (o->encoding == OBJ_ENCODING_LISTPACK || + o->encoding == OBJ_ENCODING_LISTPACK_EX) { + *vstr = NULL; + res = hashTypeGetFromListpack(o, field, vstr, vlen, vll, expiredAt); + + if (res == GETF_NOT_FOUND) + return GETF_NOT_FOUND; + + } else if (o->encoding == OBJ_ENCODING_HT) { + sds value = NULL; + if (server.memory_tracking_per_slot && !(hfeFlags & HFE_LAZY_NO_UPDATE_ALLOCSIZES)) + oldsize = hashTypeAllocSize(o); + res = hashTypeGetFromHashTable(o, field, &value, expiredAt); + if (server.memory_tracking_per_slot && !(hfeFlags & HFE_LAZY_NO_UPDATE_ALLOCSIZES)) + updateSlotAllocSize(db, getKeySlot(key), oldsize, hashTypeAllocSize(o)); + + if (res == GETF_NOT_FOUND) + return GETF_NOT_FOUND; + + *vstr = (unsigned char*) value; + *vlen = sdslen(value); + } else { + serverPanic("Unknown hash encoding"); + } + + if ((server.allow_access_expired) || + (*expiredAt >= (uint64_t) commandTimeSnapshot()) || + (hfeFlags & HFE_LAZY_ACCESS_EXPIRED)) + return GETF_OK; + + if (server.masterhost || server.cluster_enabled) { + /* If CLIENT_MASTER, assume valid as long as it didn't get delete. + * + * In cluster mode, we also assume valid if we are importing data + * from the source, to avoid deleting fields that are still in use. + * We create a fake master client for data import, which can be + * identified using the CLIENT_MASTER flag. */ + if (server.current_client && (server.current_client->flags & CLIENT_MASTER)) + return GETF_OK; + + /* For replica, if user client, then act as if expired, but don't delete! */ + if (server.masterhost) return GETF_EXPIRED; + } + + if ((server.loading) || + (hfeFlags & HFE_LAZY_AVOID_FIELD_DEL) || + (isPausedActionsWithUpdate(PAUSE_ACTION_EXPIRE))) + return GETF_EXPIRED; + + /* delete the field and propagate the deletion */ + if (server.memory_tracking_per_slot && !(hfeFlags & HFE_LAZY_NO_UPDATE_ALLOCSIZES)) + oldsize = hashTypeAllocSize(o); + serverAssert(hashTypeDelete(o, field) == 1); + if (server.memory_tracking_per_slot && !(hfeFlags & HFE_LAZY_NO_UPDATE_ALLOCSIZES)) + updateSlotAllocSize(db, getKeySlot(key), oldsize, hashTypeAllocSize(o)); + propagateHashFieldDeletion(db, key, field, sdslen(field)); + server.stat_expired_subkeys++; + + if (!(hfeFlags & HFE_LAZY_NO_UPDATE_KEYSIZES)) { + uint64_t l = hashTypeLength(o, 0); + updateKeysizesHist(db, getKeySlot(key), OBJ_HASH, l+1, l); + } + + /* If the field is the last one in the hash, then the hash will be deleted */ + res = GETF_EXPIRED; + robj *keyObj = createStringObject(key, sdslen(key)); + if (!(hfeFlags & HFE_LAZY_NO_NOTIFICATION)) + notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", keyObj, db->id); + if ((hashTypeLength(o, 0) == 0) && (!(hfeFlags & HFE_LAZY_AVOID_HASH_DEL))) { + if (!(hfeFlags & HFE_LAZY_NO_NOTIFICATION)) + notifyKeyspaceEvent(NOTIFY_GENERIC, "del", keyObj, db->id); + dbDelete(db,keyObj); + o = NULL; + res = GETF_EXPIRED_HASH; + } + keyModified(NULL, db, keyObj, o, !(hfeFlags & HFE_LAZY_NO_SIGNAL)); + decrRefCount(keyObj); + return res; +} + +/* Like hashTypeGetValue() but returns a Redis object, which is useful for + * interaction with the hash type outside t_hash.c. + * The function returns NULL if the field is not found in the hash. Otherwise + * a newly allocated string object with the value is returned. + * + * hfeFlags - Lookup HFE_LAZY_* flags + * isHashDeleted - If attempted to access expired field and it's the last field + * in the hash, then the hash will as well be deleted. In this case, + * isHashDeleted will be set to 1. + * val - If the field is found, then val will be set to the value object. + * expireTime - If the field exists (`GETF_OK`) then expireTime will be set to + * the expiration time of the field. Otherwise, it will be set to 0. + * + * Returns 1 if the field exists, and 0 when it doesn't. + */ +int hashTypeGetValueObject(redisDb *db, kvobj *o, sds field, int hfeFlags, + robj **val, uint64_t *expireTime, int *isHashDeleted) { + unsigned char *vstr; + unsigned int vlen; + long long vll; + + if (isHashDeleted) *isHashDeleted = 0; + if (val) *val = NULL; + GetFieldRes res = hashTypeGetValue(db,o,field,&vstr,&vlen,&vll, + hfeFlags, expireTime); + + if (res == GETF_OK) { + /* expireTime set to 0 if the field has no expiration time */ + if (expireTime && (*expireTime == EB_EXPIRE_TIME_INVALID)) + *expireTime = 0; + + /* If expected to return the value, then create a new object */ + if (val) { + if (vstr) *val = createStringObject((char *) vstr, vlen); + else *val = createStringObjectFromLongLong(vll); + } + return 1; + } + + if ((res == GETF_EXPIRED_HASH) && (isHashDeleted)) + *isHashDeleted = 1; + + /* GETF_EXPIRED_HASH, GETF_EXPIRED, GETF_NOT_FOUND */ + return 0; +} + +/* Test if the specified field exists in the given hash. If the field is + * expired (HFE), then it will be lazy deleted unless HFE_LAZY_AVOID_FIELD_DEL + * hfeFlags is set. + * + * hfeFlags - Lookup HFE_LAZY_* flags + * isHashDeleted - If attempted to access expired field and it is the last field + * in the hash, then the hash will as well be deleted. In this case, + * isHashDeleted will be set to 1. + * + * Returns 1 if the field exists, and 0 when it doesn't. + */ +int hashTypeExists(redisDb *db, kvobj *o, sds field, int hfeFlags, int *isHashDeleted) { + unsigned char *vstr = NULL; + unsigned int vlen = UINT_MAX; + long long vll = LLONG_MAX; + + GetFieldRes res = hashTypeGetValue(db, o, field, &vstr, &vlen, &vll, + hfeFlags, NULL); + if (isHashDeleted) + *isHashDeleted = (res == GETF_EXPIRED_HASH) ? 1 : 0; + return (res == GETF_OK) ? 1 : 0; +} + +/* Add a new field, overwrite the old with the new value if it already exists. + * Return 0 on insert and 1 on update. + * + * By default, the key and value SDS strings are copied if needed, so the + * caller retains ownership of the strings passed. However this behavior + * can be effected by passing appropriate flags (possibly bitwise OR-ed): + * + * HASH_SET_TAKE_FIELD -- The SDS field ownership passes to the function. + * HASH_SET_TAKE_VALUE -- The SDS value ownership passes to the function. + * HASH_SET_KEEP_TTL -- keep original TTL if field already exists + * + * When the flags are used the caller does not need to release the passed + * SDS string(s). It's up to the function to use the string to create a new + * entry or to free the SDS string before returning to the caller. + * + * HASH_SET_COPY corresponds to no flags passed, and means the default + * semantics of copying the values if needed. + * + */ +#define HASH_SET_TAKE_FIELD (1<<0) +#define HASH_SET_TAKE_VALUE (1<<1) +#define HASH_SET_KEEP_TTL (1<<2) + +static_assert(HASH_SET_TAKE_VALUE == ENTRY_TAKE_VALUE, "ENTRY_TAKE_VALUE must match HASH_SET_TAKE_VALUE"); + +int hashTypeSet(redisDb *db, kvobj *o, sds field, sds value, int flags) { + int update = 0; + + /* Check if the field is too long for listpack, and convert before adding the item. + * This is needed for HINCRBY* case since in other commands this is handled early by + * hashTypeTryConversion, so this check will be a NOP. */ + if (o->encoding == OBJ_ENCODING_LISTPACK || + o->encoding == OBJ_ENCODING_LISTPACK_EX) { + if (sdslen(field) > server.hash_max_listpack_value || sdslen(value) > server.hash_max_listpack_value) + hashTypeConvert(db, o, OBJ_ENCODING_HT); + } + + if (o->encoding == OBJ_ENCODING_LISTPACK) { + unsigned char *zl, *fptr, *vptr; + + zl = o->ptr; + fptr = lpFirst(zl); + if (fptr != NULL) { + fptr = lpFind(zl, fptr, (unsigned char*)field, sdslen(field), 1); + if (fptr != NULL) { + /* Grab pointer to the value (fptr points to the field) */ + vptr = lpNext(zl, fptr); + serverAssert(vptr != NULL); + + /* Replace value */ + zl = lpReplace(zl, &vptr, (unsigned char*)value, sdslen(value)); + update = 1; + } + } + + if (!update) { + listpackEntry entries[2] = { + {.sval = (unsigned char*) field, .slen = sdslen(field)}, + {.sval = (unsigned char*) value, .slen = sdslen(value)}, + }; + + /* Push new field/value pair onto the tail of the listpack */ + zl = lpBatchAppend(zl, entries, 2); + } + o->ptr = zl; + + /* Check if the listpack needs to be converted to a hash table */ + if (hashTypeLength(o, 0) > server.hash_max_listpack_entries) + hashTypeConvert(db, o, OBJ_ENCODING_HT); + } else if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { + unsigned char *fptr = NULL, *vptr = NULL, *tptr = NULL; + listpackEx *lpt = o->ptr; + long long expireTime = HASH_LP_NO_TTL; + + fptr = lpFirst(lpt->lp); + if (fptr != NULL) { + fptr = lpFind(lpt->lp, fptr, (unsigned char*)field, sdslen(field), 2); + if (fptr != NULL) { + /* Grab pointer to the value (fptr points to the field) */ + vptr = lpNext(lpt->lp, fptr); + serverAssert(vptr != NULL); + + /* Replace value */ + lpt->lp = lpReplace(lpt->lp, &vptr, (unsigned char *) value, sdslen(value)); + update = 1; + + fptr = lpPrev(lpt->lp, vptr); + serverAssert(fptr != NULL); + + tptr = lpNext(lpt->lp, vptr); + serverAssert(tptr && lpGetIntegerValue(tptr, &expireTime)); + + if (flags & HASH_SET_KEEP_TTL) { + /* keep old field along with TTL */ + } else if (expireTime != HASH_LP_NO_TTL) { + /* re-insert field and override TTL */ + listpackExUpdateExpiry(o, field, fptr, vptr, HASH_LP_NO_TTL); + } + } + } + + if (!update) + listpackExAddNew(o, field, sdslen(field), value, sdslen(value), + HASH_LP_NO_TTL); + + /* Check if the listpack needs to be converted to a hash table */ + if (hashTypeLength(o, 0) > server.hash_max_listpack_entries) + hashTypeConvert(db, o, OBJ_ENCODING_HT); + + } else if (o->encoding == OBJ_ENCODING_HT) { + dict *ht = o->ptr; + /* check if field already exists */ + dictEntryLink bucket, link = dictFindLink(ht, field, &bucket); + size_t *alloc_size = htGetMetadataSize(ht); + + /* take ownership of value if requested */ + uint32_t newEntryFlags = flags & HASH_SET_TAKE_VALUE; + flags &= ~HASH_SET_TAKE_VALUE; + + if (link == NULL) { + /* Create entry and transfer value ownership if possible */ + size_t usable; + Entry *newEntry = entryCreate(field, value, newEntryFlags, &usable); + + dictSetKeyAtLink(ht, newEntry, &bucket, 1); + *alloc_size += usable; + } else { + /* Existing field - update value in entry */ + Entry *oldEntry = dictGetKey(*link); + + /* Check if old entry has expiration before potentially freeing it */ + uint64_t oldExpireAt = entryGetExpiry(oldEntry); + uint64_t newExpireAt = EB_EXPIRE_TIME_INVALID; + + /* If attached TTL to the old field, then remove it from hash's + * private ebuckets. We do this before updating the value because + * the entry might be reallocated and freed. */ + if (oldExpireAt != EB_EXPIRE_TIME_INVALID) { + hfieldPersist(o, oldEntry); + if (flags & HASH_SET_KEEP_TTL) { + newExpireAt = oldExpireAt; + newEntryFlags |= ENTRY_HAS_EXPIRY; + } + } + + ssize_t usableDiff; + Entry *newEntry = entryUpdate(oldEntry, value, newEntryFlags, &usableDiff); + + /* If entry was reallocated, update the dict key */ + if (newEntry != oldEntry) { + /* entryUpdate already freed the old entry if needed */ + /* Update the dict to point to the new entry using dictSetKeyAtLink (no_value=1) */ + dictSetKeyAtLink(ht, newEntry, &link, 0); + } + + /* If keeping TTL, add the (potentially new) entry back to ebuckets */ + if (newExpireAt != EB_EXPIRE_TIME_INVALID) { + dict *d = o->ptr; + htMetadataEx *dictExpireMeta = htGetMetadataEx(d); + serverAssert(dictExpireMeta->expireMeta.trash == 0); + ebAdd(&dictExpireMeta->hfe, &hashFieldExpireBucketsType, newEntry, newExpireAt); + } + + *alloc_size += usableDiff; + update = 1; + } + } else { + serverPanic("Unknown hash encoding"); + } + + /* Free SDS strings we did not referenced elsewhere if the flags + * want this function to be responsible. */ + if (flags & HASH_SET_TAKE_FIELD && field) sdsfree(field); + if (flags & HASH_SET_TAKE_VALUE && value) sdsfree(value); + return update; +} + +SetExRes hashTypeSetExpiryHT(HashTypeSetEx *exInfo, sds field, uint64_t expireAt) { + dict *ht = exInfo->hashObj->ptr; + dictEntryLink link = NULL; + Entry *entryNew = NULL; + + link = dictFindLink(ht, field, NULL); + if (link == NULL) + return HSETEX_NO_FIELD; + + dictEntry *existingEntry = *link; + Entry *oldEntry = dictGetKey(existingEntry); + /* Special value of EXPIRE_TIME_INVALID indicates field should be persisted.*/ + if (expireAt == EB_EXPIRE_TIME_INVALID) { + /* Return error if already there is no ttl. */ + if (entryGetExpiry(oldEntry) == EB_EXPIRE_TIME_INVALID) + return HSETEX_NO_CONDITION_MET; + + hfieldPersist(exInfo->hashObj, oldEntry); + return HSETEX_OK; + } + + /* If field doesn't have expiry metadata attached */ + if (!entryHasExpiry(oldEntry)) { + size_t *alloc_size = htGetMetadataSize(ht); + + /* For fields without expiry, LT condition is considered valid */ + if (exInfo->expireSetCond & (HFE_XX | HFE_GT)) + return HSETEX_NO_CONDITION_MET; + + ssize_t usableDiff; + entryNew = entryUpdate(oldEntry, NULL, ENTRY_HAS_EXPIRY, &usableDiff); + *alloc_size += usableDiff; + } else { /* field has ExpireMeta struct attached */ + uint64_t prevExpire = entryGetExpiry(oldEntry); + + /* If field has valid expiration time, then check GT|LT|NX */ + if (prevExpire != EB_EXPIRE_TIME_INVALID) { + if (((exInfo->expireSetCond == HFE_GT) && (prevExpire >= expireAt)) || + ((exInfo->expireSetCond == HFE_LT) && (prevExpire <= expireAt)) || + (exInfo->expireSetCond == HFE_NX) ) + return HSETEX_NO_CONDITION_MET; + + /* If expiry time is the same, then nothing to do */ + if (prevExpire == expireAt) + return HSETEX_OK; + + /* remove old expiry time from hash's private ebuckets */ + htMetadataEx *dm = htGetMetadataEx(ht); + ebRemove(&dm->hfe, &hashFieldExpireBucketsType, oldEntry); + + /* Track of minimum expiration time (only later update global HFE DS) */ + if (exInfo->minExpireFields > prevExpire) + exInfo->minExpireFields = prevExpire; + + } else { + /* field has invalid expiry. No need to ebRemove() */ + + /* Check XX|LT|GT */ + if (exInfo->expireSetCond & (HFE_XX | HFE_GT)) + return HSETEX_NO_CONDITION_MET; + } + + /* Reuse hfOld as hfNew and rewrite its expiry with ebAdd() */ + entryNew = oldEntry; + } + + dictSetKeyAtLink(ht, entryNew, &link, 0); /* newItem=0 for updating existing entry */ + + + /* If expired, then delete the field and propagate the deletion. + * If replica, continue like the field is valid */ + if (unlikely(checkAlreadyExpired(expireAt))) { + /* replicas should not initiate deletion of fields */ + propagateHashFieldDeletion(exInfo->db, exInfo->key->ptr, field, sdslen(field)); + hashTypeDelete(exInfo->hashObj, field); + server.stat_expired_subkeys++; + return HSETEX_DELETED; + } + + if (exInfo->minExpireFields > expireAt) + exInfo->minExpireFields = expireAt; + + htMetadataEx *dm = htGetMetadataEx(ht); + ebAdd(&dm->hfe, &hashFieldExpireBucketsType, entryNew, expireAt); + return HSETEX_OK; +} + +/* + * Set field expiration + * + * Take care to call first hashTypeSetExInit() and then call this function. + * Finally, call hashTypeSetExDone() to notify and update global HFE DS. + * + * Special value of EB_EXPIRE_TIME_INVALID for 'expireAt' argument will persist + * the field. + */ +SetExRes hashTypeSetEx(robj *o, sds field, uint64_t expireAt, HashTypeSetEx *exInfo) { + if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { + unsigned char *fptr = NULL, *vptr = NULL, *tptr = NULL; + listpackEx *lpt = o->ptr; + + fptr = lpFirst(lpt->lp); + if (fptr) + fptr = lpFind(lpt->lp, fptr, (unsigned char*)field, sdslen(field), 2); + + if (!fptr) + return HSETEX_NO_FIELD; + + /* Grab pointer to the value (fptr points to the field) */ + vptr = lpNext(lpt->lp, fptr); + serverAssert(vptr != NULL); + + tptr = lpNext(lpt->lp, vptr); + serverAssert(tptr); + + /* update TTL */ + return hashTypeSetExpiryListpack(exInfo, field, fptr, vptr, tptr, expireAt); + } else if (o->encoding == OBJ_ENCODING_HT) { + /* If needed to set the field along with expiry */ + return hashTypeSetExpiryHT(exInfo, field, expireAt); + } else { + serverPanic("Unknown hash encoding"); + } + + return HSETEX_OK; /* never reach here */ +} + +void initDictExpireMetadata(robj *o) { + dict *ht = o->ptr; + + htMetadataEx *m = htGetMetadataEx(ht); + m->hfe = ebCreate(); /* Allocate HFE DS */ + m->expireMeta.trash = 1; /* mark as trash (as long it wasn't ebAdd()) */ +} + +/* Init HashTypeSetEx struct before calling hashTypeSetEx() */ +int hashTypeSetExInit(robj *key, kvobj *o, client *c, redisDb *db, + ExpireSetCond expireSetCond, HashTypeSetEx *ex) +{ + dict *ht = o->ptr; + ex->expireSetCond = expireSetCond; + ex->minExpire = EB_EXPIRE_TIME_INVALID; + ex->c = c; + ex->db = db; + ex->key = key; + ex->hashObj = o; + ex->minExpireFields = EB_EXPIRE_TIME_INVALID; + + /* Take care that HASH support expiration */ + if (o->encoding == OBJ_ENCODING_LISTPACK) { + hashTypeConvert(c->db, o, OBJ_ENCODING_LISTPACK_EX); + } else if (o->encoding == OBJ_ENCODING_HT) { + /* Take care dict has HFE metadata */ + if (!isDictWithMetaHFE(ht)) { + /* Realloc (only header of dict) with metadata for hash-field expiration */ + dictTypeAddMeta(&ht, &entryHashDictTypeWithHFE); + htMetadataEx *m = htGetMetadataEx(ht); + o->ptr = ht; + + /* Find the key in the keyspace. Need to keep reference to the key for + * notifications or even removal of the hash */ + + /* Fillup dict HFE metadata */ + m->hfe = ebCreate(); /* Allocate HFE DS */ + m->expireMeta.trash = 1; /* mark as trash (as long it wasn't ebAdd()) */ + } + } + + /* Read minExpire from attached ExpireMeta to the hash */ + ex->minExpire = hashTypeGetMinExpire(o, 0); + return C_OK; +} + +/* + * After calling hashTypeSetEx() for setting fields or their expiry, call this + * function to update global HFE DS. + */ +void hashTypeSetExDone(HashTypeSetEx *ex) { + + if (hashTypeLength(ex->hashObj, 0) == 0) + return; + + /* If minimum HFE of the hash is smaller than expiration time of the + * specified fields in the command as well as it is smaller or equal + * than expiration time provided in the command, then the minimum + * HFE of the hash won't change following this command. */ + if ((ex->minExpire < ex->minExpireFields)) + return; + + /* Retrieve new expired time. It might have changed. */ + uint64_t newMinExpire = hashTypeGetMinExpire(ex->hashObj, 1 /*accurate*/); + + /* Calculate the diff between old minExpire and newMinExpire. If it is + * only few seconds, then don't have to update global HFE DS. At the worst + * case fields of hash will be active-expired up to few seconds later. + * + * In any case, active-expire operation will know to update global + * HFE DS more efficiently than here for a single item. + */ + uint64_t diff = (ex->minExpire > newMinExpire) ? + (ex->minExpire - newMinExpire) : (newMinExpire - ex->minExpire); + if (diff < HASH_NEW_EXPIRE_DIFF_THRESHOLD) return; + + int slot = getKeySlot(ex->key->ptr); + if (ex->minExpire != EB_EXPIRE_TIME_INVALID) { + if (newMinExpire != EB_EXPIRE_TIME_INVALID) + estoreUpdate(ex->db->subexpires, slot, ex->hashObj, newMinExpire); + else + estoreRemove(ex->db->subexpires, slot, ex->hashObj); + } else { + if (newMinExpire != EB_EXPIRE_TIME_INVALID) + estoreAdd(ex->db->subexpires, slot, ex->hashObj, newMinExpire); + } +} + +/* Delete an element from a hash. + * + * Return 1 on deleted and 0 on not found. + * field - sds field name to delete */ +int hashTypeDelete(robj *o, void *field) { + int deleted = 0; + int fieldLen = sdslen((sds)field); + + if (o->encoding == OBJ_ENCODING_LISTPACK) { + unsigned char *zl, *fptr; + + zl = o->ptr; + fptr = lpFirst(zl); + if (fptr != NULL) { + fptr = lpFind(zl, fptr, (unsigned char*)field, fieldLen, 1); + if (fptr != NULL) { + /* Delete both of the key and the value. */ + zl = lpDeleteRangeWithEntry(zl,&fptr,2); + o->ptr = zl; + deleted = 1; + } + } + } else if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { + unsigned char *fptr; + listpackEx *lpt = o->ptr; + + fptr = lpFirst(lpt->lp); + if (fptr != NULL) { + fptr = lpFind(lpt->lp, fptr, (unsigned char*)field, fieldLen, 2); + if (fptr != NULL) { + /* Delete field, value and ttl */ + lpt->lp = lpDeleteRangeWithEntry(lpt->lp, &fptr, 3); + deleted = 1; + } + } + } else if (o->encoding == OBJ_ENCODING_HT) { + /* dictDelete() will call dictEntryDestructor() */ + if (dictDelete((dict*)o->ptr, field) == C_OK) { + deleted = 1; + } + } else { + serverPanic("Unknown hash encoding"); + } + return deleted; +} + +/* Return the number of elements in a hash. + * + * Note, subtractExpiredFields=1 might be pricy in case there are many HFEs + */ +unsigned long hashTypeLength(const robj *o, int subtractExpiredFields) { + unsigned long length = ULONG_MAX; + /* If expired field access is allowed, don't subtract expired fields from the count. */ + if (server.allow_access_expired) + subtractExpiredFields = 0; + + if (o->encoding == OBJ_ENCODING_LISTPACK) { + length = lpLength(o->ptr) / 2; + } else if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { + listpackEx *lpt = o->ptr; + length = lpLength(lpt->lp) / 3; + + if (subtractExpiredFields && lpt->meta.trash == 0) + length -= listpackExExpireDryRun(o); + } else if (o->encoding == OBJ_ENCODING_HT) { + uint64_t expiredItems = 0; + dict *d = (dict*)o->ptr; + if (subtractExpiredFields && isDictWithMetaHFE(d)) { + htMetadataEx *meta = htGetMetadataEx(d); + /* If dict registered in global HFE DS */ + if (meta->expireMeta.trash == 0) + expiredItems = ebExpireDryRun(meta->hfe, + &hashFieldExpireBucketsType, + commandTimeSnapshot()); + } + length = dictSize(d) - expiredItems; + } else { + serverPanic("Unknown hash encoding"); + } + return length; +} + +size_t hashTypeAllocSize(const robj *o) { + serverAssertWithInfo(NULL,o,o->type == OBJ_HASH); + size_t size = 0; + if (o->encoding == OBJ_ENCODING_LISTPACK) { + size = lpBytes(o->ptr); + } else if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { + listpackEx *lpt = o->ptr; + size = sizeof(listpackEx) + lpBytes(lpt->lp); + } else if (o->encoding == OBJ_ENCODING_HT) { + dict *d = o->ptr; + size += sizeof(dict) + dictMemUsage(d) + *htGetMetadataSize(d); + } else { + serverPanic("Unknown hash encoding"); + } + return size; +} + +void hashTypeInitIterator(hashTypeIterator *hi, robj *subject) { + hi->subject = subject; + hi->encoding = subject->encoding; + + if (hi->encoding == OBJ_ENCODING_LISTPACK || + hi->encoding == OBJ_ENCODING_LISTPACK_EX) + { + hi->fptr = NULL; + hi->vptr = NULL; + hi->tptr = NULL; + hi->expire_time = EB_EXPIRE_TIME_INVALID; + } else if (hi->encoding == OBJ_ENCODING_HT) { + dictInitIterator(&hi->di, subject->ptr); + } else { + serverPanic("Unknown hash encoding"); + } +} + +void hashTypeResetIterator(hashTypeIterator *hi) { + if (hi->encoding == OBJ_ENCODING_HT) + dictResetIterator(&hi->di); +} + +/* Move to the next entry in the hash. Return C_OK when the next entry + * could be found and C_ERR when the iterator reaches the end. */ +int hashTypeNext(hashTypeIterator *hi, int skipExpiredFields) { + /* If expired field access is allowed, don't skip expired fields during iteration */ + if (server.allow_access_expired) + skipExpiredFields = 0; + + hi->expire_time = EB_EXPIRE_TIME_INVALID; + if (hi->encoding == OBJ_ENCODING_LISTPACK) { + unsigned char *zl; + unsigned char *fptr, *vptr; + + zl = hi->subject->ptr; + fptr = hi->fptr; + vptr = hi->vptr; + + if (fptr == NULL) { + /* Initialize cursor */ + serverAssert(vptr == NULL); + fptr = lpFirst(zl); + } else { + /* Advance cursor */ + serverAssert(vptr != NULL); + fptr = lpNext(zl, vptr); + } + if (fptr == NULL) return C_ERR; + + /* Grab pointer to the value (fptr points to the field) */ + vptr = lpNext(zl, fptr); + serverAssert(vptr != NULL); + + /* fptr, vptr now point to the first or next pair */ + hi->fptr = fptr; + hi->vptr = vptr; + } else if (hi->encoding == OBJ_ENCODING_LISTPACK_EX) { + long long expire_time; + unsigned char *zl = hashTypeListpackGetLp(hi->subject); + unsigned char *fptr, *vptr, *tptr; + + fptr = hi->fptr; + vptr = hi->vptr; + tptr = hi->tptr; + + if (fptr == NULL) { + /* Initialize cursor */ + serverAssert(vptr == NULL); + fptr = lpFirst(zl); + } else { + /* Advance cursor */ + serverAssert(tptr != NULL); + fptr = lpNext(zl, tptr); + } + if (fptr == NULL) return C_ERR; + + while (fptr != NULL) { + /* Grab pointer to the value (fptr points to the field) */ + vptr = lpNext(zl, fptr); + serverAssert(vptr != NULL); + + tptr = lpNext(zl, vptr); + serverAssert(tptr && lpGetIntegerValue(tptr, &expire_time)); + + if (!skipExpiredFields || !hashTypeIsExpired(hi->subject, expire_time)) + break; + + fptr = lpNext(zl, tptr); + } + if (fptr == NULL) return C_ERR; + + /* fptr, vptr now point to the first or next pair */ + hi->fptr = fptr; + hi->vptr = vptr; + hi->tptr = tptr; + hi->expire_time = (expire_time != HASH_LP_NO_TTL) ? (uint64_t) expire_time : EB_EXPIRE_TIME_INVALID; + } else if (hi->encoding == OBJ_ENCODING_HT) { + + while ((hi->de = dictNext(&hi->di)) != NULL) { + Entry *e = dictGetKey(hi->de); + hi->expire_time = entryGetExpiry(e); + /* this condition still valid if expire_time equals EB_EXPIRE_TIME_INVALID */ + if (skipExpiredFields && ((mstime_t)hi->expire_time < commandTimeSnapshot())) + continue; + return C_OK; + } + return C_ERR; + } else { + serverPanic("Unknown hash encoding"); + } + return C_OK; +} + +/* Get the field or value at iterator cursor, for an iterator on a hash value + * encoded as a listpack. Prototype is similar to `hashTypeGetFromListpack`. */ +void hashTypeCurrentFromListpack(hashTypeIterator *hi, int what, + unsigned char **vstr, + unsigned int *vlen, + long long *vll, + uint64_t *expireTime) +{ + serverAssert(hi->encoding == OBJ_ENCODING_LISTPACK || + hi->encoding == OBJ_ENCODING_LISTPACK_EX); + + if (what & OBJ_HASH_KEY) { + *vstr = lpGetValue(hi->fptr, vlen, vll); + } else { + *vstr = lpGetValue(hi->vptr, vlen, vll); + } + + if (expireTime) + *expireTime = hi->expire_time; +} + +/* Get the field or value at iterator cursor, for an iterator on a hash value + * encoded as a hash table. Prototype is similar to + * `hashTypeGetFromHashTable`. + * + * expireTime - If parameter is not null, then the function will return the expire + * time of the field. If expiry not set, return EB_EXPIRE_TIME_INVALID + */ +void hashTypeCurrentFromHashTable(hashTypeIterator *hi, int what, char **str, size_t *len, uint64_t *expireTime) { + serverAssert(hi->encoding == OBJ_ENCODING_HT); + Entry *e = dictGetKey(hi->de); + + if (what & OBJ_HASH_KEY) { + sds field = entryGetField(e); + *str = field; + *len = sdslen(field); + } else { + sds val = entryGetValue(e); + *str = val; + *len = sdslen(val); + } + + if (expireTime) + *expireTime = hi->expire_time; +} + +/* Higher level function of hashTypeCurrent*() that returns the hash value + * at current iterator position. + * + * The returned element is returned by reference in either *vstr and *vlen if + * it's returned in string form, or stored in *vll if it's returned as + * a number. + * + * If *vll is populated *vstr is set to NULL, so the caller + * can always check the function return by checking the return value + * type checking if vstr == NULL. */ +void hashTypeCurrentObject(hashTypeIterator *hi, + int what, + unsigned char **vstr, + unsigned int *vlen, + long long *vll, + uint64_t *expireTime) +{ + if (hi->encoding == OBJ_ENCODING_LISTPACK || + hi->encoding == OBJ_ENCODING_LISTPACK_EX) + { + *vstr = NULL; + hashTypeCurrentFromListpack(hi, what, vstr, vlen, vll, expireTime); + } else if (hi->encoding == OBJ_ENCODING_HT) { + char *ele; + size_t eleLen; + hashTypeCurrentFromHashTable(hi, what, &ele, &eleLen, expireTime); + *vstr = (unsigned char*) ele; + *vlen = eleLen; + } else { + serverPanic("Unknown hash encoding"); + } +} + +/* Return the key or value at the current iterator position as a new + * SDS string. */ +sds hashTypeCurrentObjectNewSds(hashTypeIterator *hi, int what) { + unsigned char *vstr; + unsigned int vlen; + long long vll; + + hashTypeCurrentObject(hi,what,&vstr,&vlen,&vll, NULL); + if (vstr) return sdsnewlen(vstr,vlen); + return sdsfromlonglong(vll); +} + +/* Return the key at the current iterator position as a new entry. */ +Entry *hashTypeCurrentObjectNewEntry(hashTypeIterator *hi, size_t *usable) { + char fieldBuf[LONG_STR_SIZE], valueBuf[LONG_STR_SIZE]; + unsigned char *fieldStr, *valueStr; + unsigned int fieldLen, valueLen; + long long fieldLl, valueLl; + Entry *entry; + + /* Get field */ + hashTypeCurrentObject(hi, OBJ_HASH_KEY, &fieldStr, &fieldLen, &fieldLl, NULL); + if (!fieldStr) { + fieldLen = ll2string(fieldBuf, sizeof(fieldBuf), fieldLl); + fieldStr = (unsigned char *) fieldBuf; + } + sds field = sdsnewlen(fieldStr, fieldLen); + + /* Get value */ + hashTypeCurrentObject(hi, OBJ_HASH_VALUE, &valueStr, &valueLen, &valueLl, NULL); + if (!valueStr) { + valueLen = ll2string(valueBuf, sizeof(valueBuf), valueLl); + valueStr = (unsigned char *) valueBuf; + } + sds value = sdsnewlen(valueStr, valueLen); + int hasExpiry = (hi->expire_time != EB_EXPIRE_TIME_INVALID); + + /* Create entry with field and value, using iterator's expire_time */ + uint32_t entryFlags = ENTRY_TAKE_VALUE | ((hasExpiry) ? ENTRY_HAS_EXPIRY : 0); + entry = entryCreate(field, value, entryFlags, usable); + sdsfree(field); /* entryCreate() doesn't take ownership of field */ + + return entry; +} + +static kvobj *hashTypeLookupWriteOrCreate(client *c, robj *key) { + dictEntryLink link; + kvobj *kv = lookupKeyWriteWithLink(c->db, key, &link); + if (checkType(c, kv, OBJ_HASH)) return NULL; + + if (kv == NULL) { + robj *o = createHashObject(); + kv = dbAddByLink(c->db, key, &o, &link); + } + return kv; +} + + +void hashTypeConvertListpack(robj *o, int enc) { + serverAssert(o->encoding == OBJ_ENCODING_LISTPACK); + + if (enc == OBJ_ENCODING_LISTPACK) { + /* Nothing to do... */ + + } else if (enc == OBJ_ENCODING_LISTPACK_EX) { + unsigned char *p; + + /* Append HASH_LP_NO_TTL to each field name - value pair. */ + p = lpFirst(o->ptr); + while (p != NULL) { + p = lpNext(o->ptr, p); + serverAssert(p); + + o->ptr = lpInsertInteger(o->ptr, HASH_LP_NO_TTL, p, LP_AFTER, &p); + p = lpNext(o->ptr, p); + } + + listpackEx *lpt = listpackExCreate(); + lpt->lp = o->ptr; + o->encoding = OBJ_ENCODING_LISTPACK_EX; + o->ptr = lpt; + } else if (enc == OBJ_ENCODING_HT) { + hashTypeIterator hi; + dict *dict; + int ret; + + hashTypeInitIterator(&hi, o); + dict = dictCreate(&entryHashDictType); + + /* Presize the dict to avoid rehashing */ + dictExpand(dict,hashTypeLength(o, 0)); + + size_t usable, *alloc_size = htGetMetadataSize(dict); + while (hashTypeNext(&hi, 0) != C_ERR) { + Entry *entry = hashTypeCurrentObjectNewEntry(&hi, &usable); + ret = dictAdd(dict, entry, NULL); + if (ret != DICT_OK) { + entryFree(entry, NULL); /* Needed for gcc ASAN */ + hashTypeResetIterator(&hi); /* Needed for gcc ASAN */ + serverLogHexDump(LL_WARNING,"listpack with dup elements dump", + o->ptr,lpBytes(o->ptr)); + serverPanic("Listpack corruption detected"); + } + *alloc_size += usable; + } + hashTypeResetIterator(&hi); + zfree(o->ptr); + o->encoding = OBJ_ENCODING_HT; + o->ptr = dict; + } else { + serverPanic("Unknown hash encoding"); + } +} + +/* db can be NULL to avoid registration in subexpires */ +void hashTypeConvertListpackEx(redisDb *db, robj *o, int enc) { + serverAssert(o->encoding == OBJ_ENCODING_LISTPACK_EX); + + if (enc == OBJ_ENCODING_LISTPACK_EX) { + return; + } else if (enc == OBJ_ENCODING_HT) { + uint64_t minExpire = EB_EXPIRE_TIME_INVALID; + int ret, slot = -1; + hashTypeIterator hi; + dict *dict; + htMetadataEx *dictExpireMeta; + listpackEx *lpt = o->ptr; + + if (db && lpt->meta.trash != 1) { + minExpire = hashTypeGetMinExpire(o, 0); + slot = getKeySlot(kvobjGetKey(o)); + estoreRemove(db->subexpires, slot, o); + } + + dict = dictCreate(&entryHashDictTypeWithHFE); + dictExpand(dict,hashTypeLength(o, 0)); + dictExpireMeta = htGetMetadataEx(dict); + + /* Fillup dict HFE metadata */ + dictExpireMeta->hfe = ebCreate(); /* Allocate HFE DS */ + dictExpireMeta->expireMeta.trash = 1; /* mark as trash (as long it wasn't ebAdd()) */ + + hashTypeInitIterator(&hi, o); + + size_t usable, *alloc_size = &dictExpireMeta->alloc_size; + while (hashTypeNext(&hi, 0) != C_ERR) { + /* Create entry with both field and value */ + Entry *entry = hashTypeCurrentObjectNewEntry(&hi, &usable); + ret = dictAdd(dict, entry, NULL); + if (ret != DICT_OK) { + entryFree(entry, NULL); /* Needed for gcc ASAN */ + hashTypeResetIterator(&hi); /* Needed for gcc ASAN */ + serverLogHexDump(LL_WARNING,"listpack with dup elements dump", + lpt->lp,lpBytes(lpt->lp)); + serverPanic("Listpack corruption detected"); + } + *alloc_size += usable; + + if (hi.expire_time != EB_EXPIRE_TIME_INVALID) + ebAdd(&dictExpireMeta->hfe, &hashFieldExpireBucketsType, entry, hi.expire_time); + } + hashTypeResetIterator(&hi); + listpackExFree(lpt); + + o->encoding = OBJ_ENCODING_HT; + o->ptr = dict; + + if (minExpire != EB_EXPIRE_TIME_INVALID) + estoreAdd(db->subexpires, slot, o, minExpire); + } else { + serverPanic("Unknown hash encoding: %d", enc); + } +} + +/* NOTE: db can be NULL (Won't register in global HFE DS) */ +void hashTypeConvert(redisDb *db, robj *o, int enc) { + if (o->encoding == OBJ_ENCODING_LISTPACK) { + hashTypeConvertListpack(o, enc); + } else if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { + hashTypeConvertListpackEx(db, o, enc); + } else if (o->encoding == OBJ_ENCODING_HT) { + serverPanic("Not implemented"); + } else { + serverPanic("Unknown hash encoding"); + } +} + +/* This is a helper function for the COPY command. + * Duplicate a hash object, with the guarantee that the returned object + * has the same encoding as the original one. + * + * The resulting object always has refcount set to 1 */ +robj *hashTypeDup(kvobj *o, uint64_t *minHashExpire) { + robj *hobj; + hashTypeIterator hi; + + serverAssert(o->type == OBJ_HASH); + + if(o->encoding == OBJ_ENCODING_LISTPACK) { + unsigned char *zl = o->ptr; + size_t sz = lpBytes(zl); + unsigned char *new_zl = zmalloc(sz); + memcpy(new_zl, zl, sz); + hobj = createObject(OBJ_HASH, new_zl); + hobj->encoding = OBJ_ENCODING_LISTPACK; + } else if(o->encoding == OBJ_ENCODING_LISTPACK_EX) { + listpackEx *lpt = o->ptr; + + if (lpt->meta.trash == 0) + *minHashExpire = ebGetMetaExpTime(&lpt->meta); + + listpackEx *dup = listpackExCreate(); + + size_t sz = lpBytes(lpt->lp); + dup->lp = lpNew(sz); + memcpy(dup->lp, lpt->lp, sz); + + hobj = createObject(OBJ_HASH, dup); + hobj->encoding = OBJ_ENCODING_LISTPACK_EX; + } else if(o->encoding == OBJ_ENCODING_HT) { + htMetadataEx *dictExpireMetaSrc, *dictExpireMetaDst = NULL; + dict *d; + + /* If dict doesn't have HFE metadata, then create a new dict without it */ + if (!isDictWithMetaHFE(o->ptr)) { + d = dictCreate(&entryHashDictType); + } else { + /* Create a new dict with HFE metadata */ + d = dictCreate(&entryHashDictTypeWithHFE); + dictExpireMetaSrc = htGetMetadataEx((dict *) o->ptr); + dictExpireMetaDst = htGetMetadataEx(d); + dictExpireMetaDst->hfe = ebCreate(); /* Allocate HFE DS */ + dictExpireMetaDst->expireMeta.trash = 1; /* mark as trash (as long it wasn't ebAdd()) */ + + /* Extract the minimum expire time of the source hash (Will be used by caller + * to register the new hash in the global subexpires DB) */ + if (dictExpireMetaSrc->expireMeta.trash == 0) + *minHashExpire = ebGetMetaExpTime(&dictExpireMetaSrc->expireMeta); + } + dictExpand(d, dictSize((const dict*)o->ptr)); + + size_t usable, *alloc_size = htGetMetadataSize(d); + hashTypeInitIterator(&hi, o); + while (hashTypeNext(&hi, 0) != C_ERR) { + Entry *newEntry; + uint64_t expireTime; + /* Extract a field-value pair from an original hash object.*/ + char *field, *value; + size_t fieldLen, valueLen; + hashTypeCurrentFromHashTable(&hi, OBJ_HASH_KEY, &field, &fieldLen, &expireTime); + hashTypeCurrentFromHashTable(&hi, OBJ_HASH_VALUE, &value, &valueLen, NULL); + + /* Create new entry with field and value */ + sds newFieldSds = sdsnewlen(field, fieldLen); + sds newValueSds = sdsnewlen(value, valueLen); + /* Create new entry with field and value, optional expiry. */ + if (expireTime == EB_EXPIRE_TIME_INVALID) { + newEntry = entryCreate(newFieldSds, newValueSds, + ENTRY_TAKE_VALUE, &usable); + } else { + newEntry = entryCreate(newFieldSds, newValueSds, + ENTRY_TAKE_VALUE | ENTRY_HAS_EXPIRY, &usable); + ebAdd(&dictExpireMetaDst->hfe, &hashFieldExpireBucketsType, newEntry, expireTime); + } + sdsfree(newFieldSds); /* (Only value ownership transferred to entry) */ + + /* Add entry to new hash object. */ + dictAdd(d, newEntry, NULL); /* no_value=1, so value is NULL */ + *alloc_size += usable; + } + hashTypeResetIterator(&hi); + + hobj = createObject(OBJ_HASH, d); + hobj->encoding = OBJ_ENCODING_HT; + } else { + serverPanic("Unknown hash encoding"); + } + return hobj; +} + +/* Create a new sds string from the listpack entry. */ +sds hashSdsFromListpackEntry(listpackEntry *e) { + return e->sval ? sdsnewlen(e->sval, e->slen) : sdsfromlonglong(e->lval); +} + +/* Reply with bulk string from the listpack entry. */ +void hashReplyFromListpackEntry(client *c, listpackEntry *e) { + if (e->sval) + addReplyBulkCBuffer(c, e->sval, e->slen); + else + addReplyBulkLongLong(c, e->lval); +} + +/* Return random element from a non empty hash. + * 'key' and 'val' will be set to hold the element. + * The memory in them is not to be freed or modified by the caller. + * 'val' can be NULL in which case it's not extracted. */ +void hashTypeRandomElement(robj *hashobj, unsigned long hashsize, CommonEntry *key, CommonEntry *val) { + if (hashobj->encoding == OBJ_ENCODING_HT) { + dictEntry *de = dictGetFairRandomKey(hashobj->ptr); + Entry *entry = dictGetKey(de); + sds field = entryGetField(entry); + key->sval = (unsigned char*) field; + key->slen = sdslen(field); + if (val) { + sds s = entryGetValue(entry); + val->sval = (unsigned char*)s; + val->slen = sdslen(s); + } + } else if (hashobj->encoding == OBJ_ENCODING_LISTPACK) { + lpRandomPair(hashobj->ptr, hashsize, (listpackEntry *) key, (listpackEntry *) val, 2); + } else if (hashobj->encoding == OBJ_ENCODING_LISTPACK_EX) { + lpRandomPair(hashTypeListpackGetLp(hashobj), hashsize, (listpackEntry *) key, + (listpackEntry *) val, 3); + } else { + serverPanic("Unknown hash encoding"); + } +} + +/* Delete all expired fields from the hash and delete the hash if left empty. + * + * updateSubexpires - If the hash should be updated in the subexpires DB with new + * expiration time in case expired fields were deleted. + * + * Return next Expire time of the hash + * - 0 if hash got deleted + * - EB_EXPIRE_TIME_INVALID if no more fields to expire + */ +uint64_t hashTypeActiveExpire(redisDb *db, kvobj *o, uint32_t *quota, int updateSubexpires) { + uint64_t noExpireLeftRes = EB_EXPIRE_TIME_INVALID; + ExpireInfo info = {0}; + + if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { + info = (ExpireInfo) { + .maxToExpire = *quota, + .now = commandTimeSnapshot(), + .itemsExpired = 0}; + + listpackExExpire(db, o, &info); + } else { + serverAssert(o->encoding == OBJ_ENCODING_HT); + + dict *d = o->ptr; + htMetadataEx *dictExpireMeta = htGetMetadataEx(d); + + OnFieldExpireCtx onFieldExpireCtx = { .hashObj = o, .db = db }; + + info = (ExpireInfo){ + .maxToExpire = *quota, + .onExpireItem = onFieldExpire, + .ctx = &onFieldExpireCtx, + .now = commandTimeSnapshot() + }; + + ebExpire(&dictExpireMeta->hfe, &hashFieldExpireBucketsType, &info); + } + + /* Update quota left */ + *quota -= info.itemsExpired; + + /* In some cases, a field might have been deleted without updating the global DS. + * As a result, active-expire might not expire any fields, in such cases, + * we don't need to send notifications or perform other operations for this key. */ + if (info.itemsExpired) { + sds keystr = kvobjGetKey(o); + robj *key = createStringObject(keystr, sdslen(keystr)); + notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", key, db->id); + int slot; + int deleted = 0; + + if (updateSubexpires) { + slot = getKeySlot(keystr); + estoreRemove(db->subexpires, slot, o); + } + + if (hashTypeLength(o, 0) == 0) { + notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db->id); + dbDelete(db, key); + noExpireLeftRes = 0; + deleted = 1; + } else { + if ((updateSubexpires) && (info.nextExpireTime != EB_EXPIRE_TIME_INVALID)) + estoreAdd(db->subexpires, slot, o, info.nextExpireTime); + } + + keyModified(NULL, db, key, deleted ? NULL : o, 1); + decrRefCount(key); + } + + /* return 0 if hash got deleted, EB_EXPIRE_TIME_INVALID if no more fields + * with expiration. Else return next expiration time */ + return (info.nextExpireTime == EB_EXPIRE_TIME_INVALID) ? noExpireLeftRes : info.nextExpireTime; +} + +/* Delete all expired fields in hash if needed (Currently used only by HRANDFIELD) + * + * NOTICE: If we call this function in other places, we should consider the slot + * migration scenario, where we don't want to delete expired fields. See also + * expireIfNeeded(). + * + * Return 1 if the entire hash was deleted, 0 otherwise. + * This function might be pricy in case there are many expired fields. + */ +static int hashTypeExpireIfNeeded(redisDb *db, kvobj *o) { + uint64_t nextExpireTime; + uint64_t minExpire = hashTypeGetMinExpire(o, 1 /*accurate*/); + + /* Nothing to expire */ + if ((mstime_t) minExpire >= commandTimeSnapshot()) + return 0; + + /* Follow expireIfNeeded() conditions of when not lazy-expire */ + if ( (server.loading) || + (server.allow_access_expired) || + (server.masterhost) || /* master-client or user-client, don't delete */ + (isPausedActionsWithUpdate(PAUSE_ACTION_EXPIRE))) + return 0; + + /* Take care to expire all the fields */ + uint32_t quota = UINT32_MAX; + nextExpireTime = hashTypeActiveExpire(db, o, "a, 1); + /* return 1 if the entire hash was deleted */ + return nextExpireTime == 0; +} + +/* Return the next/minimum expiry time of the hash-field. + * accurate=1 - Return the exact time by looking into the object DS. + * accurate=0 - Return the minimum expiration time maintained in expireMeta + * (Verify it is not trash before using it) which might not be + * accurate due to optimization reasons. + * + * If not found, return EB_EXPIRE_TIME_INVALID + */ +uint64_t hashTypeGetMinExpire(robj *o, int accurate) { + ExpireMeta *expireMeta = NULL; + + if (!accurate) { + if (o->encoding == OBJ_ENCODING_LISTPACK) { + return EB_EXPIRE_TIME_INVALID; + } else if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { + listpackEx *lpt = o->ptr; + expireMeta = &lpt->meta; + } else { + serverAssert(o->encoding == OBJ_ENCODING_HT); + + dict *d = o->ptr; + if (!isDictWithMetaHFE(d)) + return EB_EXPIRE_TIME_INVALID; + + expireMeta = &htGetMetadataEx(d)->expireMeta; + } + + /* Keep aside next hash-field expiry before updating HFE DS. Verify it is not trash */ + if (expireMeta->trash == 1) + return EB_EXPIRE_TIME_INVALID; + + return ebGetMetaExpTime(expireMeta); + } + + /* accurate == 1 */ + + if (o->encoding == OBJ_ENCODING_LISTPACK) { + return EB_EXPIRE_TIME_INVALID; + } else if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { + return listpackExGetMinExpire(o); + } else { + serverAssert(o->encoding == OBJ_ENCODING_HT); + + dict *d = o->ptr; + if (!isDictWithMetaHFE(d)) + return EB_EXPIRE_TIME_INVALID; + + htMetadataEx *expireMeta = htGetMetadataEx(d); + return ebGetNextTimeToExpire(expireMeta->hfe, &hashFieldExpireBucketsType); + } +} + +int hashTypeIsFieldsWithExpire(robj *o) { + if (o->encoding == OBJ_ENCODING_LISTPACK) { + return 0; + } else if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { + return EB_EXPIRE_TIME_INVALID != listpackExGetMinExpire(o); + } else { /* o->encoding == OBJ_ENCODING_HT */ + dict *d = o->ptr; + /* If dict doesn't holds HFE metadata */ + if (!isDictWithMetaHFE(d)) + return 0; + htMetadataEx *meta = htGetMetadataEx(d); + return ebGetTotalItems(meta->hfe, &hashFieldExpireBucketsType) != 0; + } +} + +void hashTypeFree(robj *o) { + switch (o->encoding) { + case OBJ_ENCODING_HT: + /* Verify hash is not registered in global HFE ds */ + if (isDictWithMetaHFE((dict*)o->ptr)) { + htMetadataEx *m = htGetMetadataEx((dict*)o->ptr); + serverAssert(m->expireMeta.trash == 1); + } +#ifdef DEBUG_ASSERTIONS + dictEmpty(o->ptr, NULL); + debugServerAssert(*htGetMetadataSize(o->ptr) == 0); +#endif + dictRelease((dict*) o->ptr); + break; + case OBJ_ENCODING_LISTPACK: + lpFree(o->ptr); + break; + case OBJ_ENCODING_LISTPACK_EX: + /* Verify hash is not registered in global HFE ds */ + serverAssert(((listpackEx *) o->ptr)->meta.trash == 1); + listpackExFree(o->ptr); + break; + default: + serverPanic("Unknown hash encoding type"); + break; + } +} + +ebuckets *hashTypeGetDictMetaHFE(dict *d) { + htMetadataEx *dictExpireMeta = htGetMetadataEx(d); + return &dictExpireMeta->hfe; +} + +/*----------------------------------------------------------------------------- + * Hash type commands + *----------------------------------------------------------------------------*/ + +void hsetnxCommand(client *c) { + unsigned long hlen; + int isHashDeleted; + size_t oldsize = 0; + robj *kv = hashTypeLookupWriteOrCreate(c,c->argv[1]); + if (kv == NULL) return; + + if (hashTypeExists(c->db, kv, c->argv[2]->ptr, HFE_LAZY_EXPIRE, &isHashDeleted)) { + addReply(c, shared.czero); + return; + } + + /* Field expired and in turn hash deleted. Create new one! */ + if (isHashDeleted) { + robj *o = createHashObject(); + kv = dbAdd(c->db,c->argv[1],&o); + } + + if (server.memory_tracking_per_slot) + oldsize = hashTypeAllocSize(kv); + hashTypeTryConversion(c->db, kv, c->argv, 2, 3); + hashTypeSet(c->db, kv, c->argv[2]->ptr, c->argv[3]->ptr, HASH_SET_COPY); + addReply(c, shared.cone); + keyModified(c,c->db,c->argv[1], kv, 1); + notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id); + hlen = hashTypeLength(kv, 0); + updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, hlen - 1, hlen); + if (server.memory_tracking_per_slot) + updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(kv)); + server.dirty++; +} + +void hsetCommand(client *c) { + int i, created = 0; + size_t oldsize = 0; + kvobj *kv; + + if ((c->argc % 2) == 1) { + addReplyErrorArity(c); + return; + } + + if ((kv = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; + + if (server.memory_tracking_per_slot) + oldsize = hashTypeAllocSize(kv); + hashTypeTryConversion(c->db, kv, c->argv, 2, c->argc-1); + + for (i = 2; i < c->argc; i += 2) + created += !hashTypeSet(c->db, kv, c->argv[i]->ptr, c->argv[i+1]->ptr, HASH_SET_COPY); + + /* HMSET (deprecated) and HSET return value is different. */ + char *cmdname = c->argv[0]->ptr; + if (cmdname[1] == 's' || cmdname[1] == 'S') { + /* HSET */ + addReplyLongLong(c, created); + } else { + /* HMSET */ + addReply(c, shared.ok); + } + keyModified(c,c->db,c->argv[1],kv,1); + unsigned long l = hashTypeLength(kv, 0); + updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, l - created, l); + if (server.memory_tracking_per_slot) + updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(kv)); + notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id); + server.dirty += (c->argc - 2)/2; +} + +/* Parse expire time from argument and do boundary checks. */ +static int parseExpireTime(client *c, robj *o, int unit, long long basetime, + long long *expire) +{ + long long val; + + /* Read the expiry time from command */ + if (getLongLongFromObjectOrReply(c, o, &val, NULL) != C_OK) + return C_ERR; + + if (val < 0) { + addReplyError(c,"invalid expire time, must be >= 0"); + return C_ERR; + } + + if (unit == UNIT_SECONDS) { + if (val > (long long) HFE_MAX_ABS_TIME_MSEC / 1000) { + addReplyErrorExpireTime(c); + return C_ERR; + } + val *= 1000; + } + + if (val > (long long) HFE_MAX_ABS_TIME_MSEC - basetime) { + addReplyErrorExpireTime(c); + return C_ERR; + } + val += basetime; + *expire = val; + return C_OK; +} + +/* Flags that are used as part of HGETEX and HSETEX commands. */ +#define HFE_EX (1<<0) /* Expiration time in seconds */ +#define HFE_PX (1<<1) /* Expiration time in milliseconds */ +#define HFE_EXAT (1<<2) /* Expiration time in unix seconds */ +#define HFE_PXAT (1<<3) /* Expiration time in unix milliseconds */ +#define HFE_PERSIST (1<<4) /* Persist fields */ +#define HFE_KEEPTTL (1<<5) /* Do not discard field ttl on set op */ +#define HFE_FXX (1<<6) /* Set fields if all the fields already exist */ +#define HFE_FNX (1<<7) /* Set fields if none of the fields exist */ + +/* Command types for unified hash argument parser */ +#define HASH_CMD_HGETEX 0 +#define HASH_CMD_HSETEX 1 + +/* Parse hash field expiration command arguments for both HGETEX and HSETEX. + * HGETEX <key> [EX seconds|PX milliseconds|EXAT unix-time-seconds|PXAT unix-time-milliseconds|PERSIST] + * FIELDS <numfields> field [field ...] + * HSETEX <key> [EX seconds|PX milliseconds|EXAT unix-time-seconds|PXAT unix-time-milliseconds|KEEPTTL] + * [FXX|FNX] FIELDS <numfields> field value [field value ...] + */ +static int parseHashFieldExpireArgs(client *c, int *flags, + long long *expire_time, int *expire_time_pos, + int *first_field_pos, int *field_count, + int command_type) { + *flags = 0; + *first_field_pos = -1; + *field_count = -1; + *expire_time_pos = -1; + + for (int i = 2; i < c->argc; i++) { + if (!strcasecmp(c->argv[i]->ptr, "fields")) { + int args_per_field = (command_type == HASH_CMD_HSETEX) ? 2 : 1; + long val; + /* Ensure we have at least the numfields argument */ + if (i + 1 >= c->argc) { + addReplyErrorArity(c); + return C_ERR; + } + + if (getRangeLongFromObjectOrReply(c, c->argv[i + 1], 1, INT_MAX, &val, + "invalid number of fields") != C_OK) + return C_ERR; + + *first_field_pos = i + 2; + *field_count = (int) val; + + /* Validate field count based on command type */ + long long required_args = *first_field_pos + ((long long)*field_count * args_per_field); + if (required_args > c->argc) { + addReplyError(c, "wrong number of arguments"); + return C_ERR; + } + + /* Skip over numfields and all field-value pairs + * Set i to the last position of the FIELDS block, loop will increment past it */ + i = *first_field_pos + (*field_count * args_per_field) - 1; + continue; + } else if (!strcasecmp(c->argv[i]->ptr, "EX")) { + if (*flags & (HFE_EX | HFE_EXAT | HFE_PX | HFE_PXAT | HFE_KEEPTTL | HFE_PERSIST)) + goto err_expiration; + + if (i >= c->argc - 1) + goto err_missing_expire; + + *flags |= HFE_EX; + i++; + if (parseExpireTime(c, c->argv[i], UNIT_SECONDS, + commandTimeSnapshot(), expire_time) != C_OK) + return C_ERR; + + *expire_time_pos = i; + } else if (!strcasecmp(c->argv[i]->ptr, "PX")) { + if (*flags & (HFE_EX | HFE_EXAT | HFE_PX | HFE_PXAT | HFE_KEEPTTL | HFE_PERSIST)) + goto err_expiration; + + if (i >= c->argc - 1) + goto err_missing_expire; + + *flags |= HFE_PX; + i++; + if (parseExpireTime(c, c->argv[i], UNIT_MILLISECONDS, + commandTimeSnapshot(), expire_time) != C_OK) + return C_ERR; + + *expire_time_pos = i; + } else if (!strcasecmp(c->argv[i]->ptr, "EXAT")) { + if (*flags & (HFE_EX | HFE_EXAT | HFE_PX | HFE_PXAT | HFE_KEEPTTL | HFE_PERSIST)) + goto err_expiration; + + if (i >= c->argc - 1) + goto err_missing_expire; + + *flags |= HFE_EXAT; + i++; + if (parseExpireTime(c, c->argv[i], UNIT_SECONDS, 0, expire_time) != C_OK) + return C_ERR; + + *expire_time_pos = i; + } else if (!strcasecmp(c->argv[i]->ptr, "PXAT")) { + if (*flags & (HFE_EX | HFE_EXAT | HFE_PX | HFE_PXAT | HFE_KEEPTTL | HFE_PERSIST)) + goto err_expiration; + + if (i >= c->argc - 1) + goto err_missing_expire; + + *flags |= HFE_PXAT; + i++; + if (parseExpireTime(c, c->argv[i], UNIT_MILLISECONDS, 0, + expire_time) != C_OK) + return C_ERR; + + *expire_time_pos = i; + } else if (!strcasecmp(c->argv[i]->ptr, "PERSIST")) { + if (*flags & (HFE_EX | HFE_EXAT | HFE_PX | HFE_PXAT | HFE_PERSIST)) + goto err_expiration; + *flags |= HFE_PERSIST; + } else if (!strcasecmp(c->argv[i]->ptr, "KEEPTTL")) { + if (*flags & (HFE_EX | HFE_EXAT | HFE_PX | HFE_PXAT | HFE_KEEPTTL)) + goto err_expiration; + *flags |= HFE_KEEPTTL; + } else if (!strcasecmp(c->argv[i]->ptr, "FXX")) { + if (*flags & (HFE_FXX | HFE_FNX)) + goto err_condition; + *flags |= HFE_FXX; + } else if (!strcasecmp(c->argv[i]->ptr, "FNX")) { + if (*flags & (HFE_FXX | HFE_FNX)) + goto err_condition; + *flags |= HFE_FNX; + } else { + addReplyErrorFormat(c, "unknown argument: %s", (char*) c->argv[i]->ptr); + return C_ERR; + } + } + + /* Validate command-specific argument compatibility */ + if ((command_type == HASH_CMD_HGETEX && (*flags & (HFE_KEEPTTL | HFE_FXX | HFE_FNX))) || + (command_type == HASH_CMD_HSETEX && (*flags & HFE_PERSIST))) { + addReplyError(c, "unknown argument"); + return C_ERR; + } + + return C_OK; + +err_missing_expire: + addReplyError(c, "missing expire time"); + return C_ERR; +err_condition: + addReplyError(c, "Only one of FXX or FNX arguments can be specified"); + return C_ERR; +err_expiration: + if (command_type == HASH_CMD_HSETEX) { + addReplyError(c, "Only one of EX, PX, EXAT, PXAT or KEEPTTL arguments can be specified"); + } else { + addReplyError(c, "Only one of EX, PX, EXAT, PXAT or PERSIST arguments can be specified"); + } + return C_ERR; +} + +/* Set the value of one or more fields of a given hash key, and optionally set + * their expiration. + * + * HSETEX key + * [FNX | FXX] + * [EX seconds | PX milliseconds | EXAT unix-time-seconds | PXAT unix-time-milliseconds | KEEPTTL] + * FIELDS <numfields> field value [field value...] + * + * Reply: + * Integer reply: 0 if no fields were set (due to FXX/FNX args) + * Integer reply: 1 if all the fields were set + */ +void hsetexCommand(client *c) { + int flags = 0, first_field_pos = 0, field_count = 0, expire_time_pos = -1; + int updated = 0, deleted = 0, set_expiry; + int expired = 0, fields_set = 0; + long long expire_time = EB_EXPIRE_TIME_INVALID; + int64_t oldlen, newlen; + HashTypeSetEx setex; + dictEntryLink link; + size_t oldsize = 0; + + if (parseHashFieldExpireArgs(c, &flags, &expire_time, &expire_time_pos, + &first_field_pos, &field_count, HASH_CMD_HSETEX) != C_OK) + return; + + kvobj *o = lookupKeyWriteWithLink(c->db, c->argv[1], &link); + if (checkType(c, o, OBJ_HASH)) + return; + + if (!o) { + if (flags & HFE_FXX) { + addReplyLongLong(c, 0); + return; + } + o = createHashObject(); + dbAddByLink(c->db, c->argv[1], &o, &link); + } + oldlen = (int64_t) hashTypeLength(o, 0); + if (server.memory_tracking_per_slot) + oldsize = hashTypeAllocSize(o); + + if (flags & (HFE_FXX | HFE_FNX)) { + int found = 0; + for (int i = 0; i < field_count; i++) { + sds field = c->argv[first_field_pos + (i * 2)]->ptr; + unsigned char *vstr = NULL; + unsigned int vlen = UINT_MAX; + long long vll = LLONG_MAX; + const int opt = HFE_LAZY_NO_NOTIFICATION | + HFE_LAZY_NO_SIGNAL | + HFE_LAZY_AVOID_HASH_DEL | + HFE_LAZY_NO_UPDATE_KEYSIZES | + HFE_LAZY_NO_UPDATE_ALLOCSIZES; + + GetFieldRes res = hashTypeGetValue(c->db, o, field, &vstr, &vlen, &vll, opt, NULL); + int exists = (res == GETF_OK); + expired += (res == GETF_EXPIRED); + found += exists; + + /* Check for early exit if the condition is already invalid. */ + if (((flags & HFE_FXX) && !exists) || + ((flags & HFE_FNX) && exists)) + break; + } + + int all_exists = (found == field_count); + int non_exists = (found == 0); + + if (((flags & HFE_FNX) && !non_exists) || + ((flags & HFE_FXX) && !all_exists)) + { + addReplyLongLong(c, 0); + goto out; + } + } + hashTypeTryConversion(c->db, o,c->argv, first_field_pos, c->argc - 1); + + /* Check if we will set the expiration time. */ + set_expiry = flags & (HFE_EX | HFE_PX | HFE_EXAT | HFE_PXAT); + if (set_expiry) + hashTypeSetExInit(c->argv[1], o, c, c->db, 0, &setex); + + for (int i = 0; i < field_count; i++) { + sds field = c->argv[first_field_pos + (i * 2)]->ptr; + sds value = c->argv[first_field_pos + (i * 2) + 1]->ptr; + + int opt = HASH_SET_COPY; + /* If we are going to set the expiration time later, no need to discard + * it as part of set operation now. */ + if (flags & (HFE_EX | HFE_PX | HFE_EXAT | HFE_PXAT | HFE_KEEPTTL)) + opt |= HASH_SET_KEEP_TTL; + + hashTypeSet(c->db, o, field, value, opt); + fields_set = 1; + /* Update the expiration time. */ + if (set_expiry) { + int ret = hashTypeSetEx(o, field, expire_time, &setex); + updated += (ret == HSETEX_OK); + deleted += (ret == HSETEX_DELETED); + } + } + + if (set_expiry) + hashTypeSetExDone(&setex); + + server.dirty += field_count; + + if (deleted) { + /* If fields are deleted due to timestamp is being in the past, hdel's + * are already propagated. No need to propagate the command itself. */ + preventCommandPropagation(c); + } else if (set_expiry && !(flags & HFE_PXAT)) { + /* Propagate as 'HSETEX <key> PXAT ..' if there is EX/EXAT/PX flag*/ + + /* Replace EX/EXAT/PX with PXAT */ + rewriteClientCommandArgument(c, expire_time_pos - 1, shared.pxat); + /* Replace timestamp with unix timestamp milliseconds. */ + robj *expire = createStringObjectFromLongLong(expire_time); + rewriteClientCommandArgument(c, expire_time_pos, expire); + decrRefCount(expire); + } + + addReplyLongLong(c, 1); + +out: + if (server.memory_tracking_per_slot) + updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(o)); + /* Emit keyspace notifications based on field expiry, mutation, or key deletion */ + if (fields_set || expired) { + keyModified(c, c->db, c->argv[1], o, 1); + if (expired) + notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", c->argv[1], c->db->id); + if (fields_set) { + notifyKeyspaceEvent(NOTIFY_HASH, "hset", c->argv[1], c->db->id); + if (deleted || updated) + notifyKeyspaceEvent(NOTIFY_HASH, deleted ? "hdel" : "hexpire", c->argv[1], c->db->id); + } + } + /* Key may become empty due to lazy expiry in hashTypeExists() + * or the new expiration time is in the past.*/ + newlen = (int64_t) hashTypeLength(o, 0); + if (newlen == 0) { + newlen = -1; + /* Del key but don't update KEYSIZES. else it will decr wrong bin in histogram */ + dbDeleteSkipKeysizesUpdate(c->db, c->argv[1]); + notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id); + } + if (oldlen != newlen) + updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, + oldlen, newlen); +} + +void hincrbyCommand(client *c) { + long long value, incr, oldvalue; + kvobj *o; + sds new; + unsigned char *vstr; + unsigned int vlen; + size_t oldsize = 0; + + if (getLongLongFromObjectOrReply(c,c->argv[3],&incr,NULL) != C_OK) return; + if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; + + GetFieldRes res = hashTypeGetValue(c->db,o,c->argv[2]->ptr,&vstr,&vlen,&value, + HFE_LAZY_EXPIRE, NULL); + if (res == GETF_OK) { + if (vstr) { + if (string2ll((char*)vstr,vlen,&value) == 0) { + addReplyError(c,"hash value is not an integer"); + return; + } + } /* Else hashTypeGetValue() already stored it into &value */ + } else if ((res == GETF_NOT_FOUND) || (res == GETF_EXPIRED)) { + value = 0; + unsigned long l = hashTypeLength(o, 0); + updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, l, l + 1); + } else { + /* Field expired and in turn hash deleted. Create new one! */ + o = createHashObject(); + dbAdd(c->db,c->argv[1],&o); + value = 0; + updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, 0, 1); + } + + oldvalue = value; + if ((incr < 0 && oldvalue < 0 && incr < (LLONG_MIN-oldvalue)) || + (incr > 0 && oldvalue > 0 && incr > (LLONG_MAX-oldvalue))) { + addReplyError(c,"increment or decrement would overflow"); + return; + } + value += incr; + new = sdsfromlonglong(value); + if (server.memory_tracking_per_slot) + oldsize = hashTypeAllocSize(o); + hashTypeSet(c->db, o,c->argv[2]->ptr,new,HASH_SET_TAKE_VALUE | HASH_SET_KEEP_TTL); + if (server.memory_tracking_per_slot) + updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(o)); + addReplyLongLong(c,value); + keyModified(c,c->db,c->argv[1], o, 1); + notifyKeyspaceEvent(NOTIFY_HASH,"hincrby",c->argv[1],c->db->id); + server.dirty++; +} + +void hincrbyfloatCommand(client *c) { + long double value, incr; + long long ll; + kvobj *o; + sds new; + unsigned char *vstr; + unsigned int vlen; + size_t oldsize = 0; + + if (getLongDoubleFromObjectOrReply(c,c->argv[3],&incr,NULL) != C_OK) return; + if (isnan(incr) || isinf(incr)) { + addReplyError(c,"value is NaN or Infinity"); + return; + } + if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; + GetFieldRes res = hashTypeGetValue(c->db, o,c->argv[2]->ptr,&vstr,&vlen,&ll, + HFE_LAZY_EXPIRE, NULL); + if (res == GETF_OK) { + if (vstr) { + if (string2ld((char*)vstr,vlen,&value) == 0) { + addReplyError(c,"hash value is not a float"); + return; + } + } else { + value = (long double)ll; + } + } else if ((res == GETF_NOT_FOUND) || (res == GETF_EXPIRED)) { + value = 0; + unsigned long l = hashTypeLength(o, 0); + updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, l, l + 1); + } else { + /* Field expired and in turn hash deleted. Create new one! */ + o = createHashObject(); + dbAdd(c->db, c->argv[1], &o); + value = 0; + updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, 0, 1); + } + + value += incr; + if (isnan(value) || isinf(value)) { + addReplyError(c,"increment would produce NaN or Infinity"); + return; + } + + char buf[MAX_LONG_DOUBLE_CHARS]; + int len = ld2string(buf,sizeof(buf),value,LD_STR_HUMAN); + new = sdsnewlen(buf,len); + if (server.memory_tracking_per_slot) + oldsize = hashTypeAllocSize(o); + hashTypeSet(c->db, o,c->argv[2]->ptr,new,HASH_SET_TAKE_VALUE | HASH_SET_KEEP_TTL); + if (server.memory_tracking_per_slot) + updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(o)); + addReplyBulkCBuffer(c,buf,len); + keyModified(c,c->db,c->argv[1],o,1); + notifyKeyspaceEvent(NOTIFY_HASH,"hincrbyfloat",c->argv[1],c->db->id); + server.dirty++; + + /* Always replicate HINCRBYFLOAT as an HSETEX command with the final value + * in order to make sure that differences in float precision or formatting + * will not create differences in replicas or after an AOF restart. + * The KEEPTTL flag is used to make sure the field TTL is preserved. */ + robj *newobj; + newobj = createRawStringObject(buf,len); + rewriteClientCommandVector(c, 7, shared.hsetex, c->argv[1], shared.keepttl, + shared.fields, shared.integers[1], c->argv[2], newobj); + decrRefCount(newobj); +} + +static GetFieldRes addHashFieldToReply(client *c, kvobj *o, sds field, int hfeFlags) { + if (o == NULL) { + addReplyNull(c); + return GETF_NOT_FOUND; + } + + unsigned char *vstr = NULL; + unsigned int vlen = UINT_MAX; + long long vll = LLONG_MAX; + + GetFieldRes res = hashTypeGetValue(c->db, o, field, &vstr, &vlen, &vll, hfeFlags, NULL); + if (res == GETF_OK) { + if (vstr) { + addReplyBulkCBuffer(c, vstr, vlen); + } else { + addReplyBulkLongLong(c, vll); + } + } else { + addReplyNull(c); + } + return res; +} + +void hgetCommand(client *c) { + kvobj *o; + + if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == NULL || + checkType(c,o,OBJ_HASH)) return; + + addHashFieldToReply(c, o, c->argv[2]->ptr, HFE_LAZY_EXPIRE); +} + +void hmgetCommand(client *c) { + GetFieldRes res = GETF_OK; + int i; + int expired = 0, deleted = 0; + + /* Don't abort when the key cannot be found. Non-existing keys are empty + * hashes, where HMGET should respond with a series of null bulks. */ + kvobj *o = lookupKeyRead(c->db, c->argv[1]); + if (checkType(c,o,OBJ_HASH)) return; + + addReplyArrayLen(c, c->argc-2); + for (i = 2; i < c->argc ; i++) { + if (!deleted) { + res = addHashFieldToReply(c, o, c->argv[i]->ptr, HFE_LAZY_NO_NOTIFICATION); + expired += (res == GETF_EXPIRED); + deleted += (res == GETF_EXPIRED_HASH); + } else { + /* If hash got lazy expired since all fields are expired (o is invalid), + * then fill the rest with trivial nulls and return. */ + addReplyNull(c); + } + } + + if (expired) { + notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", c->argv[1], c->db->id); + if (deleted) + notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id); + } +} + +/* Get and delete the value of one or more fields of a given hash key. + * HGETDEL <key> FIELDS <numfields> field1 field2 ... + * Reply: list of the value associated with each field or nil if the field + * doesn’t exist. + */ +void hgetdelCommand(client *c) { + int res = 0, hfe = 0, deleted = 0, expired = 0; + int64_t oldlen = -1; /* not exists as long as it is not set */ + long num_fields = 0; + size_t oldsize = 0; + + kvobj *o = lookupKeyWrite(c->db, c->argv[1]); + if (checkType(c, o, OBJ_HASH)) + return; + + if (strcasecmp(c->argv[2]->ptr, "FIELDS") != 0) { + addReplyError(c, "Mandatory argument FIELDS is missing or not at the right position"); + return; + } + + /* Read number of fields */ + if (getRangeLongFromObjectOrReply(c, c->argv[3], 1, LONG_MAX, &num_fields, + "Number of fields must be a positive integer") != C_OK) + return; + + /* Verify `numFields` is consistent with number of arguments */ + if (num_fields != c->argc - 4) { + addReplyError(c, "The `numfields` parameter must match the number of arguments"); + return; + } + + /* Hash field expiration is optimized to avoid frequent update global HFE DS + * for each field deletion. Eventually active-expiration will run and update + * or remove the hash from global HFE DS gracefully. Nevertheless, statistic + * "subexpiry" might reflect wrong number of hashes with HFE to the user if + * it is the last field with expiration. The following logic checks if this + * is the last field with expiration and removes it from global HFE DS. */ + if (o) { + hfe = hashTypeIsFieldsWithExpire(o); + oldlen = hashTypeLength(o, 0); + if (server.memory_tracking_per_slot) + oldsize = hashTypeAllocSize(o); + } + + addReplyArrayLen(c, num_fields); + for (int i = 4; i < c->argc; i++) { + const int flags = HFE_LAZY_NO_NOTIFICATION | + HFE_LAZY_NO_SIGNAL | + HFE_LAZY_AVOID_HASH_DEL | + HFE_LAZY_NO_UPDATE_KEYSIZES | + HFE_LAZY_NO_UPDATE_ALLOCSIZES; + res = addHashFieldToReply(c, o, c->argv[i]->ptr, flags); + expired += (res == GETF_EXPIRED); + /* Try to delete only if it's found and not expired lazily. */ + if (res == GETF_OK) { + deleted++; + serverAssert(hashTypeDelete(o, c->argv[i]->ptr) == 1); + } + } + + /* Return if no modification has been made. */ + if (expired == 0 && deleted == 0) + return; + + if (server.memory_tracking_per_slot) + updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(o)); + keyModified(c, c->db, c->argv[1], o, 1); + + if (expired) + notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", c->argv[1], c->db->id); + if (deleted) { + notifyKeyspaceEvent(NOTIFY_HASH, "hdel", c->argv[1], c->db->id); + server.dirty += deleted; + + /* Propagate as HDEL command. + * Orig: HGETDEL <key> FIELDS <numfields> field1 field2 ... + * Repl: HDEL <key> field1 field2 ... */ + rewriteClientCommandArgument(c, 0, shared.hdel); + rewriteClientCommandArgument(c, 2, NULL); /* Delete FIELDS arg */ + rewriteClientCommandArgument(c, 2, NULL); /* Delete <numfields> arg */ + } + + /* Key may have become empty because of deleting fields or lazy expire. */ + int64_t newlen = (int64_t) hashTypeLength(o, 0); + if (newlen == 0) { + newlen = -1; + /* Del key but don't update KEYSIZES. else it will decr wrong bin in histogram */ + dbDeleteSkipKeysizesUpdate(c->db, c->argv[1]); + notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id); + } else { + if (hfe && (hashTypeIsFieldsWithExpire(o) == 0)) { /*is it last HFE*/ + estoreRemove(c->db->subexpires, getKeySlot(kvobjGetKey(o)), o); + } + } + + if (oldlen != newlen) + updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, + oldlen, newlen); +} + +/* Get the value of one or more fields of a given hash key and optionally set + * their expiration. + * + * HGETEX <key> + * [EX seconds | PX milliseconds | EXAT unix-time-seconds | PXAT unix-time-milliseconds | PERSIST] + * FIELDS <numfields> field1 field2 ... + * + * Reply: list of the value associated with each field or nil if the field + * doesn’t exist. + */ +void hgetexCommand(client *c) { + int expired = 0, deleted = 0, updated = 0; + int parse_flags = 0, expire_time_pos = -1, first_field_pos = -1, num_fields = -1; + long long expire_time = 0; + int64_t oldlen = 0, newlen = -1; + HashTypeSetEx setex; + size_t oldsize = 0; + + kvobj *o = lookupKeyWrite(c->db, c->argv[1]); + if (checkType(c, o, OBJ_HASH)) + return; + + /* Parse arguments using flexible parser */ + if (parseHashFieldExpireArgs(c, &parse_flags, &expire_time, &expire_time_pos, &first_field_pos, &num_fields, HASH_CMD_HGETEX) != C_OK) + return; + + /* Non-existing keys and empty hashes are the same thing. Reply null if the + * key does not exist.*/ + if (!o) { + addReplyArrayLen(c, num_fields); + for (int i = 0; i < num_fields; i++) + addReplyNull(c); + return; + } + + if (server.memory_tracking_per_slot) + oldsize = hashTypeAllocSize(o); + oldlen = hashTypeLength(o, 0); + if (parse_flags) + hashTypeSetExInit(c->argv[1], o, c, c->db, 0, &setex); + + addReplyArrayLen(c, num_fields); + for (int i = first_field_pos; i < first_field_pos + num_fields; i++) { + const int flags = HFE_LAZY_NO_NOTIFICATION | + HFE_LAZY_NO_SIGNAL | + HFE_LAZY_AVOID_HASH_DEL | + HFE_LAZY_NO_UPDATE_KEYSIZES | + HFE_LAZY_NO_UPDATE_ALLOCSIZES; + sds field = c->argv[i]->ptr; + int res = addHashFieldToReply(c, o, c->argv[i]->ptr, flags); + expired += (res == GETF_EXPIRED); + + /* Set expiration only if the field exists and not expired lazily. */ + if (res == GETF_OK && parse_flags) { + if (parse_flags & HFE_PERSIST) + expire_time = EB_EXPIRE_TIME_INVALID; + + res = hashTypeSetEx(o, field, expire_time, &setex); + deleted += (res == HSETEX_DELETED); + updated += (res == HSETEX_OK); + } + } + + if (parse_flags) + hashTypeSetExDone(&setex); + + if (server.memory_tracking_per_slot) + updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(o)); + + /* Exit early if no modification has been made. */ + if (expired == 0 && deleted == 0 && updated == 0) + return; + + server.dirty += deleted + updated; + keyModified(c, c->db, c->argv[1], o, 1); + + /* This command will never be propagated as it is. It will be propagated as + * HDELs when fields are lazily expired or deleted, if the new timestamp is + * in the past. HDEL's will be emitted as part of addHashFieldToReply() + * or hashTypeSetEx() in this case. + * + * If PERSIST flags is used, it will be propagated as HPERSIST command. + * IF EX/EXAT/PX/PXAT flags are used, it will be replicated as HPEXPRITEAT. + */ + if (expired) + notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", c->argv[1], c->db->id); + if (updated) { + /* Build canonical command for propagation */ + int canonical_argc; + robj **canonical_argv; + int idx = 0; + + if (parse_flags & HFE_PERSIST) { + notifyKeyspaceEvent(NOTIFY_HASH, "hpersist", c->argv[1], c->db->id); + /* Build canonical HPERSIST command: HPERSIST key FIELDS numfields field1 field2 ... */ + canonical_argc = 4 + num_fields; + canonical_argv = zmalloc(sizeof(robj*) * canonical_argc); + canonical_argv[idx++] = shared.hpersist; + incrRefCount(shared.hpersist); + canonical_argv[idx++] = c->argv[1]; /* key */ + incrRefCount(c->argv[1]); + } else { + notifyKeyspaceEvent(NOTIFY_HASH, "hexpire", c->argv[1], c->db->id); + /* Build canonical HPEXPIREAT command: HPEXPIREAT key timestamp FIELDS numfields field1 field2 ... */ + canonical_argc = 5 + num_fields; + canonical_argv = zmalloc(sizeof(robj*) * canonical_argc); + canonical_argv[idx++] = shared.hpexpireat; + incrRefCount(shared.hpexpireat); + canonical_argv[idx++] = c->argv[1]; /* key */ + incrRefCount(c->argv[1]); + canonical_argv[idx++] = createStringObjectFromLongLong(expire_time); /* timestamp */ + } + + canonical_argv[idx++] = shared.fields; + incrRefCount(shared.fields); + canonical_argv[idx++] = createStringObjectFromLongLong(num_fields); + for (int i = 0; i < num_fields; i++) { + canonical_argv[idx++] = c->argv[first_field_pos + i]; + incrRefCount(c->argv[first_field_pos + i]); + } + + replaceClientCommandVector(c, canonical_argc, canonical_argv); + } else if (deleted) { + /* If we are here, fields are deleted because new timestamp was in the + * past. HDELs are already propagated as part of hashTypeSetEx(). */ + notifyKeyspaceEvent(NOTIFY_HASH, "hdel", c->argv[1], c->db->id); + preventCommandPropagation(c); + } + + /* Key may become empty due to lazy expiry in addHashFieldToReply() + * or the new expiration time is in the past.*/ + newlen = hashTypeLength(o, 0); + + updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, oldlen, newlen); + if (newlen == 0) { + dbDelete(c->db, c->argv[1]); + notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id); + } +} + +void hdelCommand(client *c) { + kvobj *o; + int j, deleted = 0, keyremoved = 0; + size_t oldsize = 0; + + if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || + checkType(c,o,OBJ_HASH)) return; + + int64_t oldLen = (int64_t) hashTypeLength(o, 0); + if (server.memory_tracking_per_slot) + oldsize = hashTypeAllocSize(o); + + /* Hash field expiration is optimized to avoid frequent update global HFE DS for + * each field deletion. Eventually active-expiration will run and update or remove + * the hash from global HFE DS gracefully. Nevertheless, statistic "subexpiry" + * might reflect wrong number of hashes with HFE to the user if it is the last + * field with expiration. The following logic checks if this is indeed the last + * field with expiration and removes it from global HFE DS. */ + int isHFE = hashTypeIsFieldsWithExpire(o); + + for (j = 2; j < c->argc; j++) { + if (hashTypeDelete(o,c->argv[j]->ptr)) { + deleted++; + if (hashTypeLength(o, 0) == 0) { + if (server.memory_tracking_per_slot) + updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(o)); + /* del key but don't update KEYSIZES. Else it will decr wrong bin in histogram */ + dbDeleteSkipKeysizesUpdate(c->db, c->argv[1]); + keyremoved = 1; + break; + } + } + } + if (server.memory_tracking_per_slot && !keyremoved) + updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(o)); + if (deleted) { + int64_t newLen = -1; /* The value -1 indicates that the key is deleted. */ + keyModified(c, c->db, c->argv[1], keyremoved ? NULL : o, 1); + notifyKeyspaceEvent(NOTIFY_HASH,"hdel",c->argv[1],c->db->id); + if (keyremoved) { + notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id); + } else { + if (isHFE && (hashTypeIsFieldsWithExpire(o) == 0)) /* is it last HFE */ + estoreRemove(c->db->subexpires, getKeySlot(c->argv[1]->ptr), o); + newLen = oldLen - deleted; + } + updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, oldLen, newLen); + server.dirty += deleted; + } + addReplyLongLong(c,deleted); +} + +void hlenCommand(client *c) { + kvobj *o; + + if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || + checkType(c,o,OBJ_HASH)) return; + + addReplyLongLong(c,hashTypeLength(o, 0)); +} + +void hstrlenCommand(client *c) { + kvobj *o; + unsigned char *vstr = NULL; + unsigned int vlen = UINT_MAX; + long long vll = LLONG_MAX; + + if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || + checkType(c,o,OBJ_HASH)) return; + + GetFieldRes res = hashTypeGetValue(c->db, o, c->argv[2]->ptr, &vstr, + &vlen, &vll, HFE_LAZY_EXPIRE, NULL); + + if (res == GETF_NOT_FOUND || res == GETF_EXPIRED || res == GETF_EXPIRED_HASH) { + addReply(c, shared.czero); + return; + } + + size_t len = vstr ? vlen : sdigits10(vll); + addReplyLongLong(c,len); +} + +static void addHashIteratorCursorToReply(client *c, hashTypeIterator *hi, int what) { + if (hi->encoding == OBJ_ENCODING_LISTPACK || + hi->encoding == OBJ_ENCODING_LISTPACK_EX) + { + unsigned char *vstr = NULL; + unsigned int vlen = UINT_MAX; + long long vll = LLONG_MAX; + + hashTypeCurrentFromListpack(hi, what, &vstr, &vlen, &vll, NULL); + if (vstr) + addReplyBulkCBuffer(c, vstr, vlen); + else + addReplyBulkLongLong(c, vll); + } else if (hi->encoding == OBJ_ENCODING_HT) { + char *value; + size_t len; + hashTypeCurrentFromHashTable(hi, what, &value, &len, NULL); + addReplyBulkCBuffer(c, value, len); + } else { + serverPanic("Unknown hash encoding"); + } +} + +void genericHgetallCommand(client *c, int flags) { + kvobj *o; + hashTypeIterator hi; + int length, count = 0; + size_t oldsize = 0; + + robj *emptyResp = (flags & OBJ_HASH_KEY && flags & OBJ_HASH_VALUE) ? + shared.emptymap[c->resp] : shared.emptyarray; + if ((o = lookupKeyReadOrReply(c,c->argv[1],emptyResp)) + == NULL || checkType(c,o,OBJ_HASH)) return; + + /* We return a map if the user requested keys and values, like in the + * HGETALL case. Otherwise to use a flat array makes more sense. */ + if ((length = hashTypeLength(o, 1 /*subtractExpiredFields*/)) == 0) { + addReply(c, emptyResp); + return; + } + + if (flags & OBJ_HASH_KEY && flags & OBJ_HASH_VALUE) { + addReplyMapLen(c, length); + } else { + addReplyArrayLen(c, length); + } + + if (server.memory_tracking_per_slot) + oldsize = hashTypeAllocSize(o); + hashTypeInitIterator(&hi, o); + + while (hashTypeNext(&hi, 1 /*skipExpiredFields*/) != C_ERR) { + if (flags & OBJ_HASH_KEY) { + addHashIteratorCursorToReply(c, &hi, OBJ_HASH_KEY); + count++; + } + if (flags & OBJ_HASH_VALUE) { + addHashIteratorCursorToReply(c, &hi, OBJ_HASH_VALUE); + count++; + } + } + + hashTypeResetIterator(&hi); + if (server.memory_tracking_per_slot) + updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(o)); + + /* Make sure we returned the right number of elements. */ + if (flags & OBJ_HASH_KEY && flags & OBJ_HASH_VALUE) count /= 2; + serverAssert(count == length); +} + +void hkeysCommand(client *c) { + genericHgetallCommand(c,OBJ_HASH_KEY); +} + +void hvalsCommand(client *c) { + genericHgetallCommand(c,OBJ_HASH_VALUE); +} + +void hgetallCommand(client *c) { + genericHgetallCommand(c,OBJ_HASH_KEY|OBJ_HASH_VALUE); +} + +void hexistsCommand(client *c) { + kvobj *o; + if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || + checkType(c,o,OBJ_HASH)) return; + + addReply(c,hashTypeExists(c->db,o,c->argv[2]->ptr,HFE_LAZY_EXPIRE, NULL) ? + shared.cone : shared.czero); +} + +void hscanCommand(client *c) { + kvobj *o; + unsigned long long cursor; + size_t oldsize = 0; + + if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return; + if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL || + checkType(c,o,OBJ_HASH)) return; + + if (server.memory_tracking_per_slot) + oldsize = hashTypeAllocSize(o); + scanGenericCommand(c,o,cursor); + if (server.memory_tracking_per_slot) + updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(o)); +} + +static void hrandfieldReplyWithListpack(client *c, unsigned int count, listpackEntry *keys, listpackEntry *vals) { + for (unsigned long i = 0; i < count; i++) { + if (vals && c->resp > 2) + addReplyArrayLen(c,2); + if (keys[i].sval) + addReplyBulkCBuffer(c, keys[i].sval, keys[i].slen); + else + addReplyBulkLongLong(c, keys[i].lval); + if (vals) { + if (vals[i].sval) + addReplyBulkCBuffer(c, vals[i].sval, vals[i].slen); + else + addReplyBulkLongLong(c, vals[i].lval); + } + } +} + +/* How many times bigger should be the hash compared to the requested size + * for us to not use the "remove elements" strategy? Read later in the + * implementation for more info. */ +#define HRANDFIELD_SUB_STRATEGY_MUL 3 + +/* If client is trying to ask for a very large number of random elements, + * queuing may consume an unlimited amount of memory, so we want to limit + * the number of randoms per time. */ +#define HRANDFIELD_RANDOM_SAMPLE_LIMIT 1000 + +void hrandfieldWithCountCommand(client *c, long l, int withvalues) { + unsigned long count, size; + int uniq = 1; + kvobj *hash; + size_t oldsize = 0; + + if ((hash = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) + == NULL || checkType(c,hash,OBJ_HASH)) return; + + if(l >= 0) { + count = (unsigned long) l; + } else { + count = -l; + uniq = 0; + } + + /* Delete all expired fields. If the entire hash got deleted then return empty array. */ + if (hashTypeExpireIfNeeded(c->db, hash)) { + addReply(c, shared.emptyarray); + return; + } + + /* Delete expired fields */ + size = hashTypeLength(hash, 0); + + /* If count is zero, serve it ASAP to avoid special cases later. */ + if (count == 0) { + addReply(c,shared.emptyarray); + return; + } + + if (server.memory_tracking_per_slot) + oldsize = hashTypeAllocSize(hash); + + /* CASE 1: The count was negative, so the extraction method is just: + * "return N random elements" sampling the whole set every time. + * This case is trivial and can be served without auxiliary data + * structures. This case is the only one that also needs to return the + * elements in random order. */ + if (!uniq || count == 1) { + if (withvalues && c->resp == 2) + addReplyArrayLen(c, count*2); + else + addReplyArrayLen(c, count); + if (hash->encoding == OBJ_ENCODING_HT) { + while (count--) { + dictEntry *de = dictGetFairRandomKey(hash->ptr); + Entry *entry = dictGetKey(de); + sds fieldStr = entryGetField(entry); + if (withvalues && c->resp > 2) + addReplyArrayLen(c,2); + addReplyBulkCBuffer(c, fieldStr, sdslen(fieldStr)); + if (withvalues) { + sds value = entryGetValue(entry); + addReplyBulkCBuffer(c, value, sdslen(value)); + } + if (c->flags & CLIENT_CLOSE_ASAP) + break; + } + } else if (hash->encoding == OBJ_ENCODING_LISTPACK || + hash->encoding == OBJ_ENCODING_LISTPACK_EX) + { + listpackEntry *keys, *vals = NULL; + unsigned long limit, sample_count; + unsigned char *lp = hashTypeListpackGetLp(hash); + int tuple_len = hash->encoding == OBJ_ENCODING_LISTPACK ? 2 : 3; + + limit = count > HRANDFIELD_RANDOM_SAMPLE_LIMIT ? HRANDFIELD_RANDOM_SAMPLE_LIMIT : count; + keys = zmalloc(sizeof(listpackEntry)*limit); + if (withvalues) + vals = zmalloc(sizeof(listpackEntry)*limit); + while (count) { + sample_count = count > limit ? limit : count; + count -= sample_count; + lpRandomPairs(lp, sample_count, keys, vals, tuple_len); + hrandfieldReplyWithListpack(c, sample_count, keys, vals); + if (c->flags & CLIENT_CLOSE_ASAP) + break; + } + zfree(keys); + zfree(vals); + } + goto out; + } + + /* Initiate reply count, RESP3 responds with nested array, RESP2 with flat one. */ + long reply_size = count < size ? count : size; + if (withvalues && c->resp == 2) + addReplyArrayLen(c, reply_size*2); + else + addReplyArrayLen(c, reply_size); + + /* CASE 2: + * The number of requested elements is greater than the number of + * elements inside the hash: simply return the whole hash. */ + if(count >= size) { + hashTypeIterator hi; + hashTypeInitIterator(&hi, hash); + while (hashTypeNext(&hi, 0) != C_ERR) { + if (withvalues && c->resp > 2) + addReplyArrayLen(c,2); + addHashIteratorCursorToReply(c, &hi, OBJ_HASH_KEY); + if (withvalues) + addHashIteratorCursorToReply(c, &hi, OBJ_HASH_VALUE); + } + hashTypeResetIterator(&hi); + goto out; + } + + /* CASE 2.5 listpack only. Sampling unique elements, in non-random order. + * Listpack encoded hashes are meant to be relatively small, so + * HRANDFIELD_SUB_STRATEGY_MUL isn't necessary and we rather not make + * copies of the entries. Instead, we emit them directly to the output + * buffer. + * + * And it is inefficient to repeatedly pick one random element from a + * listpack in CASE 4. So we use this instead. */ + if (hash->encoding == OBJ_ENCODING_LISTPACK || + hash->encoding == OBJ_ENCODING_LISTPACK_EX) + { + unsigned char *lp = hashTypeListpackGetLp(hash); + int tuple_len = hash->encoding == OBJ_ENCODING_LISTPACK ? 2 : 3; + listpackEntry *keys, *vals = NULL; + keys = zmalloc(sizeof(listpackEntry)*count); + if (withvalues) + vals = zmalloc(sizeof(listpackEntry)*count); + serverAssert(lpRandomPairsUnique(lp, count, keys, vals, tuple_len) == count); + hrandfieldReplyWithListpack(c, count, keys, vals); + zfree(keys); + zfree(vals); + goto out; + } + + /* CASE 3: + * The number of elements inside the hash of type dict is not greater than + * HRANDFIELD_SUB_STRATEGY_MUL times the number of requested elements. + * In this case we create an array of dictEntry pointers from the original hash, + * and subtract random elements to reach the requested number of elements. + * + * This is done because if the number of requested elements is just + * a bit less than the number of elements in the hash, the natural approach + * used into CASE 4 is highly inefficient. */ + if (count*HRANDFIELD_SUB_STRATEGY_MUL > size) { + /* Hashtable encoding (generic implementation) */ + dict *ht = hash->ptr; + dictIterator di; + dictEntry *de; + unsigned long idx = 0; + + /* Allocate a temporary array of pointers to stored key-values in dict and + * assist it to remove random elements to reach the right count. */ + struct FieldValPair { + sds field; + sds value; + } *pairs = zmalloc(sizeof(struct FieldValPair) * size); + + /* Add all the elements into the temporary array. */ + dictInitIterator(&di, ht); + while((de = dictNext(&di)) != NULL) { + Entry *e = dictGetKey(de); + pairs[idx++] = (struct FieldValPair) {entryGetField(e), entryGetValue(e)}; + } + dictResetIterator(&di); + + /* Remove random elements to reach the right count. */ + while (size > count) { + unsigned long toDiscardIdx = rand() % size; + pairs[toDiscardIdx] = pairs[--size]; + } + + /* Reply with what's in the array */ + for (idx = 0; idx < size; idx++) { + if (withvalues && c->resp > 2) + addReplyArrayLen(c,2); + addReplyBulkCBuffer(c, pairs[idx].field, sdslen(pairs[idx].field)); + if (withvalues) + addReplyBulkCBuffer(c, pairs[idx].value, sdslen(pairs[idx].value)); + } + + zfree(pairs); + } + + /* CASE 4: We have a big hash compared to the requested number of elements. + * In this case we can simply get random elements from the hash and add + * to the temporary hash, trying to eventually get enough unique elements + * to reach the specified count. */ + else { + /* Allocate temporary dictUnique to find unique elements. Just keep ref + * to key-value from the original hash. This dict relaxes hash function + * to be based on field's pointer */ + dictType uniqueDictType = { .hashFunction = dictPtrHash }; + dict *dictUnique = dictCreate(&uniqueDictType); + dictExpand(dictUnique, count); + + /* Hashtable encoding (generic implementation) */ + unsigned long added = 0; + + while(added < count) { + dictEntry *de = dictGetFairRandomKey(hash->ptr); + serverAssert(de != NULL); + Entry *e = dictGetKey(de); + sds field = entryGetField(e); + sds value = entryGetValue(e); + + /* Try to add the object to the dictionary. If it already exists + * free it, otherwise increment the number of objects we have + * in the result dictionary. */ + if (dictAdd(dictUnique, field, value) != DICT_OK) + continue; + + added++; + + /* We can reply right away, so that we don't need to store the value in the dict. */ + if (withvalues && c->resp > 2) + addReplyArrayLen(c,2); + + addReplyBulkCBuffer(c, field, sdslen(field)); + if (withvalues) + addReplyBulkCBuffer(c, value, sdslen(value)); + } + + /* Release memory */ + dictRelease(dictUnique); + } +out: + if (server.memory_tracking_per_slot) + updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(hash)); +} + +/* + * HRANDFIELD - Return a random field from the hash value stored at key. + * CLI usage: HRANDFIELD key [<count> [WITHVALUES]] + * + * Considerations for the current imp of HRANDFIELD & HFE feature: + * HRANDFIELD might access any of the fields in the hash as some of them might + * be expired. And so the Implementation of HRANDFIELD along with HFEs + * might be one of the two options: + * 1. Expire hash-fields before diving into handling HRANDFIELD. + * 2. Refine HRANDFIELD cases to deal with expired fields. + * + * Regarding the first option, as reference, the command RANDOMKEY also declares + * on O(1) complexity, yet might be stuck on a very long (but not infinite) loop + * trying to find non-expired keys. Furthermore RANDOMKEY also evicts expired keys + * along the way even though it is categorized as a read-only command. Note that + * the case of HRANDFIELD is more lightweight versus RANDOMKEY since HFEs have + * much more effective and aggressive active-expiration for fields behind. + * + * The second option introduces additional implementation complexity to HRANDFIELD. + * We could further refine HRANDFIELD cases to differentiate between scenarios + * with many expired fields versus few expired fields, and adjust based on the + * percentage of expired fields. However, this approach could still lead to long + * loops or necessitate expiring fields before selecting them. For the “lightweight” + * cases it is also expected to have a lightweight expiration. + * + * Considering the pros and cons, and the fact that HRANDFIELD is an infrequent + * command (particularly with HFEs) and the fact we have effective active-expiration + * behind for hash-fields, it is better to keep it simple and choose the option #1. + */ +void hrandfieldCommand(client *c) { + long l; + int withvalues = 0; + kvobj *hash; + CommonEntry ele; + size_t oldsize = 0; + + if (c->argc >= 3) { + if (getRangeLongFromObjectOrReply(c,c->argv[2],-LONG_MAX,LONG_MAX,&l,NULL) != C_OK) return; + if (c->argc > 4 || (c->argc == 4 && strcasecmp(c->argv[3]->ptr,"withvalues"))) { + addReplyErrorObject(c,shared.syntaxerr); + return; + } else if (c->argc == 4) { + withvalues = 1; + if (l < -LONG_MAX/2 || l > LONG_MAX/2) { + addReplyError(c,"value is out of range"); + return; + } + } + hrandfieldWithCountCommand(c, l, withvalues); + return; + } + + /* Handle variant without <count> argument. Reply with simple bulk string */ + if ((hash = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]))== NULL || + checkType(c,hash,OBJ_HASH)) { + return; + } + + /* Delete all expired fields. If the entire hash got deleted then return null. */ + if (hashTypeExpireIfNeeded(c->db, hash)) { + addReply(c,shared.null[c->resp]); + return; + } + + if (server.memory_tracking_per_slot) + oldsize = hashTypeAllocSize(hash); + hashTypeRandomElement(hash,hashTypeLength(hash, 0),&ele,NULL); + if (server.memory_tracking_per_slot) + updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(hash)); + + if (ele.sval) + addReplyBulkCBuffer(c, ele.sval, ele.slen); + else + addReplyBulkLongLong(c, ele.lval); +} + +/*----------------------------------------------------------------------------- + * Hash Field with optional expiry (based on entry) + *----------------------------------------------------------------------------*/ + +static ExpireMeta* hentryGetExpireMeta(const eItem e) { + /* extract the expireMeta from the field (entry) */ + return entryRefExpiryMeta((Entry *)e); +} + +/* Remove TTL from the field. Assumed ExpireMeta is attached and has valid value */ +static void hfieldPersist(robj *hashObj, Entry *entry) { + uint64_t fieldExpireTime = entryGetExpiry(entry); + if (fieldExpireTime == EB_EXPIRE_TIME_INVALID) + return; + + /* if field is set with expire, then dict must has HFE metadata attached */ + dict *d = hashObj->ptr; + htMetadataEx *dictExpireMeta = htGetMetadataEx(d); + + /* If field has valid expiry then dict must have valid metadata as well */ + serverAssert(dictExpireMeta->expireMeta.trash == 0); + + /* Remove field from private HFE DS */ + ebRemove(&dictExpireMeta->hfe, &hashFieldExpireBucketsType, entry); + + /* Don't have to update global HFE DS. It's unnecessary. Implementing this + * would introduce significant complexity and overhead for an operation that + * isn't critical. In the worst case scenario, the hash will be efficiently + * updated later by an active-expire operation, or it will be removed by the + * hash's dbGenericDelete() function. */ +} + +/*----------------------------------------------------------------------------- + * Hash Field Expiration (HFE) + *----------------------------------------------------------------------------*/ +/* Can be called either by active-expire cron job or query from the client */ +static void propagateHashFieldDeletion(redisDb *db, sds key, char *field, size_t fieldLen) { + robj *argv[] = { + shared.hdel, + createStringObject((char*) key, sdslen(key)), + createStringObject(field, fieldLen) + }; + + enterExecutionUnit(1, 0); + int prev_replication_allowed = server.replication_allowed; + server.replication_allowed = 1; + alsoPropagate(db->id,argv, 3, PROPAGATE_AOF|PROPAGATE_REPL); + server.replication_allowed = prev_replication_allowed; + exitExecutionUnit(); + + /* Propagate the HDEL command */ + postExecutionUnitOperations(); + + decrRefCount(argv[1]); + decrRefCount(argv[2]); +} + +/* Called during active expiration of hash-fields. Propagate to replica & Delete. */ +static ExpireAction onFieldExpire(eItem item, void *ctx) { + OnFieldExpireCtx *expCtx = ctx; + Entry *e = item; + kvobj *kv = expCtx->hashObj; + size_t oldsize = 0; + sds key = kvobjGetKey(kv); + + if (server.memory_tracking_per_slot) + oldsize = hashTypeAllocSize(kv); + sds field = entryGetField(e); + propagateHashFieldDeletion(expCtx->db, key, field, sdslen(field)); + + /* update keysizes */ + unsigned long l = hashTypeLength(expCtx->hashObj, 0); + updateKeysizesHist(expCtx->db, getKeySlot(key), OBJ_HASH, l, l - 1); + + serverAssert(hashTypeDelete(expCtx->hashObj, field) == 1); + if (server.memory_tracking_per_slot) + updateSlotAllocSize(expCtx->db, getKeySlot(key), oldsize, hashTypeAllocSize(kv)); + server.stat_expired_subkeys++; + return ACT_REMOVE_EXP_ITEM; +} + +/* Retrieve the ExpireMeta associated with the hash. + * The caller is responsible for ensuring that it is indeed attached. */ +ExpireMeta *hashGetExpireMeta(const eItem hash) { + robj *hashObj = (robj *)hash; + if (hashObj->encoding == OBJ_ENCODING_LISTPACK_EX) { + listpackEx *lpt = hashObj->ptr; + return &lpt->meta; + } else if (hashObj->encoding == OBJ_ENCODING_HT) { + dict *d = hashObj->ptr; + htMetadataEx *dictExpireMeta = htGetMetadataEx(d); + return &dictExpireMeta->expireMeta; + } else { + serverPanic("Unknown encoding: %d", hashObj->encoding); + } +} + +/* Generic structure to hold parsed arguments for all hash field commands */ +typedef struct { + /* FIELDS arguments */ + int fieldsPos; /* Position of FIELDS keyword (-1 if not found) */ + int numFieldsPos; /* Position of numfields argument */ + int firstFieldPos; /* Position of first field */ + int fieldCount; /* Number of fields */ + + /* HEXPIRE family arguments */ + int expireTimePos; /* Position of expire time argument */ + long long expireTime; /* Parsed expire time */ + int expireCondition; /* HFE_NX, HFE_XX, HFE_GT, HFE_LT */ +} HashCommandArgs; + +/* Parser for HEXPIRE family commands with flexible keyword ordering. + * Returns C_OK on success, C_ERR on error (with reply sent). */ +static int parseHashCommandArgs(client *c, HashCommandArgs *args, + long long basetime, int unit) +{ + memset(args, 0, sizeof(*args)); + args->fieldsPos = -1; + args->expireTimePos = 2; + + if (parseExpireTime(c, c->argv[2], unit, basetime, &args->expireTime) != C_OK) { + return C_ERR; + } + + /* Parse remaining arguments starting from position 3 */ + for (int i = 3; i < c->argc; i++) { + char *arg = c->argv[i]->ptr; + + /* FIELDS keyword - supported by ALL hash field commands */ + if (!strcasecmp(arg, "FIELDS")) { + if (args->fieldsPos != -1) { + addReplyError(c, "FIELDS keyword specified multiple times"); + return C_ERR; + } + + if (i >= c->argc - 2) { + addReplyError(c, "FIELDS requires at least numfields and one field argument"); + return C_ERR; + } + + args->fieldsPos = i; + args->numFieldsPos = i + 1; + long numFields; + if (getRangeLongFromObjectOrReply(c, c->argv[args->numFieldsPos], 1, INT_MAX, + &numFields, "Parameter `numFields` should be greater than 0") != C_OK) + return C_ERR; + + args->fieldCount = (int)numFields; + args->firstFieldPos = i + 2; + + /* Check bounds - we must have exactly the right number of fields */ + if (args->firstFieldPos + args->fieldCount > c->argc) { + addReplyError(c, "wrong number of arguments"); + return C_ERR; + } + + /* Skip over the field arguments */ + i = args->firstFieldPos + args->fieldCount - 1; + continue; + } + + /* Expiration condition keywords - validation moved outside loop for performance */ + if (!strcasecmp(arg, "NX")) { + args->expireCondition |= HFE_NX; + continue; + } else if (!strcasecmp(arg, "XX")) { + args->expireCondition |= HFE_XX; + continue; + } else if (!strcasecmp(arg, "GT")) { + args->expireCondition |= HFE_GT; + continue; + } else if (!strcasecmp(arg, "LT")) { + args->expireCondition |= HFE_LT; + continue; + } + + addReplyErrorFormat(c, "unknown argument: %s", (char*) c->argv[i]->ptr); + return C_ERR; + } + + if (__builtin_popcount(args->expireCondition & (HFE_NX|HFE_XX|HFE_GT|HFE_LT)) > 1) { + addReplyError(c, "Multiple condition flags specified"); + return C_ERR; + } + + return C_OK; +} + +/* HTTL key <FIELDS count field [field ...]> */ +static void httlGenericCommand(client *c, const char *cmd, long long basetime, int unit){ + UNUSED(cmd); + kvobj *hashObj; + long numFields = 0, numFieldsAt = 3; + + /* Read the hash object */ + hashObj = lookupKeyRead(c->db, c->argv[1]); + if (checkType(c, hashObj, OBJ_HASH)) + return; + + if (strcasecmp(c->argv[numFieldsAt-1]->ptr, "FIELDS")) { + addReplyError(c, "Mandatory argument FIELDS is missing or not at the right position"); + return; + } + + /* Read number of fields */ + if (getRangeLongFromObjectOrReply(c, c->argv[numFieldsAt], 1, LONG_MAX, + &numFields, "Number of fields must be a positive integer") != C_OK) + return; + + /* Verify `numFields` is consistent with number of arguments */ + if (numFields != (c->argc - numFieldsAt - 1)) { + addReplyError(c, "The `numfields` parameter must match the number of arguments"); + return; + } + + /* Non-existing keys and empty hashes are the same thing. It also means + * fields in the command don't exist in the hash key. */ + if (!hashObj) { + addReplyArrayLen(c, numFields); + for (int i = 0; i < numFields; i++) { + addReplyLongLong(c, HFE_GET_NO_FIELD); + } + return; + } + + if (hashObj->encoding == OBJ_ENCODING_LISTPACK) { + void *lp = hashObj->ptr; + + addReplyArrayLen(c, numFields); + for (int i = 0 ; i < numFields ; i++) { + sds field = c->argv[numFieldsAt+1+i]->ptr; + void *fptr = lpFirst(lp); + if (fptr != NULL) + fptr = lpFind(lp, fptr, (unsigned char *) field, sdslen(field), 1); + + if (!fptr) + addReplyLongLong(c, HFE_GET_NO_FIELD); + else + addReplyLongLong(c, HFE_GET_NO_TTL); + } + return; + } else if (hashObj->encoding == OBJ_ENCODING_LISTPACK_EX) { + listpackEx *lpt = hashObj->ptr; + + addReplyArrayLen(c, numFields); + for (int i = 0 ; i < numFields ; i++) { + long long expire; + sds field = c->argv[numFieldsAt+1+i]->ptr; + void *fptr = lpFirst(lpt->lp); + if (fptr != NULL) + fptr = lpFind(lpt->lp, fptr, (unsigned char *) field, sdslen(field), 2); + + if (!fptr) { + addReplyLongLong(c, HFE_GET_NO_FIELD); + continue; + } + + fptr = lpNext(lpt->lp, fptr); + serverAssert(fptr); + fptr = lpNext(lpt->lp, fptr); + serverAssert(fptr && lpGetIntegerValue(fptr, &expire)); + + if (expire == HASH_LP_NO_TTL) { + addReplyLongLong(c, HFE_GET_NO_TTL); + continue; + } + + if (expire <= commandTimeSnapshot()) { + addReplyLongLong(c, HFE_GET_NO_FIELD); + continue; + } + + if (unit == UNIT_SECONDS) + addReplyLongLong(c, (expire + 999 - basetime) / 1000); + else + addReplyLongLong(c, (expire - basetime)); + } + return; + } else if (hashObj->encoding == OBJ_ENCODING_HT) { + dict *d = hashObj->ptr; + size_t oldsize = dictMemUsage(d); + + addReplyArrayLen(c, numFields); + for (int i = 0 ; i < numFields ; i++) { + sds field = c->argv[numFieldsAt+1+i]->ptr; + dictEntry *de = dictFind(d, field); + if (de == NULL) { + addReplyLongLong(c, HFE_GET_NO_FIELD); + continue; + } + + Entry *entry = dictGetKey(de); + uint64_t expire = entryGetExpiry(entry); + if (expire == EB_EXPIRE_TIME_INVALID) { + addReplyLongLong(c, HFE_GET_NO_TTL); /* no ttl */ + continue; + } + + if ( (long long) expire < commandTimeSnapshot()) { + addReplyLongLong(c, HFE_GET_NO_FIELD); + continue; + } + + if (unit == UNIT_SECONDS) + addReplyLongLong(c, (expire + 999 - basetime) / 1000); + else + addReplyLongLong(c, (expire - basetime)); + } + if (server.memory_tracking_per_slot) + updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, dictMemUsage(d)); + return; + } else { + serverPanic("Unknown encoding: %d", hashObj->encoding); + } +} + +/* This is the generic command implementation for HEXPIRE, HPEXPIRE, HEXPIREAT + * and HPEXPIREAT. Because the command second argument may be relative or absolute + * the "basetime" argument is used to signal what the base time is (either 0 + * for *AT variants of the command, or the current time for relative expires). + * + * unit is either UNIT_SECONDS or UNIT_MILLISECONDS, and is only used for + * the argv[2] parameter. The basetime is always specified in milliseconds. + * + * PROPAGATE TO REPLICA: + * The command will be translated into HPEXPIREAT and the expiration time will be + * converted to absolute time in milliseconds. + * + * As we need to propagate H(P)EXPIRE(AT) command to the replica, each field that + * is mentioned in the command should be categorized into one of the four options: + * 1. Field’s expiration time updated successfully - Propagate it to replica as + * part of the HPEXPIREAT command. + * 2. The field got deleted since the time is in the past - propagate also HDEL + * command to delete the field. Also remove the field from the propagated + * HPEXPIREAT command. + * 3. Condition not met for the field - Remove the field from the propagated + * HPEXPIREAT command. + * 4. Field doesn't exists - Remove the field from propagated HPEXPIREAT command. + * + * If none of the provided fields match option #1, that is provided time of the + * command is in the past, then avoid propagating the HPEXPIREAT command to the + * replica. + * + * This approach is aligned with existing EXPIRE command. If a given key has already + * expired, then DEL will be propagated instead of EXPIRE command. If condition + * not met, then command will be rejected. Otherwise, EXPIRE command will be + * propagated for given key. + */ +static void hexpireGenericCommand(client *c, long long basetime, int unit) { + HashCommandArgs args; + int fieldsNotSet = 0, updated = 0, deleted = 0; + int64_t oldlen, newlen; + robj *keyArg = c->argv[1]; + size_t oldsize = 0; + + /* Read the hash object */ + kvobj *hashObj = lookupKeyWrite(c->db, keyArg); + if (checkType(c, hashObj, OBJ_HASH)) + return; + + /* Parse arguments using flexible keyword-based parsing */ + if (parseHashCommandArgs(c, &args, basetime, unit) != C_OK) + return; + + /* Non-existing keys and empty hashes are the same thing. It also means + * fields in the command don't exist in the hash key. */ + if (!hashObj) { + addReplyArrayLen(c, args.fieldCount); + for (int i = 0; i < args.fieldCount; i++) { + addReplyLongLong(c, HSETEX_NO_FIELD); + } + return; + } + + oldlen = hashTypeLength(hashObj, 0); + if (server.memory_tracking_per_slot) + oldsize = hashTypeAllocSize(hashObj); + + HashTypeSetEx exCtx; + hashTypeSetExInit(keyArg, hashObj, c, c->db, args.expireCondition, &exCtx); + addReplyArrayLen(c, args.fieldCount); + + /* Lazy allocation of fieldsToRemove - only allocate when failures occur */ + int *fieldsToRemove = NULL; + int removeCount = 0; + + for (int i = 0; i < args.fieldCount; i++) { + int fieldPos = args.firstFieldPos + i; + sds field = c->argv[fieldPos]->ptr; + SetExRes res = hashTypeSetEx(hashObj, field, args.expireTime, &exCtx); + updated += (res == HSETEX_OK); + deleted += (res == HSETEX_DELETED); + + if (unlikely(res != HSETEX_OK)) { + if (fieldsToRemove == NULL) { + fieldsToRemove = zmalloc(sizeof(int) * (args.fieldCount > 0 ? args.fieldCount : 1)); + } + /* Remember this field position for later removal from propagation */ + fieldsToRemove[removeCount++] = fieldPos; + fieldsNotSet = 1; + } + + addReplyLongLong(c, res); + } + + hashTypeSetExDone(&exCtx); + if (server.memory_tracking_per_slot) + updateSlotAllocSize(c->db, getKeySlot(keyArg->ptr), oldsize, hashTypeAllocSize(hashObj)); + + if (deleted + updated > 0) { + server.dirty += deleted + updated; + keyModified(c, c->db, keyArg, hashObj, 1); + notifyKeyspaceEvent(NOTIFY_HASH, deleted ? "hdel" : "hexpire", + keyArg, c->db->id); + } + + newlen = (int64_t) hashTypeLength(hashObj, 0); + if (newlen == 0) { + newlen = -1; + /* Del key but don't update KEYSIZES. Else it will decr wrong bin in histogram */ + dbDeleteSkipKeysizesUpdate(c->db, keyArg); + notifyKeyspaceEvent(NOTIFY_GENERIC, "del", keyArg, c->db->id); + } + + if (oldlen != newlen) + updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, + oldlen, newlen); + + /* Avoid propagating command if not even one field was updated (Either because + * the time is in the past, and corresponding HDELs were sent, or conditions + * not met) then it is useless and invalid to propagate command with no fields */ + if (updated == 0) { + preventCommandPropagation(c); + zfree(fieldsToRemove); + return; + } + + /* Handle propagation using command rewriting + * Rewrite to canonical HPEXPIREAT command */ + if (c->cmd->proc != hpexpireatCommand) { + rewriteClientCommandArgument(c, 0, shared.hpexpireat); + + robj *expireTimeObj = createStringObjectFromLongLong(args.expireTime); + rewriteClientCommandArgument(c, args.expireTimePos, expireTimeObj); + decrRefCount(expireTimeObj); + } + + /* For partial failures, remove failed fields from the original command */ + if (fieldsNotSet) { + for (int i = removeCount - 1; i >= 0; i--) { + rewriteClientCommandArgument(c, fieldsToRemove[i], NULL); + } + robj *newFieldCount = createStringObjectFromLongLong(updated); + rewriteClientCommandArgument(c, args.fieldsPos + 1, newFieldCount); + decrRefCount(newFieldCount); + } + + if (fieldsToRemove) + zfree(fieldsToRemove); +} + +/* HPEXPIRE key milliseconds [ NX | XX | GT | LT] FIELDS numfields <field [field ...]> */ +void hpexpireCommand(client *c) { + hexpireGenericCommand(c,commandTimeSnapshot(),UNIT_MILLISECONDS); +} + +/* HEXPIRE key seconds [NX | XX | GT | LT] FIELDS numfields <field [field ...]> */ +void hexpireCommand(client *c) { + hexpireGenericCommand(c,commandTimeSnapshot(),UNIT_SECONDS); +} + +/* HEXPIREAT key unix-time-seconds [NX | XX | GT | LT] FIELDS numfields <field [field ...]> */ +void hexpireatCommand(client *c) { + hexpireGenericCommand(c,0,UNIT_SECONDS); +} + +/* HPEXPIREAT key unix-time-milliseconds [NX | XX | GT | LT] FIELDS numfields <field [field ...]> */ +void hpexpireatCommand(client *c) { + hexpireGenericCommand(c,0,UNIT_MILLISECONDS); +} + +/* for each specified field: get the remaining time to live in seconds*/ +/* HTTL key FIELDS numfields <field [field ...]> */ +void httlCommand(client *c) { + httlGenericCommand(c, "httl", commandTimeSnapshot(), UNIT_SECONDS); +} + +/* HPTTL key FIELDS numfields <field [field ...]> */ +void hpttlCommand(client *c) { + httlGenericCommand(c, "hpttl", commandTimeSnapshot(), UNIT_MILLISECONDS); +} + +/* HEXPIRETIME key FIELDS numfields <field [field ...]> */ +void hexpiretimeCommand(client *c) { + httlGenericCommand(c, "hexpiretime", 0, UNIT_SECONDS); +} + +/* HPEXPIRETIME key FIELDS numfields <field [field ...]> */ +void hpexpiretimeCommand(client *c) { + httlGenericCommand(c, "hexpiretime", 0, UNIT_MILLISECONDS); +} + +/* HPERSIST key FIELDS numfields <field [field ...]> */ +void hpersistCommand(client *c) { + long numFields = 0, numFieldsAt = 3; + int changed = 0; /* Used to determine whether to send a notification. */ + + /* Read the hash object */ + kvobj *hashObj = lookupKeyWrite(c->db, c->argv[1]); + if (checkType(c, hashObj, OBJ_HASH)) + return; + + if (strcasecmp(c->argv[numFieldsAt-1]->ptr, "FIELDS")) { + addReplyError(c, "Mandatory argument FIELDS is missing or not at the right position"); + return; + } + + /* Read number of fields */ + if (getRangeLongFromObjectOrReply(c, c->argv[numFieldsAt], 1, LONG_MAX, + &numFields, "Number of fields must be a positive integer") != C_OK) + return; + + /* Verify `numFields` is consistent with number of arguments */ + if (numFields != (c->argc - numFieldsAt - 1)) { + addReplyError(c, "The `numfields` parameter must match the number of arguments"); + return; + } + + /* Non-existing keys and empty hashes are the same thing. It also means + * fields in the command don't exist in the hash key. */ + if (!hashObj) { + addReplyArrayLen(c, numFields); + for (int i = 0; i < numFields; i++) { + addReplyLongLong(c, HFE_PERSIST_NO_FIELD); + } + return; + } + + if (hashObj->encoding == OBJ_ENCODING_LISTPACK) { + addReplyArrayLen(c, numFields); + for (int i = 0 ; i < numFields ; i++) { + sds field = c->argv[numFieldsAt + 1 + i]->ptr; + unsigned char *fptr, *zl = hashObj->ptr; + + fptr = lpFirst(zl); + if (fptr != NULL) + fptr = lpFind(zl, fptr, (unsigned char *) field, sdslen(field), 1); + + if (!fptr) + addReplyLongLong(c, HFE_PERSIST_NO_FIELD); + else + addReplyLongLong(c, HFE_PERSIST_NO_TTL); + } + return; + } else if (hashObj->encoding == OBJ_ENCODING_LISTPACK_EX) { + long long prevExpire; + unsigned char *fptr, *vptr, *tptr; + listpackEx *lpt = hashObj->ptr; + size_t oldsize = 0; + + addReplyArrayLen(c, numFields); + for (int i = 0 ; i < numFields ; i++) { + sds field = c->argv[numFieldsAt + 1 + i]->ptr; + + fptr = lpFirst(lpt->lp); + if (fptr != NULL) + fptr = lpFind(lpt->lp, fptr, (unsigned char*)field, sdslen(field), 2); + + if (!fptr) { + addReplyLongLong(c, HFE_PERSIST_NO_FIELD); + continue; + } + + vptr = lpNext(lpt->lp, fptr); + serverAssert(vptr); + tptr = lpNext(lpt->lp, vptr); + serverAssert(tptr && lpGetIntegerValue(tptr, &prevExpire)); + + if (prevExpire == HASH_LP_NO_TTL) { + addReplyLongLong(c, HFE_PERSIST_NO_TTL); + continue; + } + + if (prevExpire < commandTimeSnapshot()) { + addReplyLongLong(c, HFE_PERSIST_NO_FIELD); + continue; + } + + if (server.memory_tracking_per_slot) + oldsize = hashTypeAllocSize(hashObj); + listpackExUpdateExpiry(hashObj, field, fptr, vptr, HASH_LP_NO_TTL); + if (server.memory_tracking_per_slot) + updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(hashObj)); + addReplyLongLong(c, HFE_PERSIST_OK); + changed = 1; + } + } else if (hashObj->encoding == OBJ_ENCODING_HT) { + dict *d = hashObj->ptr; + size_t oldsize = dictMemUsage(d); + + addReplyArrayLen(c, numFields); + for (int i = 0 ; i < numFields ; i++) { + sds field = c->argv[numFieldsAt + 1 + i]->ptr; + dictEntry *de = dictFind(d, field); + if (de == NULL) { + addReplyLongLong(c, HFE_PERSIST_NO_FIELD); + continue; + } + + Entry *entry = dictGetKey(de); + uint64_t expire = entryGetExpiry(entry); + if (expire == EB_EXPIRE_TIME_INVALID) { + addReplyLongLong(c, HFE_PERSIST_NO_TTL); + continue; + } + + /* Already expired. Pretend there is no such field */ + if ( (long long) expire < commandTimeSnapshot()) { + addReplyLongLong(c, HFE_PERSIST_NO_FIELD); + continue; + } + + hfieldPersist(hashObj, entry); + addReplyLongLong(c, HFE_PERSIST_OK); + changed = 1; + } + if (server.memory_tracking_per_slot) + updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, dictMemUsage(d)); + } else { + serverPanic("Unknown encoding: %d", hashObj->encoding); + } + + /* Generates a hpersist event if the expiry time associated with any field + * has been successfully deleted. */ + if (changed) { + notifyKeyspaceEvent(NOTIFY_HASH, "hpersist", c->argv[1], c->db->id); + keyModified(c, c->db, c->argv[1], hashObj, 1); + server.dirty++; + } +} |
