/*
 * Copyright (c) 2009-Present, Redis Ltd.
 * All rights reserved.
 *
 * Licensed under your choice of (a) the Redis Source Available License 2.0
 * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
 * GNU Affero General Public License v3 (AGPLv3).
 */

#include "server.h"
#include "redisassert.h"
#include "ebuckets.h"
#include "entry.h"
#include "cluster_asm.h"
/* NOTE(review): the directive below carries no header name in this copy of the
 * file. It looks like an angle-bracket include whose "<...>" part was lost.
 * Restore it from the upstream source before building. */
#include

/* Threshold for HEXPIRE and HPERSIST to be considered whether it is worth to
 * update the expiration time of the hash object in global HFE DS. */
/* NOTE(review): the expansion below ("1<> 2") does not parse. Text between a
 * '<' and a '>' appears to have been dropped, possibly together with whatever
 * followed this macro. Restore from the upstream source - TODO confirm. */
#define HASH_NEW_EXPIRE_DIFF_THRESHOLD max(4000, 1<> 2)

/* Outcome of looking up a single hash field. */
typedef enum GetFieldRes {
    /* common (Used by hashTypeGet* value family) */
    GETF_OK = 0,            /* The field was found. */
    GETF_NOT_FOUND,         /* The field was not found. */
    GETF_EXPIRED,           /* Logically expired (Might be lazy deleted or not) */
    GETF_EXPIRED_HASH,      /* Delete hash since retrieved field was expired and
                             * it was the last field in the hash. */
} GetFieldRes;

typedef listpackEntry CommonEntry; /* extend usage beyond lp */

/* hash field expiration (HFE) funcs */
static ExpireAction onFieldExpire(eItem item, void *ctx);
static ExpireMeta* hentryGetExpireMeta(const eItem field);
static void hexpireGenericCommand(client *c, long long basetime, int unit);
static void hfieldPersist(robj *hashObj, Entry *entry);
static void propagateHashFieldDeletion(redisDb *db, sds key, char *field, size_t fieldLen);

/* hash dictType funcs */
static void dictEntryDestructor(dict *d, void *entry);
static size_t hashDictMetadataBytes(dict *d);
static size_t hashDictWithExpireMetadataBytes(dict *d);
static void hashDictWithExpireOnRelease(dict *d);
static kvobj* hashTypeLookupWriteOrCreate(client *c, robj *key);

/*-----------------------------------------------------------------------------
 * Define dictType of hash
 *
 * - Stores fields as entry objects (field-value pairs) with optional expiration
 * - Note that small hashes are represented with listpacks
 * - Once expiration is set for a field, the dict instance 
and corresponding
 *   dictType are replaced with a dict containing metadata for Hash Field
 *   Expiration (HFE) and using dictType `entryHashDictTypeWithHFE`
 * - Dict uses no_value=1 since entry contains both field and value
 *----------------------------------------------------------------------------*/
/* dictType used by hash-table encoded hashes that carry no field TTLs.
 * The dict metadata area is sized by hashDictMetadataBytes(). */
dictType entryHashDictType = {
    dictSdsHash,                                /* lookup hash function */
    NULL,                                       /* key dup */
    NULL,                                       /* val dup */
    dictSdsKeyCompare,                          /* lookup key compare */
    dictEntryDestructor,                        /* key destructor */
    NULL,                                       /* val destructor (value is in entry) */
    .dictMetadataBytes = hashDictMetadataBytes,
    .no_value = 1,                              /* entry contains both field and value */
    .keys_are_odd = 1,                          /* entry pointers (SDS) are always odd */
};

/* Define alternative dictType of hash with hash-field expiration (HFE) support.
 * It differs from entryHashDictType only in the metadata size callback and in
 * registering hashDictWithExpireOnRelease() as the release hook. */
dictType entryHashDictTypeWithHFE = {
    dictSdsHash,                                /* lookup hash function */
    NULL,                                       /* key dup */
    NULL,                                       /* val dup */
    dictSdsKeyCompare,                          /* lookup key compare */
    dictEntryDestructor,                        /* key destructor */
    NULL,                                       /* val destructor (value is in entry) */
    .dictMetadataBytes = hashDictWithExpireMetadataBytes,
    .onDictRelease = hashDictWithExpireOnRelease,
    .no_value = 1,                              /* entry contains both field and value */
    .keys_are_odd = 1,                          /* entry pointers (SDS) are always odd */
};

/*-----------------------------------------------------------------------------
 * Hash Field Expiration (HFE) Feature
 *
 * Each hash instance maintains its own set of hash field expiration within its
 * private ebuckets DS. In order to support HFE active expire cycle across hash
 * instances, hashes with associated HFE will be also registered in a global
 * ebuckets DS with expiration time value that reflects their next minimum
 * time to expire (db->subexpires). The global HFE Active expiration will be
 * triggered from activeExpireCycle() function and in turn will invoke "local"
 * HFE Active sub-expiration for each hash instance that has expired fields.
*----------------------------------------------------------------------------*/
/* ebuckets-type of the global (per-db) HFE structure whose items are hashes. */
EbucketsType subexpiresBucketsType = {
    .onDeleteItem = NULL,
    .getExpireMeta = hashGetExpireMeta, /* get ExpireMeta attached to each hash */
    .itemsAddrAreOdd = 0,               /* Addresses of dict are even */
};

/* htExpireMetadata - ebuckets-type for hash fields with time-Expiration. ebuckets
 * instance Will be attached to each hash that has at least one field with expiry
 * time. */
EbucketsType hashFieldExpireBucketsType = {
    .onDeleteItem = NULL,
    .getExpireMeta = hentryGetExpireMeta, /* get ExpireMeta attached to each field */
    .itemsAddrAreOdd = 1,                 /* Addresses of hfield (entry/sds) are odd!! */
};

/* OnFieldExpireCtx passed to OnFieldExpire() */
typedef struct OnFieldExpireCtx {
    robj *hashObj;
    redisDb *db;
} OnFieldExpireCtx;

/* The implementation of hashes by dict was modified from storing fields as sds
 * strings to store "entry" objects (field-value pairs with optional expiration).
 * The entry structure unifies field and value into a single allocation, with
 * optional expiration metadata. This is simpler than the previous mstr approach
 * and provides better memory locality. */

/* Used by hpersistCommand() */
typedef enum SetPersistRes {
    HFE_PERSIST_NO_FIELD = -2,   /* No such hash-field */
    HFE_PERSIST_NO_TTL = -1,     /* No TTL attached to the field */
    HFE_PERSIST_OK = 1
} SetPersistRes;

/* Returns non-zero when the dict was created with the HFE-aware dictType. */
static inline int isDictWithMetaHFE(dict *d) {
    return d->type == &entryHashDictTypeWithHFE;
}

/*-----------------------------------------------------------------------------
 * setex* - Set field's expiration
 *
 * Setting expiration time to fields might be time-consuming and complex since
 * each update of expiration time not only updates the `ebuckets` of the
 * corresponding hash, but also might update the `ebuckets` of the global HFE DS.
 * It is therefore preferable to batch a sequence of field expiration updates
 * for a given hash, such that the global HFE DS gets updated only once, after
 * the whole sequence is done.
 *
 * To do so, follow the scheme:
 * 1. Call hashTypeSetExInit() to initialize the HashTypeSetEx struct.
 * 2. Call hashTypeSetEx() one time or more, for each field/expiration update.
 * 3. Call hashTypeSetExDone() for notification and update of global HFE.
 *----------------------------------------------------------------------------*/

/* Returned value of hashTypeSetEx() */
typedef enum SetExRes {
    HSETEX_OK = 1,                /* Expiration time set/updated as expected */
    HSETEX_NO_FIELD = -2,         /* No such hash-field */
    HSETEX_NO_CONDITION_MET = 0,  /* Specified NX | XX | GT | LT condition not met */
    HSETEX_DELETED = 2,           /* Field deleted because the specified time is in the past */
} SetExRes;

/* Used by httlGenericCommand() */
typedef enum GetExpireTimeRes {
    HFE_GET_NO_FIELD = -2, /* No such hash-field */
    HFE_GET_NO_TTL = -1,   /* No TTL attached to the field */
} GetExpireTimeRes;

/* Conditions restricting when a field's expiration may be set (bit flags). */
typedef enum ExpireSetCond {
    HFE_NX = 1<<0,
    HFE_XX = 1<<1,
    HFE_GT = 1<<2,
    HFE_LT = 1<<3
} ExpireSetCond;

/* Used by hashTypeSetEx() for setting fields or their expiry */
typedef struct HashTypeSetEx {

    /*** config ***/
    ExpireSetCond expireSetCond;        /* [XX | NX | GT | LT] */

    /*** metadata ***/
    uint64_t minExpire;                 /* if uninit EB_EXPIRE_TIME_INVALID */
    redisDb *db;
    robj *key, *hashObj;
    uint64_t minExpireFields;           /* Trace updated fields and their previous/new
                                         * minimum expiration time. If minimum recorded
                                         * is above minExpire of the hash, then we don't
                                         * have to update global HFE DS */

    /* Optionally provide client for notification */
    client *c;
    const char *cmd;
} HashTypeSetEx;

int hashTypeSetExInit(robj *key, kvobj *kvo, client *c, redisDb *db,
                      ExpireSetCond expireSetCond, HashTypeSetEx *ex);

SetExRes hashTypeSetEx(robj *o, sds field, uint64_t expireAt, HashTypeSetEx *exInfo);

void hashTypeSetExDone(HashTypeSetEx *e);

/*-----------------------------------------------------------------------------
 * Accessor functions for dictType of hash
 *----------------------------------------------------------------------------*/

/* Key destructor of both hash dictTypes: detaches the entry from the hash's
 * private ebuckets when it has a TTL, frees it, and subtracts its usable size
 * from the allocation counter kept in the dict metadata. */
static void dictEntryDestructor(dict *d, void *entry) {
    size_t usable;
    size_t *alloc_size = htGetMetadataSize(d);

    /* If attached TTL to the field, then remove it from hash's private ebuckets. */
    if (entryGetExpiry(entry) != EB_EXPIRE_TIME_INVALID) {
        htMetadataEx *dictExpireMeta = htGetMetadataEx(d);
        ebRemove(&dictExpireMeta->hfe, &hashFieldExpireBucketsType, entry);
    }

    entryFree(entry, &usable);
    *alloc_size -= usable;

    /* Don't have to update global HFE DS. It's unnecessary. Implementing this
     * would introduce significant complexity and overhead for an operation that
     * isn't critical. In the worst case scenario, the hash will be efficiently
     * updated later by an active-expire operation, or it will be removed by the
     * hash's dbGenericDelete() function. */
}

/* Metadata size of a hash dict without HFE: a single size_t. */
static size_t hashDictMetadataBytes(dict *d) {
    UNUSED(d);
    return sizeof(size_t);
}

/* Metadata size of a hash dict with HFE support. */
static size_t hashDictWithExpireMetadataBytes(dict *d) {
    UNUSED(d);
    /* expireMeta of the hash, ref to ebuckets and pointer to hash's key */
    return sizeof(htMetadataEx);
}

/* Release hook of the HFE dictType: destroys the hash's private ebuckets. */
static void hashDictWithExpireOnRelease(dict *d) {
    /* for sure allocated with metadata. Otherwise, this func won't be registered */
    htMetadataEx *dictExpireMeta = htGetMetadataEx(d);
    ebDestroy(&dictExpireMeta->hfe, &hashFieldExpireBucketsType, NULL);
}

/*-----------------------------------------------------------------------------
 * listpackEx functions
 *----------------------------------------------------------------------------*/
/*
 * If any of hash field expiration command is called on a listpack hash object
 * for the first time, we convert it to OBJ_ENCODING_LISTPACK_EX encoding.
 * We allocate "struct listpackEx" which holds listpack pointer and expiry
 * metadata. In the listpack string, we append another TTL entry for each field
 * value pair. From now on, listpack will have triplets in it: field-value-ttl.
 * If TTL is not set for a field, we store 'zero' as the TTL value. 'zero' is
 * encoded as two bytes in the listpack. Memory overhead of a non-existing TTL
 * will be two bytes per field.
 *
 * Fields in the listpack will be ordered by TTL. Field with the smallest expiry
 * time will be the first item. Fields without TTL will be at the end of the
 * listpack. This way, it is easier/faster to find expired items.
 */
#define HASH_LP_NO_TTL 0

/* Allocates a zeroed listpackEx with no listpack attached yet. */
struct listpackEx *listpackExCreate(void) {
    listpackEx *lpt = zcalloc(sizeof(*lpt));
    /* NOTE(review): trash=1 presumably marks the ExpireMeta as not registered
     * in any ebuckets - confirm against ebuckets.h. */
    lpt->meta.trash = 1;
    lpt->lp = NULL;
    return lpt;
}

/* Frees the listpack and the listpackEx wrapper itself. */
static void listpackExFree(listpackEx *lpt) {
    lpFree(lpt->lp);
    zfree(lpt);
}

/* In/out arguments shared with the lpFindCb() callback. */
struct lpFingArgs {
    uint64_t max_to_search; /* [in] Max number of tuples to search */
    uint64_t expire_time;   /* [in] Find the tuple that has a TTL larger than expire_time */
    unsigned char *p;       /* [out] First item of the tuple that has a TTL larger than expire_time */
    int expired;            /* [out] Number of tuples that have TTLs less than expire_time */
    int index;              /* Internally used */
    unsigned char *fptr;    /* Internally used, temp ptr */
};

/* Callback for lpFindCb(). 
Used to find number of expired fields as part of * active expiry or when trying to find the position for the new field according * to its expiry time.*/ static int cbFindInListpack(const unsigned char *lp, unsigned char *p, void *user, unsigned char *s, long long slen) { (void) lp; struct lpFingArgs *r = user; r->index++; if (r->max_to_search == 0) return 0; /* Break the loop and return */ if (r->index % 3 == 1) { r->fptr = p; /* First item of the tuple. */ } else if (r->index % 3 == 0) { serverAssert(!s); /* Third item of a tuple is expiry time */ if (slen == HASH_LP_NO_TTL || (uint64_t) slen >= r->expire_time) { r->p = r->fptr; return 0; /* Break the loop and return */ } r->expired++; r->max_to_search--; } return 1; } /* Returns number of expired fields. */ static uint64_t listpackExExpireDryRun(const robj *o) { serverAssert(o->encoding == OBJ_ENCODING_LISTPACK_EX); listpackEx *lpt = o->ptr; struct lpFingArgs r = { .max_to_search = UINT64_MAX, .expire_time = commandTimeSnapshot(), }; lpFindCb(lpt->lp, NULL, &r, cbFindInListpack, 0); return r.expired; } /* Returns the expiration time of the item with the nearest expiration. */ static uint64_t listpackExGetMinExpire(robj *o) { serverAssert(o->encoding == OBJ_ENCODING_LISTPACK_EX); long long expireAt; unsigned char *fptr; listpackEx *lpt = o->ptr; /* As fields are ordered by expire time, first field will have the smallest * expiry time. Third element is the expiry time of the first field */ fptr = lpSeek(lpt->lp, 2); if (fptr != NULL) { serverAssert(lpGetIntegerValue(fptr, &expireAt)); /* Check if this is a non-volatile field. */ if (expireAt != HASH_LP_NO_TTL) return expireAt; } return EB_EXPIRE_TIME_INVALID; } /* Walk over fields and delete the expired ones. 
*/ void listpackExExpire(redisDb *db, kvobj *kv, ExpireInfo *info) { serverAssert(kv->encoding == OBJ_ENCODING_LISTPACK_EX); uint64_t expired = 0, min = EB_EXPIRE_TIME_INVALID; unsigned char *ptr; listpackEx *lpt = kv->ptr; ptr = lpFirst(lpt->lp); sds key = kvobjGetKey(kv); while (ptr != NULL && (info->itemsExpired < info->maxToExpire)) { long long val; int64_t flen; unsigned char intbuf[LP_INTBUF_SIZE], *fref; fref = lpGet(ptr, &flen, intbuf); ptr = lpNext(lpt->lp, ptr); serverAssert(ptr); ptr = lpNext(lpt->lp, ptr); serverAssert(ptr && lpGetIntegerValue(ptr, &val)); /* Fields are ordered by expiry time. If we reached to a non-expired * or a non-volatile field, we know rest is not yet expired. */ if (val == HASH_LP_NO_TTL || (uint64_t) val > info->now) break; propagateHashFieldDeletion(db, key, (char *)((fref) ? fref : intbuf), flen); server.stat_expired_subkeys++; ptr = lpNext(lpt->lp, ptr); info->itemsExpired++; expired++; } if (expired) { size_t oldsize = 0; if (server.memory_tracking_per_slot) oldsize = lpBytes(lpt->lp); lpt->lp = lpDeleteRange(lpt->lp, 0, expired * 3); if (server.memory_tracking_per_slot) updateSlotAllocSize(db, getKeySlot(key), oldsize, lpBytes(lpt->lp)); /* update keysizes */ unsigned long l = lpLength(lpt->lp) / 3; updateKeysizesHist(db, getKeySlot(key), OBJ_HASH, l + expired, l); } min = hashTypeGetMinExpire(kv, 1 /*accurate*/); info->nextExpireTime = min; } static void listpackExAddInternal(robj *o, listpackEntry ent[3]) { listpackEx *lpt = o->ptr; /* Shortcut, just append at the end if this is a non-volatile field. */ if (ent[2].lval == HASH_LP_NO_TTL) { lpt->lp = lpBatchAppend(lpt->lp, ent, 3); return; } struct lpFingArgs r = { .max_to_search = UINT64_MAX, .expire_time = ent[2].lval, }; /* Check if there is a field with a larger TTL. */ lpFindCb(lpt->lp, NULL, &r, cbFindInListpack, 0); /* If list is empty or there is no field with a larger TTL, result will be * NULL. 
Otherwise, just insert before the found item.*/ if (r.p) lpt->lp = lpBatchInsert(lpt->lp, r.p, LP_BEFORE, ent, 3, NULL); else lpt->lp = lpBatchAppend(lpt->lp, ent, 3); } /* Add new field ordered by expire time. */ void listpackExAddNew(robj *o, char *field, size_t flen, char *value, size_t vlen, uint64_t expireAt) { listpackEntry ent[3] = { {.sval = (unsigned char*) field, .slen = flen}, {.sval = (unsigned char*) value, .slen = vlen}, {.lval = expireAt} }; listpackExAddInternal(o, ent); } /* If expiry time is changed, this function will place field into the correct * position. First, it deletes the field and re-inserts to the listpack ordered * by expiry time. */ static void listpackExUpdateExpiry(robj *o, sds field, unsigned char *fptr, unsigned char *vptr, uint64_t expire_at) { unsigned int slen = 0; long long val = 0; unsigned char tmp[512] = {0}; unsigned char *valstr; sds tmpval = NULL; listpackEx *lpt = o->ptr; /* Copy value */ valstr = lpGetValue(vptr, &slen, &val); if (valstr) { /* Normally, item length in the listpack is limited by * 'hash-max-listpack-value' config. It is unlikely, but it might be * larger than sizeof(tmp). */ if (slen > sizeof(tmp)) tmpval = sdsnewlen(valstr, slen); else memcpy(tmp, valstr, slen); } /* Delete field name, value and expiry time */ lpt->lp = lpDeleteRangeWithEntry(lpt->lp, &fptr, 3); listpackEntry ent[3] = {{0}}; ent[0].sval = (unsigned char*) field; ent[0].slen = sdslen(field); if (valstr) { ent[1].sval = tmpval ? (unsigned char *) tmpval : tmp; ent[1].slen = slen; } else { ent[1].lval = val; } ent[2].lval = expire_at; listpackExAddInternal(o, ent); sdsfree(tmpval); } /* Update field expire time. 
*/ SetExRes hashTypeSetExpiryListpack(HashTypeSetEx *ex, sds field, unsigned char *fptr, unsigned char *vptr, unsigned char *tptr, uint64_t expireAt) { long long expireTime; uint64_t prevExpire = EB_EXPIRE_TIME_INVALID; serverAssert(lpGetIntegerValue(tptr, &expireTime)); if (expireTime != HASH_LP_NO_TTL) { prevExpire = (uint64_t) expireTime; } /* Special value of EXPIRE_TIME_INVALID indicates field should be persisted.*/ if (expireAt == EB_EXPIRE_TIME_INVALID) { /* Return error if already there is no ttl. */ if (prevExpire == EB_EXPIRE_TIME_INVALID) return HSETEX_NO_CONDITION_MET; listpackExUpdateExpiry(ex->hashObj, field, fptr, vptr, HASH_LP_NO_TTL); return HSETEX_OK; } if (prevExpire == EB_EXPIRE_TIME_INVALID) { /* For fields without expiry, LT condition is considered valid */ if (ex->expireSetCond & (HFE_XX | HFE_GT)) return HSETEX_NO_CONDITION_MET; } else { if (((ex->expireSetCond == HFE_GT) && (prevExpire >= expireAt)) || ((ex->expireSetCond == HFE_LT) && (prevExpire <= expireAt)) || (ex->expireSetCond == HFE_NX) ) return HSETEX_NO_CONDITION_MET; /* Track of minimum expiration time (only later update global HFE DS) */ if (ex->minExpireFields > prevExpire) ex->minExpireFields = prevExpire; } /* If expired, then delete the field and propagate the deletion. 
* If replica, continue like the field is valid */ if (unlikely(checkAlreadyExpired(expireAt))) { propagateHashFieldDeletion(ex->db, ex->key->ptr, field, sdslen(field)); hashTypeDelete(ex->hashObj, field); server.stat_expired_subkeys++; return HSETEX_DELETED; } if (ex->minExpireFields > expireAt) ex->minExpireFields = expireAt; listpackExUpdateExpiry(ex->hashObj, field, fptr, vptr, expireAt); return HSETEX_OK; } /* Returns 1 if expired */ int hashTypeIsExpired(const robj *o, uint64_t expireAt) { if (server.allow_access_expired) return 0; if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { if (expireAt == HASH_LP_NO_TTL) return 0; } else if (o->encoding == OBJ_ENCODING_HT) { if (expireAt == EB_EXPIRE_TIME_INVALID) return 0; } else { serverPanic("Unknown encoding: %d", o->encoding); } return (mstime_t) expireAt < commandTimeSnapshot(); } /* Returns listpack pointer of the object. */ unsigned char *hashTypeListpackGetLp(robj *o) { if (o->encoding == OBJ_ENCODING_LISTPACK) return o->ptr; else if (o->encoding == OBJ_ENCODING_LISTPACK_EX) return ((listpackEx*)o->ptr)->lp; serverPanic("Unknown encoding: %d", o->encoding); } /*----------------------------------------------------------------------------- * Hash type API *----------------------------------------------------------------------------*/ /* Check the length of a number of objects to see if we need to convert a * listpack to a real hash. Note that we only check string encoded objects * as their string length can be queried in constant time. */ void hashTypeTryConversion(redisDb *db, kvobj *o, robj **argv, int start, int end) { int i; size_t sum = 0; if (o->encoding != OBJ_ENCODING_LISTPACK && o->encoding != OBJ_ENCODING_LISTPACK_EX) return; /* We guess that most of the values in the input are unique, so * if there are enough arguments we create a pre-sized hash, which * might over allocate memory if there are duplicates. 
*/ size_t new_fields = (end - start + 1) / 2; if (new_fields > server.hash_max_listpack_entries) { hashTypeConvert(db, o, OBJ_ENCODING_HT); dictExpand(o->ptr, new_fields); return; } for (i = start; i <= end; i++) { if (!sdsEncodedObject(argv[i])) continue; size_t len = sdslen(argv[i]->ptr); if (len > server.hash_max_listpack_value) { hashTypeConvert(db, o, OBJ_ENCODING_HT); return; } sum += len; } if (!lpSafeToAdd(hashTypeListpackGetLp(o), sum)) { hashTypeConvert(db, o, OBJ_ENCODING_HT); } } /* Get the value from a listpack encoded hash, identified by field. */ GetFieldRes hashTypeGetFromListpack(robj *o, sds field, unsigned char **vstr, unsigned int *vlen, long long *vll, uint64_t *expiredAt) { *expiredAt = EB_EXPIRE_TIME_INVALID; unsigned char *zl, *fptr = NULL, *vptr = NULL; if (o->encoding == OBJ_ENCODING_LISTPACK) { zl = o->ptr; fptr = lpFirst(zl); if (fptr != NULL) { fptr = lpFind(zl, fptr, (unsigned char*)field, sdslen(field), 1); if (fptr != NULL) { /* Grab pointer to the value (fptr points to the field) */ vptr = lpNext(zl, fptr); serverAssert(vptr != NULL); } } } else if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { long long expire; unsigned char *h; listpackEx *lpt = o->ptr; fptr = lpFirst(lpt->lp); if (fptr != NULL) { fptr = lpFind(lpt->lp, fptr, (unsigned char*)field, sdslen(field), 2); if (fptr != NULL) { vptr = lpNext(lpt->lp, fptr); serverAssert(vptr != NULL); h = lpNext(lpt->lp, vptr); serverAssert(h && lpGetIntegerValue(h, &expire)); if (expire != HASH_LP_NO_TTL) *expiredAt = expire; } } } else { serverPanic("Unknown hash encoding: %d", o->encoding); } if (vptr != NULL) { *vstr = lpGetValue(vptr, vlen, vll); return GETF_OK; } return GETF_NOT_FOUND; } /* Get the value from a hash table encoded hash, identified by field. * Returns NULL when the field cannot be found, otherwise the SDS value * is returned. 
*/
/* Contract as implemented: returns GETF_NOT_FOUND when the field is missing
 * (*value is left untouched), otherwise GETF_OK with *value set to the value
 * stored in the entry. *expiredAt is set to EB_EXPIRE_TIME_INVALID first and
 * is replaced by the entry's expiry when the field is found. */
GetFieldRes hashTypeGetFromHashTable(robj *o, sds field, sds *value, uint64_t *expiredAt) {
    *expiredAt = EB_EXPIRE_TIME_INVALID;
    serverAssert(o->encoding == OBJ_ENCODING_HT);

    dictEntry *found = dictFind(o->ptr, field);
    if (found != NULL) {
        /* The dict key is the entry that holds both field and value. */
        Entry *e = dictGetKey(found);
        *expiredAt = entryGetExpiry(e);
        *value = entryGetValue(e);
        return GETF_OK;
    }

    return GETF_NOT_FOUND;
}

/* Higher level function of hashTypeGet*() that returns the hash value
 * associated with the specified field.
 * Arguments:
 * hfeFlags      - Lookup for HFE_LAZY_* flags
 *
 * Returned:
 * GetFieldRes  - Result of get operation
 * vstr, vlen   - if string, ref in either *vstr and *vlen if it's
 *                returned in string form,
 * vll          - or stored in *vll if it's returned as a number.
 *                If *vll is populated *vstr is set to NULL, so the caller can
 *                always check the function return by checking the return value
 *                for GETF_OK and checking if vll (or vstr) is NULL.
 * expiredAt    - if the field has an expiration time, it will be set to the expiration
 *                time of the field. Otherwise, will be set to EB_EXPIRE_TIME_INVALID.
*/
GetFieldRes hashTypeGetValue(redisDb *db, kvobj *o, sds field, unsigned char **vstr,
                             unsigned int *vlen, long long *vll,
                             int hfeFlags, uint64_t *expiredAt)
{
    sds key = kvobjGetKey(o);
    GetFieldRes res;
    uint64_t dummy;
    size_t oldsize = 0;
    /* Callers that don't care about the TTL may pass NULL. */
    if (expiredAt == NULL) expiredAt = &dummy;

    /* Step 1: raw lookup according to the encoding. */
    if (o->encoding == OBJ_ENCODING_LISTPACK ||
        o->encoding == OBJ_ENCODING_LISTPACK_EX) {
        *vstr = NULL;
        res = hashTypeGetFromListpack(o, field, vstr, vlen, vll, expiredAt);

        if (res == GETF_NOT_FOUND)
            return GETF_NOT_FOUND;

    } else if (o->encoding == OBJ_ENCODING_HT) {
        sds value = NULL;
        /* The allocation size is sampled around the lookup and the difference
         * is reported to the per-slot accounting. */
        if (server.memory_tracking_per_slot && !(hfeFlags & HFE_LAZY_NO_UPDATE_ALLOCSIZES))
            oldsize = hashTypeAllocSize(o);
        res = hashTypeGetFromHashTable(o, field, &value, expiredAt);
        if (server.memory_tracking_per_slot && !(hfeFlags & HFE_LAZY_NO_UPDATE_ALLOCSIZES))
            updateSlotAllocSize(db, getKeySlot(key), oldsize, hashTypeAllocSize(o));

        if (res == GETF_NOT_FOUND)
            return GETF_NOT_FOUND;

        *vstr = (unsigned char*) value;
        *vlen = sdslen(value);
    } else {
        serverPanic("Unknown hash encoding");
    }

    /* Step 2: the field is returned as-is when it is not expired (a field
     * without TTL has *expiredAt == EB_EXPIRE_TIME_INVALID), or when access to
     * expired fields is allowed globally or by the caller. */
    if ((server.allow_access_expired) ||
        (*expiredAt >= (uint64_t) commandTimeSnapshot()) ||
        (hfeFlags & HFE_LAZY_ACCESS_EXPIRED))
        return GETF_OK;

    if (server.masterhost || server.cluster_enabled) {
        /* If CLIENT_MASTER, assume valid as long as it didn't get delete.
         *
         * In cluster mode, we also assume valid if we are importing data
         * from the source, to avoid deleting fields that are still in use.
         * We create a fake master client for data import, which can be
         * identified using the CLIENT_MASTER flag. */
        if (server.current_client && (server.current_client->flags & CLIENT_MASTER))
            return GETF_OK;

        /* For replica, if user client, then act as if expired, but don't delete! */
        if (server.masterhost) return GETF_EXPIRED;
    }

    /* Report the field as expired without deleting it while loading, when the
     * caller asked to avoid field deletion, or while expire actions are
     * paused. */
    if ((server.loading) ||
        (hfeFlags & HFE_LAZY_AVOID_FIELD_DEL) ||
        (isPausedActionsWithUpdate(PAUSE_ACTION_EXPIRE)))
        return GETF_EXPIRED;

    /* delete the field and propagate the deletion */
    if (server.memory_tracking_per_slot && !(hfeFlags & HFE_LAZY_NO_UPDATE_ALLOCSIZES))
        oldsize = hashTypeAllocSize(o);
    /* The field was found above, so the deletion must succeed. */
    serverAssert(hashTypeDelete(o, field) == 1);
    if (server.memory_tracking_per_slot && !(hfeFlags & HFE_LAZY_NO_UPDATE_ALLOCSIZES))
        updateSlotAllocSize(db, getKeySlot(key), oldsize, hashTypeAllocSize(o));
    propagateHashFieldDeletion(db, key, field, sdslen(field));
    server.stat_expired_subkeys++;

    if (!(hfeFlags & HFE_LAZY_NO_UPDATE_KEYSIZES)) {
        /* The hash shrank from l+1 to l fields. */
        uint64_t l = hashTypeLength(o, 0);
        updateKeysizesHist(db, getKeySlot(key), OBJ_HASH, l+1, l);
    }

    /* If the field is the last one in the hash, then the hash will be deleted */
    res = GETF_EXPIRED;
    /* keyObj is a separate string object built from the key name. It is the
     * one handed to the notifications, dbDelete() and keyModified() below.
     * NOTE(review): presumably needed because 'key' points into 'o', which
     * dbDelete() may release - confirm. */
    robj *keyObj = createStringObject(key, sdslen(key));
    if (!(hfeFlags & HFE_LAZY_NO_NOTIFICATION))
        notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", keyObj, db->id);
    if ((hashTypeLength(o, 0) == 0) && (!(hfeFlags & HFE_LAZY_AVOID_HASH_DEL))) {
        if (!(hfeFlags & HFE_LAZY_NO_NOTIFICATION))
            notifyKeyspaceEvent(NOTIFY_GENERIC, "del", keyObj, db->id);
        dbDelete(db,keyObj);
        /* The hash is gone: keyModified() must not be handed the object. */
        o = NULL;
        res = GETF_EXPIRED_HASH;
    }
    keyModified(NULL, db, keyObj, o, !(hfeFlags & HFE_LAZY_NO_SIGNAL));
    decrRefCount(keyObj);
    return res;
}

/* Like hashTypeGetValue() but returns a Redis object, which is useful for
 * interaction with the hash type outside t_hash.c.
 * The function returns NULL if the field is not found in the hash. Otherwise
 * a newly allocated string object with the value is returned.
 *
 * hfeFlags      - Lookup HFE_LAZY_* flags
 * isHashDeleted - If attempted to access expired field and it's the last field
 *                 in the hash, then the hash will as well be deleted. In this case,
 *                 isHashDeleted will be set to 1.
 * val           - If the field is found, then val will be set to the value object.
* expireTime - If the field exists (`GETF_OK`) then expireTime will be set to * the expiration time of the field. Otherwise, it will be set to 0. * * Returns 1 if the field exists, and 0 when it doesn't. */ int hashTypeGetValueObject(redisDb *db, kvobj *o, sds field, int hfeFlags, robj **val, uint64_t *expireTime, int *isHashDeleted) { unsigned char *vstr; unsigned int vlen; long long vll; if (isHashDeleted) *isHashDeleted = 0; if (val) *val = NULL; GetFieldRes res = hashTypeGetValue(db,o,field,&vstr,&vlen,&vll, hfeFlags, expireTime); if (res == GETF_OK) { /* expireTime set to 0 if the field has no expiration time */ if (expireTime && (*expireTime == EB_EXPIRE_TIME_INVALID)) *expireTime = 0; /* If expected to return the value, then create a new object */ if (val) { if (vstr) *val = createStringObject((char *) vstr, vlen); else *val = createStringObjectFromLongLong(vll); } return 1; } if ((res == GETF_EXPIRED_HASH) && (isHashDeleted)) *isHashDeleted = 1; /* GETF_EXPIRED_HASH, GETF_EXPIRED, GETF_NOT_FOUND */ return 0; } /* Test if the specified field exists in the given hash. If the field is * expired (HFE), then it will be lazy deleted unless HFE_LAZY_AVOID_FIELD_DEL * hfeFlags is set. * * hfeFlags - Lookup HFE_LAZY_* flags * isHashDeleted - If attempted to access expired field and it is the last field * in the hash, then the hash will as well be deleted. In this case, * isHashDeleted will be set to 1. * * Returns 1 if the field exists, and 0 when it doesn't. */ int hashTypeExists(redisDb *db, kvobj *o, sds field, int hfeFlags, int *isHashDeleted) { unsigned char *vstr = NULL; unsigned int vlen = UINT_MAX; long long vll = LLONG_MAX; GetFieldRes res = hashTypeGetValue(db, o, field, &vstr, &vlen, &vll, hfeFlags, NULL); if (isHashDeleted) *isHashDeleted = (res == GETF_EXPIRED_HASH) ? 1 : 0; return (res == GETF_OK) ? 1 : 0; } /* Add a new field, overwrite the old with the new value if it already exists. * Return 0 on insert and 1 on update. 
*
 * By default, the key and value SDS strings are copied if needed, so the
 * caller retains ownership of the strings passed. However this behavior
 * can be effected by passing appropriate flags (possibly bitwise OR-ed):
 *
 * HASH_SET_TAKE_FIELD  -- The SDS field ownership passes to the function.
 * HASH_SET_TAKE_VALUE  -- The SDS value ownership passes to the function.
 * HASH_SET_KEEP_TTL    -- keep original TTL if field already exists
 *
 * When the flags are used the caller does not need to release the passed
 * SDS string(s). It's up to the function to use the string to create a new
 * entry or to free the SDS string before returning to the caller.
 *
 * HASH_SET_COPY corresponds to no flags passed, and means the default
 * semantics of copying the values if needed.
 *
 */
#define HASH_SET_TAKE_FIELD (1<<0)
#define HASH_SET_TAKE_VALUE (1<<1)
#define HASH_SET_KEEP_TTL (1<<2)

/* The take-value bit is forwarded unchanged to the entry API below, so both
 * constants must have the same value. */
static_assert(HASH_SET_TAKE_VALUE == ENTRY_TAKE_VALUE, "ENTRY_TAKE_VALUE must match HASH_SET_TAKE_VALUE");

int hashTypeSet(redisDb *db, kvobj *o, sds field, sds value, int flags) {
    int update = 0;

    /* Check if the field is too long for listpack, and convert before adding the item.
     * This is needed for HINCRBY* case since in other commands this is handled early by
     * hashTypeTryConversion, so this check will be a NOP. */
    if (o->encoding == OBJ_ENCODING_LISTPACK  ||
        o->encoding == OBJ_ENCODING_LISTPACK_EX) {
        if (sdslen(field) > server.hash_max_listpack_value || sdslen(value) > server.hash_max_listpack_value)
            hashTypeConvert(db, o, OBJ_ENCODING_HT);
    }

    if (o->encoding == OBJ_ENCODING_LISTPACK) {
        unsigned char *zl, *fptr, *vptr;

        zl = o->ptr;
        fptr = lpFirst(zl);
        if (fptr != NULL) {
            fptr = lpFind(zl, fptr, (unsigned char*)field, sdslen(field), 1);
            if (fptr != NULL) {
                /* Grab pointer to the value (fptr points to the field) */
                vptr = lpNext(zl, fptr);
                serverAssert(vptr != NULL);

                /* Replace value */
                zl = lpReplace(zl, &vptr, (unsigned char*)value, sdslen(value));
                update = 1;
            }
        }

        if (!update) {
            listpackEntry entries[2] = {
                {.sval = (unsigned char*) field, .slen = sdslen(field)},
                {.sval = (unsigned char*) value, .slen = sdslen(value)},
            };

            /* Push new field/value pair onto the tail of the listpack */
            zl = lpBatchAppend(zl, entries, 2);
        }
        /* Store back the listpack pointer returned by the calls above. */
        o->ptr = zl;

        /* Check if the listpack needs to be converted to a hash table */
        if (hashTypeLength(o, 0) > server.hash_max_listpack_entries)
            hashTypeConvert(db, o, OBJ_ENCODING_HT);

    } else if (o->encoding == OBJ_ENCODING_LISTPACK_EX) {
        unsigned char *fptr = NULL, *vptr = NULL, *tptr = NULL;
        listpackEx *lpt = o->ptr;
        long long expireTime = HASH_LP_NO_TTL;

        fptr = lpFirst(lpt->lp);
        if (fptr != NULL) {
            /* Triplets: skip two elements between field names. */
            fptr = lpFind(lpt->lp, fptr, (unsigned char*)field, sdslen(field), 2);
            if (fptr != NULL) {
                /* Grab pointer to the value (fptr points to the field) */
                vptr = lpNext(lpt->lp, fptr);
                serverAssert(vptr != NULL);

                /* Replace value */
                lpt->lp = lpReplace(lpt->lp, &vptr, (unsigned char *) value, sdslen(value));
                update = 1;

                /* Re-derive the field and TTL pointers from the refreshed vptr,
                 * since lpReplace() returned a (possibly different) listpack. */
                fptr = lpPrev(lpt->lp, vptr);
                serverAssert(fptr != NULL);

                tptr = lpNext(lpt->lp, vptr);
                serverAssert(tptr && lpGetIntegerValue(tptr, &expireTime));

                if (flags & HASH_SET_KEEP_TTL) {
                    /* keep old field along with TTL */
                } else if (expireTime != HASH_LP_NO_TTL) {
                    /* re-insert field and override TTL */
                    listpackExUpdateExpiry(o, field, fptr, vptr, HASH_LP_NO_TTL);
                }
            }
        }

        if (!update)
            listpackExAddNew(o, field, sdslen(field), value, sdslen(value),
                             HASH_LP_NO_TTL);

        /* Check if the listpack needs to be converted to a hash table */
        if (hashTypeLength(o, 0) > server.hash_max_listpack_entries)
            hashTypeConvert(db, o, OBJ_ENCODING_HT);

    } else if (o->encoding == OBJ_ENCODING_HT) {
        dict *ht = o->ptr;

        /* check if field already exists */
        dictEntryLink bucket, link = dictFindLink(ht, field, &bucket);
        size_t *alloc_size = htGetMetadataSize(ht);

        /* take ownership of value if requested */
        uint32_t newEntryFlags = flags & HASH_SET_TAKE_VALUE;
        /* The bit is cleared here so that the cleanup at the end of the
         * function does not free 'value' on this path. */
        flags &= ~HASH_SET_TAKE_VALUE;

        if (link == NULL) {
            /* Create entry and transfer value ownership if possible */
            size_t usable;
            Entry *newEntry = entryCreate(field, value, newEntryFlags, &usable);
            dictSetKeyAtLink(ht, newEntry, &bucket, 1);
            *alloc_size += usable;
        } else {
            /* Existing field - update value in entry */
            Entry *oldEntry = dictGetKey(*link);

            /* Check if old entry has expiration before potentially freeing it */
            uint64_t oldExpireAt = entryGetExpiry(oldEntry);
            uint64_t newExpireAt = EB_EXPIRE_TIME_INVALID;

            /* If attached TTL to the old field, then remove it from hash's
             * private ebuckets. We do this before updating the value because
             * the entry might be reallocated and freed. */
            if (oldExpireAt != EB_EXPIRE_TIME_INVALID) {
                hfieldPersist(o, oldEntry);
                if (flags & HASH_SET_KEEP_TTL) {
                    newExpireAt = oldExpireAt;
                    newEntryFlags |= ENTRY_HAS_EXPIRY;
                }
            }

            ssize_t usableDiff;
            Entry *newEntry = entryUpdate(oldEntry, value, newEntryFlags, &usableDiff);

            /* If entry was reallocated, update the dict key */
            if (newEntry != oldEntry) {
                /* entryUpdate already freed the old entry if needed */
                /* Update the dict to point to the new entry using dictSetKeyAtLink (no_value=1) */
                dictSetKeyAtLink(ht, newEntry, &link, 0);
            }

            /* If keeping TTL, add the (potentially new) entry back to ebuckets */
            if (newExpireAt != EB_EXPIRE_TIME_INVALID) {
                dict *d = o->ptr;
                htMetadataEx *dictExpireMeta = htGetMetadataEx(d);
                serverAssert(dictExpireMeta->expireMeta.trash == 0);
                ebAdd(&dictExpireMeta->hfe, &hashFieldExpireBucketsType, newEntry, newExpireAt);
            }

            /* usableDiff is signed: the entry may have grown or shrunk. */
            *alloc_size += usableDiff;
            update = 1;
        }
    } else {
        serverPanic("Unknown hash encoding");
    }

    /* Free the SDS strings we did not reference elsewhere, if the flags make
     * this function responsible for them. */
    if (flags & HASH_SET_TAKE_FIELD && field) sdsfree(field);
    if (flags & HASH_SET_TAKE_VALUE && value) sdsfree(value);
    return update;
}

SetExRes hashTypeSetExpiryHT(HashTypeSetEx *exInfo, sds field, uint64_t expireAt) {
    dict *ht = exInfo->hashObj->ptr;
    dictEntryLink link = NULL;
    Entry *entryNew = NULL;

    link = dictFindLink(ht, field, NULL);
    if (link == NULL) return HSETEX_NO_FIELD;

    dictEntry *existingEntry = *link;
    Entry *oldEntry = dictGetKey(existingEntry);

    /* Special value of EXPIRE_TIME_INVALID indicates field should be persisted.*/
    if (expireAt == EB_EXPIRE_TIME_INVALID) {
        /* Return error if already there is no ttl. 
*/ if (entryGetExpiry(oldEntry) == EB_EXPIRE_TIME_INVALID) return HSETEX_NO_CONDITION_MET; hfieldPersist(exInfo->hashObj, oldEntry); return HSETEX_OK; } /* If field doesn't have expiry metadata attached */ if (!entryHasExpiry(oldEntry)) { size_t *alloc_size = htGetMetadataSize(ht); /* For fields without expiry, LT condition is considered valid */ if (exInfo->expireSetCond & (HFE_XX | HFE_GT)) return HSETEX_NO_CONDITION_MET; ssize_t usableDiff; entryNew = entryUpdate(oldEntry, NULL, ENTRY_HAS_EXPIRY, &usableDiff); *alloc_size += usableDiff; } else { /* field has ExpireMeta struct attached */ uint64_t prevExpire = entryGetExpiry(oldEntry); /* If field has valid expiration time, then check GT|LT|NX */ if (prevExpire != EB_EXPIRE_TIME_INVALID) { if (((exInfo->expireSetCond == HFE_GT) && (prevExpire >= expireAt)) || ((exInfo->expireSetCond == HFE_LT) && (prevExpire <= expireAt)) || (exInfo->expireSetCond == HFE_NX) ) return HSETEX_NO_CONDITION_MET; /* If expiry time is the same, then nothing to do */ if (prevExpire == expireAt) return HSETEX_OK; /* remove old expiry time from hash's private ebuckets */ htMetadataEx *dm = htGetMetadataEx(ht); ebRemove(&dm->hfe, &hashFieldExpireBucketsType, oldEntry); /* Track of minimum expiration time (only later update global HFE DS) */ if (exInfo->minExpireFields > prevExpire) exInfo->minExpireFields = prevExpire; } else { /* field has invalid expiry. No need to ebRemove() */ /* Check XX|LT|GT */ if (exInfo->expireSetCond & (HFE_XX | HFE_GT)) return HSETEX_NO_CONDITION_MET; } /* Reuse hfOld as hfNew and rewrite its expiry with ebAdd() */ entryNew = oldEntry; } dictSetKeyAtLink(ht, entryNew, &link, 0); /* newItem=0 for updating existing entry */ /* If expired, then delete the field and propagate the deletion. 
* If replica, continue like the field is valid */ if (unlikely(checkAlreadyExpired(expireAt))) { /* replicas should not initiate deletion of fields */ propagateHashFieldDeletion(exInfo->db, exInfo->key->ptr, field, sdslen(field)); hashTypeDelete(exInfo->hashObj, field); server.stat_expired_subkeys++; return HSETEX_DELETED; } if (exInfo->minExpireFields > expireAt) exInfo->minExpireFields = expireAt; htMetadataEx *dm = htGetMetadataEx(ht); ebAdd(&dm->hfe, &hashFieldExpireBucketsType, entryNew, expireAt); return HSETEX_OK; } /* * Set field expiration * * Take care to call first hashTypeSetExInit() and then call this function. * Finally, call hashTypeSetExDone() to notify and update global HFE DS. * * Special value of EB_EXPIRE_TIME_INVALID for 'expireAt' argument will persist * the field. */ SetExRes hashTypeSetEx(robj *o, sds field, uint64_t expireAt, HashTypeSetEx *exInfo) { if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { unsigned char *fptr = NULL, *vptr = NULL, *tptr = NULL; listpackEx *lpt = o->ptr; fptr = lpFirst(lpt->lp); if (fptr) fptr = lpFind(lpt->lp, fptr, (unsigned char*)field, sdslen(field), 2); if (!fptr) return HSETEX_NO_FIELD; /* Grab pointer to the value (fptr points to the field) */ vptr = lpNext(lpt->lp, fptr); serverAssert(vptr != NULL); tptr = lpNext(lpt->lp, vptr); serverAssert(tptr); /* update TTL */ return hashTypeSetExpiryListpack(exInfo, field, fptr, vptr, tptr, expireAt); } else if (o->encoding == OBJ_ENCODING_HT) { /* If needed to set the field along with expiry */ return hashTypeSetExpiryHT(exInfo, field, expireAt); } else { serverPanic("Unknown hash encoding"); } return HSETEX_OK; /* never reach here */ } void initDictExpireMetadata(robj *o) { dict *ht = o->ptr; htMetadataEx *m = htGetMetadataEx(ht); m->hfe = ebCreate(); /* Allocate HFE DS */ m->expireMeta.trash = 1; /* mark as trash (as long it wasn't ebAdd()) */ } /* Init HashTypeSetEx struct before calling hashTypeSetEx() */ int hashTypeSetExInit(robj *key, kvobj *o, client *c, redisDb 
*db, ExpireSetCond expireSetCond, HashTypeSetEx *ex) { dict *ht = o->ptr; ex->expireSetCond = expireSetCond; ex->minExpire = EB_EXPIRE_TIME_INVALID; ex->c = c; ex->db = db; ex->key = key; ex->hashObj = o; ex->minExpireFields = EB_EXPIRE_TIME_INVALID; /* Take care that HASH support expiration */ if (o->encoding == OBJ_ENCODING_LISTPACK) { hashTypeConvert(c->db, o, OBJ_ENCODING_LISTPACK_EX); } else if (o->encoding == OBJ_ENCODING_HT) { /* Take care dict has HFE metadata */ if (!isDictWithMetaHFE(ht)) { /* Realloc (only header of dict) with metadata for hash-field expiration */ dictTypeAddMeta(&ht, &entryHashDictTypeWithHFE); htMetadataEx *m = htGetMetadataEx(ht); o->ptr = ht; /* Find the key in the keyspace. Need to keep reference to the key for * notifications or even removal of the hash */ /* Fillup dict HFE metadata */ m->hfe = ebCreate(); /* Allocate HFE DS */ m->expireMeta.trash = 1; /* mark as trash (as long it wasn't ebAdd()) */ } } /* Read minExpire from attached ExpireMeta to the hash */ ex->minExpire = hashTypeGetMinExpire(o, 0); return C_OK; } /* * After calling hashTypeSetEx() for setting fields or their expiry, call this * function to update global HFE DS. */ void hashTypeSetExDone(HashTypeSetEx *ex) { if (hashTypeLength(ex->hashObj, 0) == 0) return; /* If minimum HFE of the hash is smaller than expiration time of the * specified fields in the command as well as it is smaller or equal * than expiration time provided in the command, then the minimum * HFE of the hash won't change following this command. */ if ((ex->minExpire < ex->minExpireFields)) return; /* Retrieve new expired time. It might have changed. */ uint64_t newMinExpire = hashTypeGetMinExpire(ex->hashObj, 1 /*accurate*/); /* Calculate the diff between old minExpire and newMinExpire. If it is * only few seconds, then don't have to update global HFE DS. At the worst * case fields of hash will be active-expired up to few seconds later. 
* * In any case, active-expire operation will know to update global * HFE DS more efficiently than here for a single item. */ uint64_t diff = (ex->minExpire > newMinExpire) ? (ex->minExpire - newMinExpire) : (newMinExpire - ex->minExpire); if (diff < HASH_NEW_EXPIRE_DIFF_THRESHOLD) return; int slot = getKeySlot(ex->key->ptr); if (ex->minExpire != EB_EXPIRE_TIME_INVALID) { if (newMinExpire != EB_EXPIRE_TIME_INVALID) estoreUpdate(ex->db->subexpires, slot, ex->hashObj, newMinExpire); else estoreRemove(ex->db->subexpires, slot, ex->hashObj); } else { if (newMinExpire != EB_EXPIRE_TIME_INVALID) estoreAdd(ex->db->subexpires, slot, ex->hashObj, newMinExpire); } } /* Delete an element from a hash. * * Return 1 on deleted and 0 on not found. * field - sds field name to delete */ int hashTypeDelete(robj *o, void *field) { int deleted = 0; int fieldLen = sdslen((sds)field); if (o->encoding == OBJ_ENCODING_LISTPACK) { unsigned char *zl, *fptr; zl = o->ptr; fptr = lpFirst(zl); if (fptr != NULL) { fptr = lpFind(zl, fptr, (unsigned char*)field, fieldLen, 1); if (fptr != NULL) { /* Delete both of the key and the value. */ zl = lpDeleteRangeWithEntry(zl,&fptr,2); o->ptr = zl; deleted = 1; } } } else if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { unsigned char *fptr; listpackEx *lpt = o->ptr; fptr = lpFirst(lpt->lp); if (fptr != NULL) { fptr = lpFind(lpt->lp, fptr, (unsigned char*)field, fieldLen, 2); if (fptr != NULL) { /* Delete field, value and ttl */ lpt->lp = lpDeleteRangeWithEntry(lpt->lp, &fptr, 3); deleted = 1; } } } else if (o->encoding == OBJ_ENCODING_HT) { /* dictDelete() will call dictEntryDestructor() */ if (dictDelete((dict*)o->ptr, field) == C_OK) { deleted = 1; } } else { serverPanic("Unknown hash encoding"); } return deleted; } /* Return the number of elements in a hash. 
* * Note, subtractExpiredFields=1 might be pricy in case there are many HFEs */ unsigned long hashTypeLength(const robj *o, int subtractExpiredFields) { unsigned long length = ULONG_MAX; /* If expired field access is allowed, don't subtract expired fields from the count. */ if (server.allow_access_expired) subtractExpiredFields = 0; if (o->encoding == OBJ_ENCODING_LISTPACK) { length = lpLength(o->ptr) / 2; } else if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { listpackEx *lpt = o->ptr; length = lpLength(lpt->lp) / 3; if (subtractExpiredFields && lpt->meta.trash == 0) length -= listpackExExpireDryRun(o); } else if (o->encoding == OBJ_ENCODING_HT) { uint64_t expiredItems = 0; dict *d = (dict*)o->ptr; if (subtractExpiredFields && isDictWithMetaHFE(d)) { htMetadataEx *meta = htGetMetadataEx(d); /* If dict registered in global HFE DS */ if (meta->expireMeta.trash == 0) expiredItems = ebExpireDryRun(meta->hfe, &hashFieldExpireBucketsType, commandTimeSnapshot()); } length = dictSize(d) - expiredItems; } else { serverPanic("Unknown hash encoding"); } return length; } size_t hashTypeAllocSize(const robj *o) { serverAssertWithInfo(NULL,o,o->type == OBJ_HASH); size_t size = 0; if (o->encoding == OBJ_ENCODING_LISTPACK) { size = lpBytes(o->ptr); } else if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { listpackEx *lpt = o->ptr; size = sizeof(listpackEx) + lpBytes(lpt->lp); } else if (o->encoding == OBJ_ENCODING_HT) { dict *d = o->ptr; size += sizeof(dict) + dictMemUsage(d) + *htGetMetadataSize(d); } else { serverPanic("Unknown hash encoding"); } return size; } void hashTypeInitIterator(hashTypeIterator *hi, robj *subject) { hi->subject = subject; hi->encoding = subject->encoding; if (hi->encoding == OBJ_ENCODING_LISTPACK || hi->encoding == OBJ_ENCODING_LISTPACK_EX) { hi->fptr = NULL; hi->vptr = NULL; hi->tptr = NULL; hi->expire_time = EB_EXPIRE_TIME_INVALID; } else if (hi->encoding == OBJ_ENCODING_HT) { dictInitIterator(&hi->di, subject->ptr); } else { serverPanic("Unknown hash 
encoding"); } } void hashTypeResetIterator(hashTypeIterator *hi) { if (hi->encoding == OBJ_ENCODING_HT) dictResetIterator(&hi->di); } /* Move to the next entry in the hash. Return C_OK when the next entry * could be found and C_ERR when the iterator reaches the end. */ int hashTypeNext(hashTypeIterator *hi, int skipExpiredFields) { /* If expired field access is allowed, don't skip expired fields during iteration */ if (server.allow_access_expired) skipExpiredFields = 0; hi->expire_time = EB_EXPIRE_TIME_INVALID; if (hi->encoding == OBJ_ENCODING_LISTPACK) { unsigned char *zl; unsigned char *fptr, *vptr; zl = hi->subject->ptr; fptr = hi->fptr; vptr = hi->vptr; if (fptr == NULL) { /* Initialize cursor */ serverAssert(vptr == NULL); fptr = lpFirst(zl); } else { /* Advance cursor */ serverAssert(vptr != NULL); fptr = lpNext(zl, vptr); } if (fptr == NULL) return C_ERR; /* Grab pointer to the value (fptr points to the field) */ vptr = lpNext(zl, fptr); serverAssert(vptr != NULL); /* fptr, vptr now point to the first or next pair */ hi->fptr = fptr; hi->vptr = vptr; } else if (hi->encoding == OBJ_ENCODING_LISTPACK_EX) { long long expire_time; unsigned char *zl = hashTypeListpackGetLp(hi->subject); unsigned char *fptr, *vptr, *tptr; fptr = hi->fptr; vptr = hi->vptr; tptr = hi->tptr; if (fptr == NULL) { /* Initialize cursor */ serverAssert(vptr == NULL); fptr = lpFirst(zl); } else { /* Advance cursor */ serverAssert(tptr != NULL); fptr = lpNext(zl, tptr); } if (fptr == NULL) return C_ERR; while (fptr != NULL) { /* Grab pointer to the value (fptr points to the field) */ vptr = lpNext(zl, fptr); serverAssert(vptr != NULL); tptr = lpNext(zl, vptr); serverAssert(tptr && lpGetIntegerValue(tptr, &expire_time)); if (!skipExpiredFields || !hashTypeIsExpired(hi->subject, expire_time)) break; fptr = lpNext(zl, tptr); } if (fptr == NULL) return C_ERR; /* fptr, vptr now point to the first or next pair */ hi->fptr = fptr; hi->vptr = vptr; hi->tptr = tptr; hi->expire_time = (expire_time != 
HASH_LP_NO_TTL) ? (uint64_t) expire_time : EB_EXPIRE_TIME_INVALID; } else if (hi->encoding == OBJ_ENCODING_HT) { while ((hi->de = dictNext(&hi->di)) != NULL) { Entry *e = dictGetKey(hi->de); hi->expire_time = entryGetExpiry(e); /* this condition still valid if expire_time equals EB_EXPIRE_TIME_INVALID */ if (skipExpiredFields && ((mstime_t)hi->expire_time < commandTimeSnapshot())) continue; return C_OK; } return C_ERR; } else { serverPanic("Unknown hash encoding"); } return C_OK; } /* Get the field or value at iterator cursor, for an iterator on a hash value * encoded as a listpack. Prototype is similar to `hashTypeGetFromListpack`. */ void hashTypeCurrentFromListpack(hashTypeIterator *hi, int what, unsigned char **vstr, unsigned int *vlen, long long *vll, uint64_t *expireTime) { serverAssert(hi->encoding == OBJ_ENCODING_LISTPACK || hi->encoding == OBJ_ENCODING_LISTPACK_EX); if (what & OBJ_HASH_KEY) { *vstr = lpGetValue(hi->fptr, vlen, vll); } else { *vstr = lpGetValue(hi->vptr, vlen, vll); } if (expireTime) *expireTime = hi->expire_time; } /* Get the field or value at iterator cursor, for an iterator on a hash value * encoded as a hash table. Prototype is similar to * `hashTypeGetFromHashTable`. * * expireTime - If parameter is not null, then the function will return the expire * time of the field. If expiry not set, return EB_EXPIRE_TIME_INVALID */ void hashTypeCurrentFromHashTable(hashTypeIterator *hi, int what, char **str, size_t *len, uint64_t *expireTime) { serverAssert(hi->encoding == OBJ_ENCODING_HT); Entry *e = dictGetKey(hi->de); if (what & OBJ_HASH_KEY) { sds field = entryGetField(e); *str = field; *len = sdslen(field); } else { sds val = entryGetValue(e); *str = val; *len = sdslen(val); } if (expireTime) *expireTime = hi->expire_time; } /* Higher level function of hashTypeCurrent*() that returns the hash value * at current iterator position. 
*
 * The returned element is returned by reference in either *vstr and *vlen if
 * it's returned in string form, or stored in *vll if it's returned as
 * a number.
 *
 * If *vll is populated *vstr is set to NULL, so the caller
 * can always check the function return by checking the return value
 * type checking if vstr == NULL. */
void hashTypeCurrentObject(hashTypeIterator *hi,
                           int what,
                           unsigned char **vstr,
                           unsigned int *vlen,
                           long long *vll,
                           uint64_t *expireTime)
{
    switch (hi->encoding) {
    case OBJ_ENCODING_LISTPACK:
    case OBJ_ENCODING_LISTPACK_EX:
        *vstr = NULL;
        hashTypeCurrentFromListpack(hi, what, vstr, vlen, vll, expireTime);
        break;

    case OBJ_ENCODING_HT: {
        /* Hash table elements are always strings */
        char *buf;
        size_t bufLen;

        hashTypeCurrentFromHashTable(hi, what, &buf, &bufLen, expireTime);
        *vstr = (unsigned char*) buf;
        *vlen = bufLen;
        break;
    }

    default:
        serverPanic("Unknown hash encoding");
        break;
    }
}

/* Build a new SDS string holding the field name or the value (selected by
 * 'what') found at the current iterator position. Integer encoded elements
 * are converted to their string representation. */
sds hashTypeCurrentObjectNewSds(hashTypeIterator *hi, int what) {
    unsigned char *str;
    unsigned int len;
    long long num;

    hashTypeCurrentObject(hi,what,&str,&len,&num, NULL);
    return (str != NULL) ? sdsnewlen(str,len) : sdsfromlonglong(num);
}

/* Return the key at the current iterator position as a new entry.
*/
Entry *hashTypeCurrentObjectNewEntry(hashTypeIterator *hi, size_t *usable) {
    char fieldBuf[LONG_STR_SIZE], valueBuf[LONG_STR_SIZE];
    unsigned char *fieldStr, *valueStr;
    unsigned int fieldLen, valueLen;
    long long fieldLl, valueLl;
    Entry *entry;

    /* Get field. A NULL string means the element was returned as an integer
     * in fieldLl, so render it into the local buffer. */
    hashTypeCurrentObject(hi, OBJ_HASH_KEY, &fieldStr, &fieldLen, &fieldLl, NULL);
    if (!fieldStr) {
        fieldLen = ll2string(fieldBuf, sizeof(fieldBuf), fieldLl);
        fieldStr = (unsigned char *) fieldBuf;
    }
    sds field = sdsnewlen(fieldStr, fieldLen);

    /* Get value (same integer handling as for the field) */
    hashTypeCurrentObject(hi, OBJ_HASH_VALUE, &valueStr, &valueLen, &valueLl, NULL);
    if (!valueStr) {
        valueLen = ll2string(valueBuf, sizeof(valueBuf), valueLl);
        valueStr = (unsigned char *) valueBuf;
    }
    sds value = sdsnewlen(valueStr, valueLen);

    int hasExpiry = (hi->expire_time != EB_EXPIRE_TIME_INVALID);

    /* Create the entry from field and value. ENTRY_HAS_EXPIRY is requested when
     * the iterator reports an expire time; this function itself does not
     * store hi->expire_time anywhere (callers do that via ebAdd()). */
    uint32_t entryFlags = ENTRY_TAKE_VALUE | ((hasExpiry) ? ENTRY_HAS_EXPIRY : 0);
    entry = entryCreate(field, value, entryFlags, usable);
    sdsfree(field); /* entryCreate() doesn't take ownership of field */
    return entry;
}

/* Look up 'key' for writing and return it if it is a hash. If the key does not
 * exist, a new empty hash is created and added to c->db. Returns NULL when
 * checkType() reports a type mismatch. */
static kvobj *hashTypeLookupWriteOrCreate(client *c, robj *key) {
    dictEntryLink link;
    kvobj *kv = lookupKeyWriteWithLink(c->db, key, &link);
    if (checkType(c, kv, OBJ_HASH)) return NULL;

    if (kv == NULL) {
        /* Reuse the link obtained by the lookup for the insertion */
        robj *o = createHashObject();
        kv = dbAddByLink(c->db, key, &o, &link);
    }
    return kv;
}

/* Convert a hash encoded as OBJ_ENCODING_LISTPACK into encoding 'enc', which
 * may be OBJ_ENCODING_LISTPACK (no-op), OBJ_ENCODING_LISTPACK_EX or
 * OBJ_ENCODING_HT. Any other target panics. */
void hashTypeConvertListpack(robj *o, int enc) {
    serverAssert(o->encoding == OBJ_ENCODING_LISTPACK);

    if (enc == OBJ_ENCODING_LISTPACK) {
        /* Nothing to do... */

    } else if (enc == OBJ_ENCODING_LISTPACK_EX) {
        unsigned char *p;

        /* Append HASH_LP_NO_TTL to each field name - value pair. */
        p = lpFirst(o->ptr);
        while (p != NULL) {
            /* Move from the field to its value */
            p = lpNext(o->ptr, p);
            serverAssert(p);

            /* Insert the ttl after the value. The listpack may be reallocated,
             * so both o->ptr and p are refreshed by the call.
             * NOTE(review): p presumably ends up on the inserted element, so
             * the lpNext() below lands on the next field - confirm against
             * lpInsertInteger(). */
            o->ptr = lpInsertInteger(o->ptr, HASH_LP_NO_TTL, p, LP_AFTER, &p);
            p = lpNext(o->ptr, p);
        }

        /* Wrap the listpack in a listpackEx and switch the encoding */
        listpackEx *lpt = listpackExCreate();
        lpt->lp = o->ptr;
        o->encoding = OBJ_ENCODING_LISTPACK_EX;
        o->ptr = lpt;
    } else if (enc == OBJ_ENCODING_HT) {
        hashTypeIterator hi;
        dict *dict;
        int ret;

        hashTypeInitIterator(&hi, o);
        dict = dictCreate(&entryHashDictType);

        /* Presize the dict to avoid rehashing */
        dictExpand(dict,hashTypeLength(o, 0));

        /* alloc_size points at the size counter kept in the dict metadata */
        size_t usable, *alloc_size = htGetMetadataSize(dict);
        while (hashTypeNext(&hi, 0) != C_ERR) {
            Entry *entry = hashTypeCurrentObjectNewEntry(&hi, &usable);
            ret = dictAdd(dict, entry, NULL);
            if (ret != DICT_OK) {
                /* A failed add is treated as a duplicate field, i.e. a
                 * corrupted listpack */
                entryFree(entry, NULL); /* Needed for gcc ASAN */
                hashTypeResetIterator(&hi);  /* Needed for gcc ASAN */
                serverLogHexDump(LL_WARNING,"listpack with dup elements dump",
                                 o->ptr,lpBytes(o->ptr));
                serverPanic("Listpack corruption detected");
            }
            *alloc_size += usable;
        }
        hashTypeResetIterator(&hi);

        /* Release the old listpack and install the dict */
        zfree(o->ptr);
        o->encoding = OBJ_ENCODING_HT;
        o->ptr = dict;
    } else {
        serverPanic("Unknown hash encoding");
    }
}

/* Convert a hash encoded as OBJ_ENCODING_LISTPACK_EX into encoding 'enc', which
 * may be OBJ_ENCODING_LISTPACK_EX (no-op) or OBJ_ENCODING_HT.
 *
 * db can be NULL to avoid registration in subexpires */
void hashTypeConvertListpackEx(redisDb *db, robj *o, int enc) {
    serverAssert(o->encoding == OBJ_ENCODING_LISTPACK_EX);

    if (enc == OBJ_ENCODING_LISTPACK_EX) {
        return;
    } else if (enc == OBJ_ENCODING_HT) {
        uint64_t minExpire = EB_EXPIRE_TIME_INVALID;
        int ret, slot = -1;
        hashTypeIterator hi;
        dict *dict;
        htMetadataEx *dictExpireMeta;
        listpackEx *lpt = o->ptr;

        /* If the hash is registered in the global HFE DS, remember its minimum
         * expiry and unregister it. It is registered again once the new
         * encoding is in place. */
        if (db && lpt->meta.trash != 1) {
            minExpire = hashTypeGetMinExpire(o, 0);
            slot = getKeySlot(kvobjGetKey(o));
            estoreRemove(db->subexpires, slot, o);
        }

        dict = dictCreate(&entryHashDictTypeWithHFE);
        dictExpand(dict,hashTypeLength(o, 0));
        dictExpireMeta = htGetMetadataEx(dict);

        /* Fill up dict HFE metadata */
        dictExpireMeta->hfe = ebCreate();     /* Allocate HFE DS */
        dictExpireMeta->expireMeta.trash = 1; /* mark as trash (as long it wasn't ebAdd()) */

        hashTypeInitIterator(&hi, o);

        size_t usable, *alloc_size = &dictExpireMeta->alloc_size;
        while (hashTypeNext(&hi, 0) != C_ERR) {
            /* Create entry with both field and value */
            Entry *entry = hashTypeCurrentObjectNewEntry(&hi, &usable);
            ret = dictAdd(dict, entry, NULL);
            if (ret != DICT_OK) {
                /* A failed add is treated as a duplicate field, i.e. a
                 * corrupted listpack */
                entryFree(entry, NULL); /* Needed for gcc ASAN */
                hashTypeResetIterator(&hi);  /* Needed for gcc ASAN */
                serverLogHexDump(LL_WARNING,"listpack with dup elements dump",
                                 lpt->lp,lpBytes(lpt->lp));
                serverPanic("Listpack corruption detected");
            }
            *alloc_size += usable;

            /* Fields with a TTL are also tracked in the dict's private ebuckets */
            if (hi.expire_time != EB_EXPIRE_TIME_INVALID)
                ebAdd(&dictExpireMeta->hfe, &hashFieldExpireBucketsType, entry, hi.expire_time);
        }
        hashTypeResetIterator(&hi);

        /* Release the old listpackEx and install the dict */
        listpackExFree(lpt);
        o->encoding = OBJ_ENCODING_HT;
        o->ptr = dict;

        /* minExpire is only valid if the hash was unregistered above, which
         * implies db != NULL */
        if (minExpire != EB_EXPIRE_TIME_INVALID)
            estoreAdd(db->subexpires, slot, o, minExpire);
    } else {
        serverPanic("Unknown hash encoding: %d", enc);
    }
}

/* Convert the hash 'o' to encoding 'enc' by dispatching on its current
 * encoding. Converting away from OBJ_ENCODING_HT is not implemented and panics.
 *
 * NOTE: db can be NULL (Won't register in global HFE DS) */
void hashTypeConvert(redisDb *db, robj *o, int enc) {
    if (o->encoding == OBJ_ENCODING_LISTPACK) {
        hashTypeConvertListpack(o, enc);
    } else if (o->encoding == OBJ_ENCODING_LISTPACK_EX) {
        hashTypeConvertListpackEx(db, o, enc);
    } else if (o->encoding == OBJ_ENCODING_HT) {
        serverPanic("Not implemented");
    } else {
        serverPanic("Unknown hash encoding");
    }
}

/* This is a helper function for the COPY command.
 * Duplicate a hash object, with the guarantee that the returned object
 * has the same encoding as the original one.
* * The resulting object always has refcount set to 1 */ robj *hashTypeDup(kvobj *o, uint64_t *minHashExpire) { robj *hobj; hashTypeIterator hi; serverAssert(o->type == OBJ_HASH); if(o->encoding == OBJ_ENCODING_LISTPACK) { unsigned char *zl = o->ptr; size_t sz = lpBytes(zl); unsigned char *new_zl = zmalloc(sz); memcpy(new_zl, zl, sz); hobj = createObject(OBJ_HASH, new_zl); hobj->encoding = OBJ_ENCODING_LISTPACK; } else if(o->encoding == OBJ_ENCODING_LISTPACK_EX) { listpackEx *lpt = o->ptr; if (lpt->meta.trash == 0) *minHashExpire = ebGetMetaExpTime(&lpt->meta); listpackEx *dup = listpackExCreate(); size_t sz = lpBytes(lpt->lp); dup->lp = lpNew(sz); memcpy(dup->lp, lpt->lp, sz); hobj = createObject(OBJ_HASH, dup); hobj->encoding = OBJ_ENCODING_LISTPACK_EX; } else if(o->encoding == OBJ_ENCODING_HT) { htMetadataEx *dictExpireMetaSrc, *dictExpireMetaDst = NULL; dict *d; /* If dict doesn't have HFE metadata, then create a new dict without it */ if (!isDictWithMetaHFE(o->ptr)) { d = dictCreate(&entryHashDictType); } else { /* Create a new dict with HFE metadata */ d = dictCreate(&entryHashDictTypeWithHFE); dictExpireMetaSrc = htGetMetadataEx((dict *) o->ptr); dictExpireMetaDst = htGetMetadataEx(d); dictExpireMetaDst->hfe = ebCreate(); /* Allocate HFE DS */ dictExpireMetaDst->expireMeta.trash = 1; /* mark as trash (as long it wasn't ebAdd()) */ /* Extract the minimum expire time of the source hash (Will be used by caller * to register the new hash in the global subexpires DB) */ if (dictExpireMetaSrc->expireMeta.trash == 0) *minHashExpire = ebGetMetaExpTime(&dictExpireMetaSrc->expireMeta); } dictExpand(d, dictSize((const dict*)o->ptr)); size_t usable, *alloc_size = htGetMetadataSize(d); hashTypeInitIterator(&hi, o); while (hashTypeNext(&hi, 0) != C_ERR) { Entry *newEntry; uint64_t expireTime; /* Extract a field-value pair from an original hash object.*/ char *field, *value; size_t fieldLen, valueLen; hashTypeCurrentFromHashTable(&hi, OBJ_HASH_KEY, &field, &fieldLen, 
&expireTime); hashTypeCurrentFromHashTable(&hi, OBJ_HASH_VALUE, &value, &valueLen, NULL); /* Create new entry with field and value */ sds newFieldSds = sdsnewlen(field, fieldLen); sds newValueSds = sdsnewlen(value, valueLen); /* Create new entry with field and value, optional expiry. */ if (expireTime == EB_EXPIRE_TIME_INVALID) { newEntry = entryCreate(newFieldSds, newValueSds, ENTRY_TAKE_VALUE, &usable); } else { newEntry = entryCreate(newFieldSds, newValueSds, ENTRY_TAKE_VALUE | ENTRY_HAS_EXPIRY, &usable); ebAdd(&dictExpireMetaDst->hfe, &hashFieldExpireBucketsType, newEntry, expireTime); } sdsfree(newFieldSds); /* (Only value ownership transferred to entry) */ /* Add entry to new hash object. */ dictAdd(d, newEntry, NULL); /* no_value=1, so value is NULL */ *alloc_size += usable; } hashTypeResetIterator(&hi); hobj = createObject(OBJ_HASH, d); hobj->encoding = OBJ_ENCODING_HT; } else { serverPanic("Unknown hash encoding"); } return hobj; } /* Create a new sds string from the listpack entry. */ sds hashSdsFromListpackEntry(listpackEntry *e) { return e->sval ? sdsnewlen(e->sval, e->slen) : sdsfromlonglong(e->lval); } /* Reply with bulk string from the listpack entry. */ void hashReplyFromListpackEntry(client *c, listpackEntry *e) { if (e->sval) addReplyBulkCBuffer(c, e->sval, e->slen); else addReplyBulkLongLong(c, e->lval); } /* Return random element from a non empty hash. * 'key' and 'val' will be set to hold the element. * The memory in them is not to be freed or modified by the caller. * 'val' can be NULL in which case it's not extracted. 
*/ void hashTypeRandomElement(robj *hashobj, unsigned long hashsize, CommonEntry *key, CommonEntry *val) { if (hashobj->encoding == OBJ_ENCODING_HT) { dictEntry *de = dictGetFairRandomKey(hashobj->ptr); Entry *entry = dictGetKey(de); sds field = entryGetField(entry); key->sval = (unsigned char*) field; key->slen = sdslen(field); if (val) { sds s = entryGetValue(entry); val->sval = (unsigned char*)s; val->slen = sdslen(s); } } else if (hashobj->encoding == OBJ_ENCODING_LISTPACK) { lpRandomPair(hashobj->ptr, hashsize, (listpackEntry *) key, (listpackEntry *) val, 2); } else if (hashobj->encoding == OBJ_ENCODING_LISTPACK_EX) { lpRandomPair(hashTypeListpackGetLp(hashobj), hashsize, (listpackEntry *) key, (listpackEntry *) val, 3); } else { serverPanic("Unknown hash encoding"); } } /* Delete all expired fields from the hash and delete the hash if left empty. * * updateSubexpires - If the hash should be updated in the subexpires DB with new * expiration time in case expired fields were deleted. * * Return next Expire time of the hash * - 0 if hash got deleted * - EB_EXPIRE_TIME_INVALID if no more fields to expire */ uint64_t hashTypeActiveExpire(redisDb *db, kvobj *o, uint32_t *quota, int updateSubexpires) { uint64_t noExpireLeftRes = EB_EXPIRE_TIME_INVALID; ExpireInfo info = {0}; if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { info = (ExpireInfo) { .maxToExpire = *quota, .now = commandTimeSnapshot(), .itemsExpired = 0}; listpackExExpire(db, o, &info); } else { serverAssert(o->encoding == OBJ_ENCODING_HT); dict *d = o->ptr; htMetadataEx *dictExpireMeta = htGetMetadataEx(d); OnFieldExpireCtx onFieldExpireCtx = { .hashObj = o, .db = db }; info = (ExpireInfo){ .maxToExpire = *quota, .onExpireItem = onFieldExpire, .ctx = &onFieldExpireCtx, .now = commandTimeSnapshot() }; ebExpire(&dictExpireMeta->hfe, &hashFieldExpireBucketsType, &info); } /* Update quota left */ *quota -= info.itemsExpired; /* In some cases, a field might have been deleted without updating the global DS. 
* As a result, active-expire might not expire any fields, in such cases, * we don't need to send notifications or perform other operations for this key. */ if (info.itemsExpired) { sds keystr = kvobjGetKey(o); robj *key = createStringObject(keystr, sdslen(keystr)); notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", key, db->id); int slot; int deleted = 0; if (updateSubexpires) { slot = getKeySlot(keystr); estoreRemove(db->subexpires, slot, o); } if (hashTypeLength(o, 0) == 0) { notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db->id); dbDelete(db, key); noExpireLeftRes = 0; deleted = 1; } else { if ((updateSubexpires) && (info.nextExpireTime != EB_EXPIRE_TIME_INVALID)) estoreAdd(db->subexpires, slot, o, info.nextExpireTime); } keyModified(NULL, db, key, deleted ? NULL : o, 1); decrRefCount(key); } /* return 0 if hash got deleted, EB_EXPIRE_TIME_INVALID if no more fields * with expiration. Else return next expiration time */ return (info.nextExpireTime == EB_EXPIRE_TIME_INVALID) ? noExpireLeftRes : info.nextExpireTime; } /* Delete all expired fields in hash if needed (Currently used only by HRANDFIELD) * * NOTICE: If we call this function in other places, we should consider the slot * migration scenario, where we don't want to delete expired fields. See also * expireIfNeeded(). * * Return 1 if the entire hash was deleted, 0 otherwise. * This function might be pricy in case there are many expired fields. 
*/ static int hashTypeExpireIfNeeded(redisDb *db, kvobj *o) { uint64_t nextExpireTime; uint64_t minExpire = hashTypeGetMinExpire(o, 1 /*accurate*/); /* Nothing to expire */ if ((mstime_t) minExpire >= commandTimeSnapshot()) return 0; /* Follow expireIfNeeded() conditions of when not lazy-expire */ if ( (server.loading) || (server.allow_access_expired) || (server.masterhost) || /* master-client or user-client, don't delete */ (isPausedActionsWithUpdate(PAUSE_ACTION_EXPIRE))) return 0; /* Take care to expire all the fields */ uint32_t quota = UINT32_MAX; nextExpireTime = hashTypeActiveExpire(db, o, "a, 1); /* return 1 if the entire hash was deleted */ return nextExpireTime == 0; } /* Return the next/minimum expiry time of the hash-field. * accurate=1 - Return the exact time by looking into the object DS. * accurate=0 - Return the minimum expiration time maintained in expireMeta * (Verify it is not trash before using it) which might not be * accurate due to optimization reasons. * * If not found, return EB_EXPIRE_TIME_INVALID */ uint64_t hashTypeGetMinExpire(robj *o, int accurate) { ExpireMeta *expireMeta = NULL; if (!accurate) { if (o->encoding == OBJ_ENCODING_LISTPACK) { return EB_EXPIRE_TIME_INVALID; } else if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { listpackEx *lpt = o->ptr; expireMeta = &lpt->meta; } else { serverAssert(o->encoding == OBJ_ENCODING_HT); dict *d = o->ptr; if (!isDictWithMetaHFE(d)) return EB_EXPIRE_TIME_INVALID; expireMeta = &htGetMetadataEx(d)->expireMeta; } /* Keep aside next hash-field expiry before updating HFE DS. 
Verify it is not trash */ if (expireMeta->trash == 1) return EB_EXPIRE_TIME_INVALID; return ebGetMetaExpTime(expireMeta); } /* accurate == 1 */ if (o->encoding == OBJ_ENCODING_LISTPACK) { return EB_EXPIRE_TIME_INVALID; } else if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { return listpackExGetMinExpire(o); } else { serverAssert(o->encoding == OBJ_ENCODING_HT); dict *d = o->ptr; if (!isDictWithMetaHFE(d)) return EB_EXPIRE_TIME_INVALID; htMetadataEx *expireMeta = htGetMetadataEx(d); return ebGetNextTimeToExpire(expireMeta->hfe, &hashFieldExpireBucketsType); } } int hashTypeIsFieldsWithExpire(robj *o) { if (o->encoding == OBJ_ENCODING_LISTPACK) { return 0; } else if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { return EB_EXPIRE_TIME_INVALID != listpackExGetMinExpire(o); } else { /* o->encoding == OBJ_ENCODING_HT */ dict *d = o->ptr; /* If dict doesn't holds HFE metadata */ if (!isDictWithMetaHFE(d)) return 0; htMetadataEx *meta = htGetMetadataEx(d); return ebGetTotalItems(meta->hfe, &hashFieldExpireBucketsType) != 0; } } void hashTypeFree(robj *o) { switch (o->encoding) { case OBJ_ENCODING_HT: /* Verify hash is not registered in global HFE ds */ if (isDictWithMetaHFE((dict*)o->ptr)) { htMetadataEx *m = htGetMetadataEx((dict*)o->ptr); serverAssert(m->expireMeta.trash == 1); } #ifdef DEBUG_ASSERTIONS dictEmpty(o->ptr, NULL); debugServerAssert(*htGetMetadataSize(o->ptr) == 0); #endif dictRelease((dict*) o->ptr); break; case OBJ_ENCODING_LISTPACK: lpFree(o->ptr); break; case OBJ_ENCODING_LISTPACK_EX: /* Verify hash is not registered in global HFE ds */ serverAssert(((listpackEx *) o->ptr)->meta.trash == 1); listpackExFree(o->ptr); break; default: serverPanic("Unknown hash encoding type"); break; } } ebuckets *hashTypeGetDictMetaHFE(dict *d) { htMetadataEx *dictExpireMeta = htGetMetadataEx(d); return &dictExpireMeta->hfe; } /*----------------------------------------------------------------------------- * Hash type commands 
*----------------------------------------------------------------------------*/ void hsetnxCommand(client *c) { unsigned long hlen; int isHashDeleted; size_t oldsize = 0; robj *kv = hashTypeLookupWriteOrCreate(c,c->argv[1]); if (kv == NULL) return; if (hashTypeExists(c->db, kv, c->argv[2]->ptr, HFE_LAZY_EXPIRE, &isHashDeleted)) { addReply(c, shared.czero); return; } /* Field expired and in turn hash deleted. Create new one! */ if (isHashDeleted) { robj *o = createHashObject(); kv = dbAdd(c->db,c->argv[1],&o); } if (server.memory_tracking_per_slot) oldsize = hashTypeAllocSize(kv); hashTypeTryConversion(c->db, kv, c->argv, 2, 3); hashTypeSet(c->db, kv, c->argv[2]->ptr, c->argv[3]->ptr, HASH_SET_COPY); addReply(c, shared.cone); keyModified(c,c->db,c->argv[1], kv, 1); notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id); hlen = hashTypeLength(kv, 0); updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, hlen - 1, hlen); if (server.memory_tracking_per_slot) updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(kv)); server.dirty++; } void hsetCommand(client *c) { int i, created = 0; size_t oldsize = 0; kvobj *kv; if ((c->argc % 2) == 1) { addReplyErrorArity(c); return; } if ((kv = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; if (server.memory_tracking_per_slot) oldsize = hashTypeAllocSize(kv); hashTypeTryConversion(c->db, kv, c->argv, 2, c->argc-1); for (i = 2; i < c->argc; i += 2) created += !hashTypeSet(c->db, kv, c->argv[i]->ptr, c->argv[i+1]->ptr, HASH_SET_COPY); /* HMSET (deprecated) and HSET return value is different. 
*/ char *cmdname = c->argv[0]->ptr; if (cmdname[1] == 's' || cmdname[1] == 'S') { /* HSET */ addReplyLongLong(c, created); } else { /* HMSET */ addReply(c, shared.ok); } keyModified(c,c->db,c->argv[1],kv,1); unsigned long l = hashTypeLength(kv, 0); updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, l - created, l); if (server.memory_tracking_per_slot) updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(kv)); notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id); server.dirty += (c->argc - 2)/2; } /* Parse expire time from argument and do boundary checks. */ static int parseExpireTime(client *c, robj *o, int unit, long long basetime, long long *expire) { long long val; /* Read the expiry time from command */ if (getLongLongFromObjectOrReply(c, o, &val, NULL) != C_OK) return C_ERR; if (val < 0) { addReplyError(c,"invalid expire time, must be >= 0"); return C_ERR; } if (unit == UNIT_SECONDS) { if (val > (long long) HFE_MAX_ABS_TIME_MSEC / 1000) { addReplyErrorExpireTime(c); return C_ERR; } val *= 1000; } if (val > (long long) HFE_MAX_ABS_TIME_MSEC - basetime) { addReplyErrorExpireTime(c); return C_ERR; } val += basetime; *expire = val; return C_OK; } /* Flags that are used as part of HGETEX and HSETEX commands. */ #define HFE_EX (1<<0) /* Expiration time in seconds */ #define HFE_PX (1<<1) /* Expiration time in milliseconds */ #define HFE_EXAT (1<<2) /* Expiration time in unix seconds */ #define HFE_PXAT (1<<3) /* Expiration time in unix milliseconds */ #define HFE_PERSIST (1<<4) /* Persist fields */ #define HFE_KEEPTTL (1<<5) /* Do not discard field ttl on set op */ #define HFE_FXX (1<<6) /* Set fields if all the fields already exist */ #define HFE_FNX (1<<7) /* Set fields if none of the fields exist */ /* Command types for unified hash argument parser */ #define HASH_CMD_HGETEX 0 #define HASH_CMD_HSETEX 1 /* Parse hash field expiration command arguments for both HGETEX and HSETEX. 
* HGETEX [EX seconds|PX milliseconds|EXAT unix-time-seconds|PXAT unix-time-milliseconds|PERSIST] * FIELDS field [field ...] * HSETEX [EX seconds|PX milliseconds|EXAT unix-time-seconds|PXAT unix-time-milliseconds|KEEPTTL] * [FXX|FNX] FIELDS field value [field value ...] */ static int parseHashFieldExpireArgs(client *c, int *flags, long long *expire_time, int *expire_time_pos, int *first_field_pos, int *field_count, int command_type) { *flags = 0; *first_field_pos = -1; *field_count = -1; *expire_time_pos = -1; for (int i = 2; i < c->argc; i++) { if (!strcasecmp(c->argv[i]->ptr, "fields")) { int args_per_field = (command_type == HASH_CMD_HSETEX) ? 2 : 1; long val; /* Ensure we have at least the numfields argument */ if (i + 1 >= c->argc) { addReplyErrorArity(c); return C_ERR; } if (getRangeLongFromObjectOrReply(c, c->argv[i + 1], 1, INT_MAX, &val, "invalid number of fields") != C_OK) return C_ERR; *first_field_pos = i + 2; *field_count = (int) val; /* Validate field count based on command type */ long long required_args = *first_field_pos + ((long long)*field_count * args_per_field); if (required_args > c->argc) { addReplyError(c, "wrong number of arguments"); return C_ERR; } /* Skip over numfields and all field-value pairs * Set i to the last position of the FIELDS block, loop will increment past it */ i = *first_field_pos + (*field_count * args_per_field) - 1; continue; } else if (!strcasecmp(c->argv[i]->ptr, "EX")) { if (*flags & (HFE_EX | HFE_EXAT | HFE_PX | HFE_PXAT | HFE_KEEPTTL | HFE_PERSIST)) goto err_expiration; if (i >= c->argc - 1) goto err_missing_expire; *flags |= HFE_EX; i++; if (parseExpireTime(c, c->argv[i], UNIT_SECONDS, commandTimeSnapshot(), expire_time) != C_OK) return C_ERR; *expire_time_pos = i; } else if (!strcasecmp(c->argv[i]->ptr, "PX")) { if (*flags & (HFE_EX | HFE_EXAT | HFE_PX | HFE_PXAT | HFE_KEEPTTL | HFE_PERSIST)) goto err_expiration; if (i >= c->argc - 1) goto err_missing_expire; *flags |= HFE_PX; i++; if (parseExpireTime(c, 
c->argv[i], UNIT_MILLISECONDS, commandTimeSnapshot(), expire_time) != C_OK) return C_ERR; *expire_time_pos = i; } else if (!strcasecmp(c->argv[i]->ptr, "EXAT")) { if (*flags & (HFE_EX | HFE_EXAT | HFE_PX | HFE_PXAT | HFE_KEEPTTL | HFE_PERSIST)) goto err_expiration; if (i >= c->argc - 1) goto err_missing_expire; *flags |= HFE_EXAT; i++; if (parseExpireTime(c, c->argv[i], UNIT_SECONDS, 0, expire_time) != C_OK) return C_ERR; *expire_time_pos = i; } else if (!strcasecmp(c->argv[i]->ptr, "PXAT")) { if (*flags & (HFE_EX | HFE_EXAT | HFE_PX | HFE_PXAT | HFE_KEEPTTL | HFE_PERSIST)) goto err_expiration; if (i >= c->argc - 1) goto err_missing_expire; *flags |= HFE_PXAT; i++; if (parseExpireTime(c, c->argv[i], UNIT_MILLISECONDS, 0, expire_time) != C_OK) return C_ERR; *expire_time_pos = i; } else if (!strcasecmp(c->argv[i]->ptr, "PERSIST")) { if (*flags & (HFE_EX | HFE_EXAT | HFE_PX | HFE_PXAT | HFE_PERSIST)) goto err_expiration; *flags |= HFE_PERSIST; } else if (!strcasecmp(c->argv[i]->ptr, "KEEPTTL")) { if (*flags & (HFE_EX | HFE_EXAT | HFE_PX | HFE_PXAT | HFE_KEEPTTL)) goto err_expiration; *flags |= HFE_KEEPTTL; } else if (!strcasecmp(c->argv[i]->ptr, "FXX")) { if (*flags & (HFE_FXX | HFE_FNX)) goto err_condition; *flags |= HFE_FXX; } else if (!strcasecmp(c->argv[i]->ptr, "FNX")) { if (*flags & (HFE_FXX | HFE_FNX)) goto err_condition; *flags |= HFE_FNX; } else { addReplyErrorFormat(c, "unknown argument: %s", (char*) c->argv[i]->ptr); return C_ERR; } } /* Validate command-specific argument compatibility */ if ((command_type == HASH_CMD_HGETEX && (*flags & (HFE_KEEPTTL | HFE_FXX | HFE_FNX))) || (command_type == HASH_CMD_HSETEX && (*flags & HFE_PERSIST))) { addReplyError(c, "unknown argument"); return C_ERR; } return C_OK; err_missing_expire: addReplyError(c, "missing expire time"); return C_ERR; err_condition: addReplyError(c, "Only one of FXX or FNX arguments can be specified"); return C_ERR; err_expiration: if (command_type == HASH_CMD_HSETEX) { addReplyError(c, "Only one 
of EX, PX, EXAT, PXAT or KEEPTTL arguments can be specified"); } else { addReplyError(c, "Only one of EX, PX, EXAT, PXAT or PERSIST arguments can be specified"); } return C_ERR; } /* Set the value of one or more fields of a given hash key, and optionally set * their expiration. * * HSETEX key * [FNX | FXX] * [EX seconds | PX milliseconds | EXAT unix-time-seconds | PXAT unix-time-milliseconds | KEEPTTL] * FIELDS field value [field value...] * * Reply: * Integer reply: 0 if no fields were set (due to FXX/FNX args) * Integer reply: 1 if all the fields were set */ void hsetexCommand(client *c) { int flags = 0, first_field_pos = 0, field_count = 0, expire_time_pos = -1; int updated = 0, deleted = 0, set_expiry; int expired = 0, fields_set = 0; long long expire_time = EB_EXPIRE_TIME_INVALID; int64_t oldlen, newlen; HashTypeSetEx setex; dictEntryLink link; size_t oldsize = 0; if (parseHashFieldExpireArgs(c, &flags, &expire_time, &expire_time_pos, &first_field_pos, &field_count, HASH_CMD_HSETEX) != C_OK) return; kvobj *o = lookupKeyWriteWithLink(c->db, c->argv[1], &link); if (checkType(c, o, OBJ_HASH)) return; if (!o) { if (flags & HFE_FXX) { addReplyLongLong(c, 0); return; } o = createHashObject(); dbAddByLink(c->db, c->argv[1], &o, &link); } oldlen = (int64_t) hashTypeLength(o, 0); if (server.memory_tracking_per_slot) oldsize = hashTypeAllocSize(o); if (flags & (HFE_FXX | HFE_FNX)) { int found = 0; for (int i = 0; i < field_count; i++) { sds field = c->argv[first_field_pos + (i * 2)]->ptr; unsigned char *vstr = NULL; unsigned int vlen = UINT_MAX; long long vll = LLONG_MAX; const int opt = HFE_LAZY_NO_NOTIFICATION | HFE_LAZY_NO_SIGNAL | HFE_LAZY_AVOID_HASH_DEL | HFE_LAZY_NO_UPDATE_KEYSIZES | HFE_LAZY_NO_UPDATE_ALLOCSIZES; GetFieldRes res = hashTypeGetValue(c->db, o, field, &vstr, &vlen, &vll, opt, NULL); int exists = (res == GETF_OK); expired += (res == GETF_EXPIRED); found += exists; /* Check for early exit if the condition is already invalid. 
*/ if (((flags & HFE_FXX) && !exists) || ((flags & HFE_FNX) && exists)) break; } int all_exists = (found == field_count); int non_exists = (found == 0); if (((flags & HFE_FNX) && !non_exists) || ((flags & HFE_FXX) && !all_exists)) { addReplyLongLong(c, 0); goto out; } } hashTypeTryConversion(c->db, o,c->argv, first_field_pos, c->argc - 1); /* Check if we will set the expiration time. */ set_expiry = flags & (HFE_EX | HFE_PX | HFE_EXAT | HFE_PXAT); if (set_expiry) hashTypeSetExInit(c->argv[1], o, c, c->db, 0, &setex); for (int i = 0; i < field_count; i++) { sds field = c->argv[first_field_pos + (i * 2)]->ptr; sds value = c->argv[first_field_pos + (i * 2) + 1]->ptr; int opt = HASH_SET_COPY; /* If we are going to set the expiration time later, no need to discard * it as part of set operation now. */ if (flags & (HFE_EX | HFE_PX | HFE_EXAT | HFE_PXAT | HFE_KEEPTTL)) opt |= HASH_SET_KEEP_TTL; hashTypeSet(c->db, o, field, value, opt); fields_set = 1; /* Update the expiration time. */ if (set_expiry) { int ret = hashTypeSetEx(o, field, expire_time, &setex); updated += (ret == HSETEX_OK); deleted += (ret == HSETEX_DELETED); } } if (set_expiry) hashTypeSetExDone(&setex); server.dirty += field_count; if (deleted) { /* If fields are deleted due to timestamp is being in the past, hdel's * are already propagated. No need to propagate the command itself. */ preventCommandPropagation(c); } else if (set_expiry && !(flags & HFE_PXAT)) { /* Propagate as 'HSETEX PXAT ..' if there is EX/EXAT/PX flag*/ /* Replace EX/EXAT/PX with PXAT */ rewriteClientCommandArgument(c, expire_time_pos - 1, shared.pxat); /* Replace timestamp with unix timestamp milliseconds. 
*/ robj *expire = createStringObjectFromLongLong(expire_time); rewriteClientCommandArgument(c, expire_time_pos, expire); decrRefCount(expire); } addReplyLongLong(c, 1); out: if (server.memory_tracking_per_slot) updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(o)); /* Emit keyspace notifications based on field expiry, mutation, or key deletion */ if (fields_set || expired) { keyModified(c, c->db, c->argv[1], o, 1); if (expired) notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", c->argv[1], c->db->id); if (fields_set) { notifyKeyspaceEvent(NOTIFY_HASH, "hset", c->argv[1], c->db->id); if (deleted || updated) notifyKeyspaceEvent(NOTIFY_HASH, deleted ? "hdel" : "hexpire", c->argv[1], c->db->id); } } /* Key may become empty due to lazy expiry in hashTypeExists() * or the new expiration time is in the past.*/ newlen = (int64_t) hashTypeLength(o, 0); if (newlen == 0) { newlen = -1; /* Del key but don't update KEYSIZES. else it will decr wrong bin in histogram */ dbDeleteSkipKeysizesUpdate(c->db, c->argv[1]); notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id); } if (oldlen != newlen) updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, oldlen, newlen); } void hincrbyCommand(client *c) { long long value, incr, oldvalue; kvobj *o; sds new; unsigned char *vstr; unsigned int vlen; size_t oldsize = 0; if (getLongLongFromObjectOrReply(c,c->argv[3],&incr,NULL) != C_OK) return; if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; GetFieldRes res = hashTypeGetValue(c->db,o,c->argv[2]->ptr,&vstr,&vlen,&value, HFE_LAZY_EXPIRE, NULL); if (res == GETF_OK) { if (vstr) { if (string2ll((char*)vstr,vlen,&value) == 0) { addReplyError(c,"hash value is not an integer"); return; } } /* Else hashTypeGetValue() already stored it into &value */ } else if ((res == GETF_NOT_FOUND) || (res == GETF_EXPIRED)) { value = 0; unsigned long l = hashTypeLength(o, 0); updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, l, l + 
1); } else { /* Field expired and in turn hash deleted. Create new one! */ o = createHashObject(); dbAdd(c->db,c->argv[1],&o); value = 0; updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, 0, 1); } oldvalue = value; if ((incr < 0 && oldvalue < 0 && incr < (LLONG_MIN-oldvalue)) || (incr > 0 && oldvalue > 0 && incr > (LLONG_MAX-oldvalue))) { addReplyError(c,"increment or decrement would overflow"); return; } value += incr; new = sdsfromlonglong(value); if (server.memory_tracking_per_slot) oldsize = hashTypeAllocSize(o); hashTypeSet(c->db, o,c->argv[2]->ptr,new,HASH_SET_TAKE_VALUE | HASH_SET_KEEP_TTL); if (server.memory_tracking_per_slot) updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(o)); addReplyLongLong(c,value); keyModified(c,c->db,c->argv[1], o, 1); notifyKeyspaceEvent(NOTIFY_HASH,"hincrby",c->argv[1],c->db->id); server.dirty++; } void hincrbyfloatCommand(client *c) { long double value, incr; long long ll; kvobj *o; sds new; unsigned char *vstr; unsigned int vlen; size_t oldsize = 0; if (getLongDoubleFromObjectOrReply(c,c->argv[3],&incr,NULL) != C_OK) return; if (isnan(incr) || isinf(incr)) { addReplyError(c,"value is NaN or Infinity"); return; } if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; GetFieldRes res = hashTypeGetValue(c->db, o,c->argv[2]->ptr,&vstr,&vlen,&ll, HFE_LAZY_EXPIRE, NULL); if (res == GETF_OK) { if (vstr) { if (string2ld((char*)vstr,vlen,&value) == 0) { addReplyError(c,"hash value is not a float"); return; } } else { value = (long double)ll; } } else if ((res == GETF_NOT_FOUND) || (res == GETF_EXPIRED)) { value = 0; unsigned long l = hashTypeLength(o, 0); updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, l, l + 1); } else { /* Field expired and in turn hash deleted. Create new one! 
*/ o = createHashObject(); dbAdd(c->db, c->argv[1], &o); value = 0; updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, 0, 1); } value += incr; if (isnan(value) || isinf(value)) { addReplyError(c,"increment would produce NaN or Infinity"); return; } char buf[MAX_LONG_DOUBLE_CHARS]; int len = ld2string(buf,sizeof(buf),value,LD_STR_HUMAN); new = sdsnewlen(buf,len); if (server.memory_tracking_per_slot) oldsize = hashTypeAllocSize(o); hashTypeSet(c->db, o,c->argv[2]->ptr,new,HASH_SET_TAKE_VALUE | HASH_SET_KEEP_TTL); if (server.memory_tracking_per_slot) updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(o)); addReplyBulkCBuffer(c,buf,len); keyModified(c,c->db,c->argv[1],o,1); notifyKeyspaceEvent(NOTIFY_HASH,"hincrbyfloat",c->argv[1],c->db->id); server.dirty++; /* Always replicate HINCRBYFLOAT as an HSETEX command with the final value * in order to make sure that differences in float precision or formatting * will not create differences in replicas or after an AOF restart. * The KEEPTTL flag is used to make sure the field TTL is preserved. 
*/ robj *newobj; newobj = createRawStringObject(buf,len); rewriteClientCommandVector(c, 7, shared.hsetex, c->argv[1], shared.keepttl, shared.fields, shared.integers[1], c->argv[2], newobj); decrRefCount(newobj); } static GetFieldRes addHashFieldToReply(client *c, kvobj *o, sds field, int hfeFlags) { if (o == NULL) { addReplyNull(c); return GETF_NOT_FOUND; } unsigned char *vstr = NULL; unsigned int vlen = UINT_MAX; long long vll = LLONG_MAX; GetFieldRes res = hashTypeGetValue(c->db, o, field, &vstr, &vlen, &vll, hfeFlags, NULL); if (res == GETF_OK) { if (vstr) { addReplyBulkCBuffer(c, vstr, vlen); } else { addReplyBulkLongLong(c, vll); } } else { addReplyNull(c); } return res; } void hgetCommand(client *c) { kvobj *o; if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == NULL || checkType(c,o,OBJ_HASH)) return; addHashFieldToReply(c, o, c->argv[2]->ptr, HFE_LAZY_EXPIRE); } void hmgetCommand(client *c) { GetFieldRes res = GETF_OK; int i; int expired = 0, deleted = 0; /* Don't abort when the key cannot be found. Non-existing keys are empty * hashes, where HMGET should respond with a series of null bulks. */ kvobj *o = lookupKeyRead(c->db, c->argv[1]); if (checkType(c,o,OBJ_HASH)) return; addReplyArrayLen(c, c->argc-2); for (i = 2; i < c->argc ; i++) { if (!deleted) { res = addHashFieldToReply(c, o, c->argv[i]->ptr, HFE_LAZY_NO_NOTIFICATION); expired += (res == GETF_EXPIRED); deleted += (res == GETF_EXPIRED_HASH); } else { /* If hash got lazy expired since all fields are expired (o is invalid), * then fill the rest with trivial nulls and return. */ addReplyNull(c); } } if (expired) { notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", c->argv[1], c->db->id); if (deleted) notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id); } } /* Get and delete the value of one or more fields of a given hash key. * HGETDEL FIELDS field1 field2 ... * Reply: list of the value associated with each field or nil if the field * doesn’t exist. 
*/
void hgetdelCommand(client *c) {
    /* Lazy expiration during the lookups is kept silent and must not delete
     * the key; all the bookkeeping is done once at the end. */
    const int lookup_flags = HFE_LAZY_NO_NOTIFICATION |
                             HFE_LAZY_NO_SIGNAL |
                             HFE_LAZY_AVOID_HASH_DEL |
                             HFE_LAZY_NO_UPDATE_KEYSIZES |
                             HFE_LAZY_NO_UPDATE_ALLOCSIZES;
    int had_hfe = 0, deleted = 0, expired = 0;
    int64_t oldlen = -1; /* not exists as long as it is not set */
    long num_fields = 0;
    size_t oldsize = 0;

    kvobj *o = lookupKeyWrite(c->db, c->argv[1]);
    if (checkType(c, o, OBJ_HASH))
        return;

    if (strcasecmp(c->argv[2]->ptr, "FIELDS") != 0) {
        addReplyError(c, "Mandatory argument FIELDS is missing or not at the right position");
        return;
    }

    /* Read number of fields */
    if (getRangeLongFromObjectOrReply(c, c->argv[3], 1, LONG_MAX, &num_fields,
                                      "Number of fields must be a positive integer") != C_OK)
        return;

    /* Verify `numFields` is consistent with number of arguments */
    if (num_fields != c->argc - 4) {
        addReplyError(c, "The `numfields` parameter must match the number of arguments");
        return;
    }

    /* Hash field expiration is optimized to avoid frequent update global HFE DS
     * for each field deletion. Eventually active-expiration will run and update
     * or remove the hash from global HFE DS gracefully. Nevertheless, statistic
     * "subexpiry" might reflect wrong number of hashes with HFE to the user if
     * it is the last field with expiration. The following logic checks if this
     * is the last field with expiration and removes it from global HFE DS. */
    if (o) {
        had_hfe = hashTypeIsFieldsWithExpire(o);
        oldlen = hashTypeLength(o, 0);
        if (server.memory_tracking_per_slot)
            oldsize = hashTypeAllocSize(o);
    }

    addReplyArrayLen(c, num_fields);
    for (int i = 4; i < c->argc; i++) {
        int res = addHashFieldToReply(c, o, c->argv[i]->ptr, lookup_flags);
        expired += (res == GETF_EXPIRED);

        /* Try to delete only if it's found and not expired lazily. */
        if (res != GETF_OK)
            continue;

        deleted++;
        serverAssert(hashTypeDelete(o, c->argv[i]->ptr) == 1);
    }

    /* Return if no modification has been made. */
    if (expired == 0 && deleted == 0)
        return;

    if (server.memory_tracking_per_slot)
        updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(o));

    keyModified(c, c->db, c->argv[1], o, 1);

    if (expired)
        notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", c->argv[1], c->db->id);

    if (deleted) {
        notifyKeyspaceEvent(NOTIFY_HASH, "hdel", c->argv[1], c->db->id);
        server.dirty += deleted;

        /* Propagate as HDEL command.
         * Orig: HGETDEL <key> FIELDS <numfields> field1 field2 ...
         * Repl: HDEL <key> field1 field2 ... */
        rewriteClientCommandArgument(c, 0, shared.hdel);
        rewriteClientCommandArgument(c, 2, NULL); /* Delete FIELDS arg */
        rewriteClientCommandArgument(c, 2, NULL); /* Delete <numFields> arg */
    }

    /* Key may have become empty because of deleting fields or lazy expire. */
    int64_t newlen = (int64_t) hashTypeLength(o, 0);
    if (newlen == 0) {
        newlen = -1;
        /* Del key but don't update KEYSIZES. else it will decr wrong bin in histogram */
        dbDeleteSkipKeysizesUpdate(c->db, c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id);
    } else if (had_hfe && (hashTypeIsFieldsWithExpire(o) == 0)) {
        /* It was the last field with expiration: unregister the hash. */
        estoreRemove(c->db->subexpires, getKeySlot(kvobjGetKey(o)), o);
    }

    if (oldlen != newlen)
        updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH,
                           oldlen, newlen);
}

/* Get the value of one or more fields of a given hash key and optionally set
 * their expiration.
 *
 * HGETEX <key>
 *  [EX seconds | PX milliseconds | EXAT unix-time-seconds | PXAT unix-time-milliseconds | PERSIST]
 *  FIELDS <numfields> field1 field2 ...
 *
 * Reply: list of the value associated with each field or nil if the field
 * doesn't exist.
*/ void hgetexCommand(client *c) { int expired = 0, deleted = 0, updated = 0; int parse_flags = 0, expire_time_pos = -1, first_field_pos = -1, num_fields = -1; long long expire_time = 0; int64_t oldlen = 0, newlen = -1; HashTypeSetEx setex; size_t oldsize = 0; kvobj *o = lookupKeyWrite(c->db, c->argv[1]); if (checkType(c, o, OBJ_HASH)) return; /* Parse arguments using flexible parser */ if (parseHashFieldExpireArgs(c, &parse_flags, &expire_time, &expire_time_pos, &first_field_pos, &num_fields, HASH_CMD_HGETEX) != C_OK) return; /* Non-existing keys and empty hashes are the same thing. Reply null if the * key does not exist.*/ if (!o) { addReplyArrayLen(c, num_fields); for (int i = 0; i < num_fields; i++) addReplyNull(c); return; } if (server.memory_tracking_per_slot) oldsize = hashTypeAllocSize(o); oldlen = hashTypeLength(o, 0); if (parse_flags) hashTypeSetExInit(c->argv[1], o, c, c->db, 0, &setex); addReplyArrayLen(c, num_fields); for (int i = first_field_pos; i < first_field_pos + num_fields; i++) { const int flags = HFE_LAZY_NO_NOTIFICATION | HFE_LAZY_NO_SIGNAL | HFE_LAZY_AVOID_HASH_DEL | HFE_LAZY_NO_UPDATE_KEYSIZES | HFE_LAZY_NO_UPDATE_ALLOCSIZES; sds field = c->argv[i]->ptr; int res = addHashFieldToReply(c, o, c->argv[i]->ptr, flags); expired += (res == GETF_EXPIRED); /* Set expiration only if the field exists and not expired lazily. */ if (res == GETF_OK && parse_flags) { if (parse_flags & HFE_PERSIST) expire_time = EB_EXPIRE_TIME_INVALID; res = hashTypeSetEx(o, field, expire_time, &setex); deleted += (res == HSETEX_DELETED); updated += (res == HSETEX_OK); } } if (parse_flags) hashTypeSetExDone(&setex); if (server.memory_tracking_per_slot) updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(o)); /* Exit early if no modification has been made. */ if (expired == 0 && deleted == 0 && updated == 0) return; server.dirty += deleted + updated; keyModified(c, c->db, c->argv[1], o, 1); /* This command will never be propagated as it is. 
It will be propagated as * HDELs when fields are lazily expired or deleted, if the new timestamp is * in the past. HDEL's will be emitted as part of addHashFieldToReply() * or hashTypeSetEx() in this case. * * If PERSIST flags is used, it will be propagated as HPERSIST command. * IF EX/EXAT/PX/PXAT flags are used, it will be replicated as HPEXPRITEAT. */ if (expired) notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", c->argv[1], c->db->id); if (updated) { /* Build canonical command for propagation */ int canonical_argc; robj **canonical_argv; int idx = 0; if (parse_flags & HFE_PERSIST) { notifyKeyspaceEvent(NOTIFY_HASH, "hpersist", c->argv[1], c->db->id); /* Build canonical HPERSIST command: HPERSIST key FIELDS numfields field1 field2 ... */ canonical_argc = 4 + num_fields; canonical_argv = zmalloc(sizeof(robj*) * canonical_argc); canonical_argv[idx++] = shared.hpersist; incrRefCount(shared.hpersist); canonical_argv[idx++] = c->argv[1]; /* key */ incrRefCount(c->argv[1]); } else { notifyKeyspaceEvent(NOTIFY_HASH, "hexpire", c->argv[1], c->db->id); /* Build canonical HPEXPIREAT command: HPEXPIREAT key timestamp FIELDS numfields field1 field2 ... */ canonical_argc = 5 + num_fields; canonical_argv = zmalloc(sizeof(robj*) * canonical_argc); canonical_argv[idx++] = shared.hpexpireat; incrRefCount(shared.hpexpireat); canonical_argv[idx++] = c->argv[1]; /* key */ incrRefCount(c->argv[1]); canonical_argv[idx++] = createStringObjectFromLongLong(expire_time); /* timestamp */ } canonical_argv[idx++] = shared.fields; incrRefCount(shared.fields); canonical_argv[idx++] = createStringObjectFromLongLong(num_fields); for (int i = 0; i < num_fields; i++) { canonical_argv[idx++] = c->argv[first_field_pos + i]; incrRefCount(c->argv[first_field_pos + i]); } replaceClientCommandVector(c, canonical_argc, canonical_argv); } else if (deleted) { /* If we are here, fields are deleted because new timestamp was in the * past. HDELs are already propagated as part of hashTypeSetEx(). 
*/ notifyKeyspaceEvent(NOTIFY_HASH, "hdel", c->argv[1], c->db->id); preventCommandPropagation(c); } /* Key may become empty due to lazy expiry in addHashFieldToReply() * or the new expiration time is in the past.*/ newlen = hashTypeLength(o, 0); updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, oldlen, newlen); if (newlen == 0) { dbDelete(c->db, c->argv[1]); notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id); } } void hdelCommand(client *c) { kvobj *o; int j, deleted = 0, keyremoved = 0; size_t oldsize = 0; if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,OBJ_HASH)) return; int64_t oldLen = (int64_t) hashTypeLength(o, 0); if (server.memory_tracking_per_slot) oldsize = hashTypeAllocSize(o); /* Hash field expiration is optimized to avoid frequent update global HFE DS for * each field deletion. Eventually active-expiration will run and update or remove * the hash from global HFE DS gracefully. Nevertheless, statistic "subexpiry" * might reflect wrong number of hashes with HFE to the user if it is the last * field with expiration. The following logic checks if this is indeed the last * field with expiration and removes it from global HFE DS. */ int isHFE = hashTypeIsFieldsWithExpire(o); for (j = 2; j < c->argc; j++) { if (hashTypeDelete(o,c->argv[j]->ptr)) { deleted++; if (hashTypeLength(o, 0) == 0) { if (server.memory_tracking_per_slot) updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(o)); /* del key but don't update KEYSIZES. Else it will decr wrong bin in histogram */ dbDeleteSkipKeysizesUpdate(c->db, c->argv[1]); keyremoved = 1; break; } } } if (server.memory_tracking_per_slot && !keyremoved) updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(o)); if (deleted) { int64_t newLen = -1; /* The value -1 indicates that the key is deleted. */ keyModified(c, c->db, c->argv[1], keyremoved ? 
NULL : o, 1); notifyKeyspaceEvent(NOTIFY_HASH,"hdel",c->argv[1],c->db->id); if (keyremoved) { notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id); } else { if (isHFE && (hashTypeIsFieldsWithExpire(o) == 0)) /* is it last HFE */ estoreRemove(c->db->subexpires, getKeySlot(c->argv[1]->ptr), o); newLen = oldLen - deleted; } updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, oldLen, newLen); server.dirty += deleted; } addReplyLongLong(c,deleted); } void hlenCommand(client *c) { kvobj *o; if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,OBJ_HASH)) return; addReplyLongLong(c,hashTypeLength(o, 0)); } void hstrlenCommand(client *c) { kvobj *o; unsigned char *vstr = NULL; unsigned int vlen = UINT_MAX; long long vll = LLONG_MAX; if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,OBJ_HASH)) return; GetFieldRes res = hashTypeGetValue(c->db, o, c->argv[2]->ptr, &vstr, &vlen, &vll, HFE_LAZY_EXPIRE, NULL); if (res == GETF_NOT_FOUND || res == GETF_EXPIRED || res == GETF_EXPIRED_HASH) { addReply(c, shared.czero); return; } size_t len = vstr ? 
vlen : sdigits10(vll); addReplyLongLong(c,len); } static void addHashIteratorCursorToReply(client *c, hashTypeIterator *hi, int what) { if (hi->encoding == OBJ_ENCODING_LISTPACK || hi->encoding == OBJ_ENCODING_LISTPACK_EX) { unsigned char *vstr = NULL; unsigned int vlen = UINT_MAX; long long vll = LLONG_MAX; hashTypeCurrentFromListpack(hi, what, &vstr, &vlen, &vll, NULL); if (vstr) addReplyBulkCBuffer(c, vstr, vlen); else addReplyBulkLongLong(c, vll); } else if (hi->encoding == OBJ_ENCODING_HT) { char *value; size_t len; hashTypeCurrentFromHashTable(hi, what, &value, &len, NULL); addReplyBulkCBuffer(c, value, len); } else { serverPanic("Unknown hash encoding"); } } void genericHgetallCommand(client *c, int flags) { kvobj *o; hashTypeIterator hi; int length, count = 0; size_t oldsize = 0; robj *emptyResp = (flags & OBJ_HASH_KEY && flags & OBJ_HASH_VALUE) ? shared.emptymap[c->resp] : shared.emptyarray; if ((o = lookupKeyReadOrReply(c,c->argv[1],emptyResp)) == NULL || checkType(c,o,OBJ_HASH)) return; /* We return a map if the user requested keys and values, like in the * HGETALL case. Otherwise to use a flat array makes more sense. */ if ((length = hashTypeLength(o, 1 /*subtractExpiredFields*/)) == 0) { addReply(c, emptyResp); return; } if (flags & OBJ_HASH_KEY && flags & OBJ_HASH_VALUE) { addReplyMapLen(c, length); } else { addReplyArrayLen(c, length); } if (server.memory_tracking_per_slot) oldsize = hashTypeAllocSize(o); hashTypeInitIterator(&hi, o); while (hashTypeNext(&hi, 1 /*skipExpiredFields*/) != C_ERR) { if (flags & OBJ_HASH_KEY) { addHashIteratorCursorToReply(c, &hi, OBJ_HASH_KEY); count++; } if (flags & OBJ_HASH_VALUE) { addHashIteratorCursorToReply(c, &hi, OBJ_HASH_VALUE); count++; } } hashTypeResetIterator(&hi); if (server.memory_tracking_per_slot) updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(o)); /* Make sure we returned the right number of elements. 
*/ if (flags & OBJ_HASH_KEY && flags & OBJ_HASH_VALUE) count /= 2; serverAssert(count == length); } void hkeysCommand(client *c) { genericHgetallCommand(c,OBJ_HASH_KEY); } void hvalsCommand(client *c) { genericHgetallCommand(c,OBJ_HASH_VALUE); } void hgetallCommand(client *c) { genericHgetallCommand(c,OBJ_HASH_KEY|OBJ_HASH_VALUE); } void hexistsCommand(client *c) { kvobj *o; if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,OBJ_HASH)) return; addReply(c,hashTypeExists(c->db,o,c->argv[2]->ptr,HFE_LAZY_EXPIRE, NULL) ? shared.cone : shared.czero); } void hscanCommand(client *c) { kvobj *o; unsigned long long cursor; size_t oldsize = 0; if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return; if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL || checkType(c,o,OBJ_HASH)) return; if (server.memory_tracking_per_slot) oldsize = hashTypeAllocSize(o); scanGenericCommand(c,o,cursor); if (server.memory_tracking_per_slot) updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(o)); } static void hrandfieldReplyWithListpack(client *c, unsigned int count, listpackEntry *keys, listpackEntry *vals) { for (unsigned long i = 0; i < count; i++) { if (vals && c->resp > 2) addReplyArrayLen(c,2); if (keys[i].sval) addReplyBulkCBuffer(c, keys[i].sval, keys[i].slen); else addReplyBulkLongLong(c, keys[i].lval); if (vals) { if (vals[i].sval) addReplyBulkCBuffer(c, vals[i].sval, vals[i].slen); else addReplyBulkLongLong(c, vals[i].lval); } } } /* How many times bigger should be the hash compared to the requested size * for us to not use the "remove elements" strategy? Read later in the * implementation for more info. */ #define HRANDFIELD_SUB_STRATEGY_MUL 3 /* If client is trying to ask for a very large number of random elements, * queuing may consume an unlimited amount of memory, so we want to limit * the number of randoms per time. 
*/
#define HRANDFIELD_RANDOM_SAMPLE_LIMIT 1000

/* HRANDFIELD variant that received an explicit count.
 *
 * c          - client issuing the command; the key is argv[1].
 * l          - requested count. A non-negative value asks for distinct
 *              fields; a negative value asks for -l fields where the same
 *              field may be returned more than once.
 * withvalues - non-zero to emit the value next to every field.
 *
 * Expired fields are removed up front via hashTypeExpireIfNeeded(); if that
 * removes the whole hash, an empty array is returned. Depending on the
 * count, the hash size and the encoding, one of the strategies labelled
 * CASE 1..4 below is used. */
void hrandfieldWithCountCommand(client *c, long l, int withvalues) {
    unsigned long count, size;
    int uniq = 1;
    kvobj *hash;
    size_t oldsize = 0;

    if ((hash = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray))
        == NULL || checkType(c,hash,OBJ_HASH)) return;

    /* A negative count selects the "repetitions allowed" mode. */
    if(l >= 0) {
        count = (unsigned long) l;
    } else {
        count = -l;
        uniq = 0;
    }

    /* Delete all expired fields. If the entire hash got deleted then return empty array. */
    if (hashTypeExpireIfNeeded(c->db, hash)) {
        addReply(c, shared.emptyarray);
        return;
    }

    /* Number of fields in the hash (expired fields were handled just above,
     * so they are not subtracted again here). */
    size = hashTypeLength(hash, 0);

    /* If count is zero, serve it ASAP to avoid special cases later. */
    if (count == 0) {
        addReply(c,shared.emptyarray);
        return;
    }

    if (server.memory_tracking_per_slot)
        oldsize = hashTypeAllocSize(hash);

    /* CASE 1: The count was negative, so the extraction method is just:
     * "return N random elements" sampling the whole set every time.
     * This case is trivial and can be served without auxiliary data
     * structures. This case is the only one that also needs to return the
     * elements in random order. */
    if (!uniq || count == 1) {
        /* RESP2 gets a flat array (field, value, field, value, ...), newer
         * protocols get one nested pair per element. */
        if (withvalues && c->resp == 2)
            addReplyArrayLen(c, count*2);
        else
            addReplyArrayLen(c, count);
        if (hash->encoding == OBJ_ENCODING_HT) {
            while (count--) {
                dictEntry *de = dictGetFairRandomKey(hash->ptr);
                Entry *entry = dictGetKey(de);
                sds fieldStr = entryGetField(entry);
                if (withvalues && c->resp > 2)
                    addReplyArrayLen(c,2);
                addReplyBulkCBuffer(c, fieldStr, sdslen(fieldStr));
                if (withvalues) {
                    sds value = entryGetValue(entry);
                    addReplyBulkCBuffer(c, value, sdslen(value));
                }
                /* Stop producing output for a client that is about to be
                 * closed. */
                if (c->flags & CLIENT_CLOSE_ASAP)
                    break;
            }
        } else if (hash->encoding == OBJ_ENCODING_LISTPACK ||
                   hash->encoding == OBJ_ENCODING_LISTPACK_EX)
        {
            listpackEntry *keys, *vals = NULL;
            unsigned long limit, sample_count;
            unsigned char *lp = hashTypeListpackGetLp(hash);
            /* Listpack elements per field: 2 for plain listpacks, 3 for
             * LISTPACK_EX. */
            int tuple_len = hash->encoding == OBJ_ENCODING_LISTPACK ? 2 : 3;

            /* Sample in batches of at most HRANDFIELD_RANDOM_SAMPLE_LIMIT so
             * the temporary arrays stay bounded. */
            limit = count > HRANDFIELD_RANDOM_SAMPLE_LIMIT ? HRANDFIELD_RANDOM_SAMPLE_LIMIT : count;
            keys = zmalloc(sizeof(listpackEntry)*limit);
            if (withvalues)
                vals = zmalloc(sizeof(listpackEntry)*limit);
            while (count) {
                sample_count = count > limit ? limit : count;
                count -= sample_count;
                lpRandomPairs(lp, sample_count, keys, vals, tuple_len);
                hrandfieldReplyWithListpack(c, sample_count, keys, vals);
                if (c->flags & CLIENT_CLOSE_ASAP)
                    break;
            }
            zfree(keys);
            zfree(vals);
        }
        goto out;
    }

    /* Initiate reply count, RESP3 responds with nested array, RESP2 with flat one. */
    long reply_size = count < size ? count : size;
    if (withvalues && c->resp == 2)
        addReplyArrayLen(c, reply_size*2);
    else
        addReplyArrayLen(c, reply_size);

    /* CASE 2:
     * The number of requested elements is greater than or equal to the
     * number of elements inside the hash: simply return the whole hash. */
    if(count >= size) {
        hashTypeIterator hi;
        hashTypeInitIterator(&hi, hash);
        while (hashTypeNext(&hi, 0) != C_ERR) {
            if (withvalues && c->resp > 2)
                addReplyArrayLen(c,2);
            addHashIteratorCursorToReply(c, &hi, OBJ_HASH_KEY);
            if (withvalues)
                addHashIteratorCursorToReply(c, &hi, OBJ_HASH_VALUE);
        }
        hashTypeResetIterator(&hi);
        goto out;
    }

    /* CASE 2.5 listpack only. Sampling unique elements, in non-random order.
     * Listpack encoded hashes are meant to be relatively small, so
     * HRANDFIELD_SUB_STRATEGY_MUL isn't necessary and we rather not make
     * copies of the entries. Instead, we emit them directly to the output
     * buffer.
     *
     * And it is inefficient to repeatedly pick one random element from a
     * listpack in CASE 4. So we use this instead. */
    if (hash->encoding == OBJ_ENCODING_LISTPACK ||
        hash->encoding == OBJ_ENCODING_LISTPACK_EX)
    {
        unsigned char *lp = hashTypeListpackGetLp(hash);
        int tuple_len = hash->encoding == OBJ_ENCODING_LISTPACK ? 2 : 3;
        listpackEntry *keys, *vals = NULL;
        /* count < size here (CASE 2 handled count >= size). */
        keys = zmalloc(sizeof(listpackEntry)*count);
        if (withvalues)
            vals = zmalloc(sizeof(listpackEntry)*count);
        serverAssert(lpRandomPairsUnique(lp, count, keys, vals, tuple_len) == count);
        hrandfieldReplyWithListpack(c, count, keys, vals);
        zfree(keys);
        zfree(vals);
        goto out;
    }

    /* CASE 3:
     * The number of elements inside the hash of type dict is not greater than
     * HRANDFIELD_SUB_STRATEGY_MUL times the number of requested elements.
     * In this case we create an array of field/value references from the
     * original hash, and subtract random elements to reach the requested
     * number of elements.
     *
     * This is done because if the number of requested elements is just
     * a bit less than the number of elements in the hash, the natural approach
     * used into CASE 4 is highly inefficient. */
    if (count*HRANDFIELD_SUB_STRATEGY_MUL > size) {
        /* Hashtable encoding (generic implementation) */
        dict *ht = hash->ptr;
        dictIterator di;
        dictEntry *de;
        unsigned long idx = 0;

        /* Allocate a temporary array of pointers to stored key-values in dict and
         * assist it to remove random elements to reach the right count. */
        struct FieldValPair {
            sds field;
            sds value;
        } *pairs = zmalloc(sizeof(struct FieldValPair) * size);

        /* Add all the elements into the temporary array. */
        dictInitIterator(&di, ht);
        while((de = dictNext(&di)) != NULL) {
            Entry *e = dictGetKey(de);
            pairs[idx++] = (struct FieldValPair) {entryGetField(e), entryGetValue(e)};
        }
        dictResetIterator(&di);

        /* Remove random elements to reach the right count: overwrite the
         * discarded slot with the last one and shrink the array by one. */
        while (size > count) {
            unsigned long toDiscardIdx = rand() % size;
            pairs[toDiscardIdx] = pairs[--size];
        }

        /* Reply with what's in the array */
        for (idx = 0; idx < size; idx++) {
            if (withvalues && c->resp > 2)
                addReplyArrayLen(c,2);
            addReplyBulkCBuffer(c, pairs[idx].field, sdslen(pairs[idx].field));
            if (withvalues)
                addReplyBulkCBuffer(c, pairs[idx].value, sdslen(pairs[idx].value));
        }

        zfree(pairs);
    }

    /* CASE 4: We have a big hash compared to the requested number of elements.
     * In this case we can simply get random elements from the hash and add
     * to the temporary hash, trying to eventually get enough unique elements
     * to reach the specified count. */
    else {
        /* Allocate temporary dictUnique to find unique elements. Just keep ref
         * to key-value from the original hash. This dict relaxes hash function
         * to be based on field's pointer */
        dictType uniqueDictType = { .hashFunction =  dictPtrHash };
        dict *dictUnique = dictCreate(&uniqueDictType);
        dictExpand(dictUnique, count);

        /* Hashtable encoding (generic implementation) */
        unsigned long added = 0;

        while(added < count) {
            dictEntry *de = dictGetFairRandomKey(hash->ptr);
            serverAssert(de != NULL);
            Entry *e = dictGetKey(de);
            sds field = entryGetField(e);
            sds value = entryGetValue(e);

            /* Try to add the field to the dictionary. If it was already
             * picked, just draw again; otherwise count it as one more
             * unique element. */
            if (dictAdd(dictUnique, field, value) != DICT_OK)
                continue;

            added++;

            /* We can reply right away, so that we don't need to store the value in the dict. */
            if (withvalues && c->resp > 2)
                addReplyArrayLen(c,2);
            addReplyBulkCBuffer(c, field, sdslen(field));
            if (withvalues)
                addReplyBulkCBuffer(c, value, sdslen(value));
        }

        /* Release memory */
        dictRelease(dictUnique);
    }

out:
    /* Every path that sampled `oldsize` above ends up here. */
    if (server.memory_tracking_per_slot)
        updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(hash));
}

/*
 * HRANDFIELD - Return a random field from the hash value stored at key.
 * CLI usage: HRANDFIELD key [<count> [WITHVALUES]]
 *
 * Considerations for the current implementation of HRANDFIELD & the HFE feature:
 * HRANDFIELD might access any of the fields in the hash, and some of them might
 * be expired. So the implementation of HRANDFIELD along with HFEs
 * might be one of the two options:
 * 1. Expire hash-fields before diving into handling HRANDFIELD.
 * 2. Refine HRANDFIELD cases to deal with expired fields.
 *
 * Regarding the first option, as reference, the command RANDOMKEY also declares
 * O(1) complexity, yet might be stuck on a very long (but not infinite) loop
 * trying to find non-expired keys. Furthermore RANDOMKEY also evicts expired keys
 * along the way even though it is categorized as a read-only command. Note that
 * the case of HRANDFIELD is more lightweight versus RANDOMKEY since HFEs have
 * a much more effective and aggressive active-expiration for fields behind them.
 *
 * The second option introduces additional implementation complexity to HRANDFIELD.
 * We could further refine HRANDFIELD cases to differentiate between scenarios
 * with many expired fields versus few expired fields, and adjust based on the
 * percentage of expired fields. However, this approach could still lead to long
 * loops or necessitate expiring fields before selecting them. For the "lightweight"
 * cases it is also expected to have a lightweight expiration.
 *
 * Considering the pros and cons, and the fact that HRANDFIELD is an infrequent
 * command (particularly with HFEs) and the fact we have effective active-expiration
 * behind for hash-fields, it is better to keep it simple and choose option #1.
*/ void hrandfieldCommand(client *c) { long l; int withvalues = 0; kvobj *hash; CommonEntry ele; size_t oldsize = 0; if (c->argc >= 3) { if (getRangeLongFromObjectOrReply(c,c->argv[2],-LONG_MAX,LONG_MAX,&l,NULL) != C_OK) return; if (c->argc > 4 || (c->argc == 4 && strcasecmp(c->argv[3]->ptr,"withvalues"))) { addReplyErrorObject(c,shared.syntaxerr); return; } else if (c->argc == 4) { withvalues = 1; if (l < -LONG_MAX/2 || l > LONG_MAX/2) { addReplyError(c,"value is out of range"); return; } } hrandfieldWithCountCommand(c, l, withvalues); return; } /* Handle variant without argument. Reply with simple bulk string */ if ((hash = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]))== NULL || checkType(c,hash,OBJ_HASH)) { return; } /* Delete all expired fields. If the entire hash got deleted then return null. */ if (hashTypeExpireIfNeeded(c->db, hash)) { addReply(c,shared.null[c->resp]); return; } if (server.memory_tracking_per_slot) oldsize = hashTypeAllocSize(hash); hashTypeRandomElement(hash,hashTypeLength(hash, 0),&ele,NULL); if (server.memory_tracking_per_slot) updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(hash)); if (ele.sval) addReplyBulkCBuffer(c, ele.sval, ele.slen); else addReplyBulkLongLong(c, ele.lval); } /*----------------------------------------------------------------------------- * Hash Field with optional expiry (based on entry) *----------------------------------------------------------------------------*/ static ExpireMeta* hentryGetExpireMeta(const eItem e) { /* extract the expireMeta from the field (entry) */ return entryRefExpiryMeta((Entry *)e); } /* Remove TTL from the field. 
Assumed ExpireMeta is attached and has valid value */ static void hfieldPersist(robj *hashObj, Entry *entry) { uint64_t fieldExpireTime = entryGetExpiry(entry); if (fieldExpireTime == EB_EXPIRE_TIME_INVALID) return; /* if field is set with expire, then dict must has HFE metadata attached */ dict *d = hashObj->ptr; htMetadataEx *dictExpireMeta = htGetMetadataEx(d); /* If field has valid expiry then dict must have valid metadata as well */ serverAssert(dictExpireMeta->expireMeta.trash == 0); /* Remove field from private HFE DS */ ebRemove(&dictExpireMeta->hfe, &hashFieldExpireBucketsType, entry); /* Don't have to update global HFE DS. It's unnecessary. Implementing this * would introduce significant complexity and overhead for an operation that * isn't critical. In the worst case scenario, the hash will be efficiently * updated later by an active-expire operation, or it will be removed by the * hash's dbGenericDelete() function. */ } /*----------------------------------------------------------------------------- * Hash Field Expiration (HFE) *----------------------------------------------------------------------------*/ /* Can be called either by active-expire cron job or query from the client */ static void propagateHashFieldDeletion(redisDb *db, sds key, char *field, size_t fieldLen) { robj *argv[] = { shared.hdel, createStringObject((char*) key, sdslen(key)), createStringObject(field, fieldLen) }; enterExecutionUnit(1, 0); int prev_replication_allowed = server.replication_allowed; server.replication_allowed = 1; alsoPropagate(db->id,argv, 3, PROPAGATE_AOF|PROPAGATE_REPL); server.replication_allowed = prev_replication_allowed; exitExecutionUnit(); /* Propagate the HDEL command */ postExecutionUnitOperations(); decrRefCount(argv[1]); decrRefCount(argv[2]); } /* Called during active expiration of hash-fields. Propagate to replica & Delete. 
*/ static ExpireAction onFieldExpire(eItem item, void *ctx) { OnFieldExpireCtx *expCtx = ctx; Entry *e = item; kvobj *kv = expCtx->hashObj; size_t oldsize = 0; sds key = kvobjGetKey(kv); if (server.memory_tracking_per_slot) oldsize = hashTypeAllocSize(kv); sds field = entryGetField(e); propagateHashFieldDeletion(expCtx->db, key, field, sdslen(field)); /* update keysizes */ unsigned long l = hashTypeLength(expCtx->hashObj, 0); updateKeysizesHist(expCtx->db, getKeySlot(key), OBJ_HASH, l, l - 1); serverAssert(hashTypeDelete(expCtx->hashObj, field) == 1); if (server.memory_tracking_per_slot) updateSlotAllocSize(expCtx->db, getKeySlot(key), oldsize, hashTypeAllocSize(kv)); server.stat_expired_subkeys++; return ACT_REMOVE_EXP_ITEM; } /* Retrieve the ExpireMeta associated with the hash. * The caller is responsible for ensuring that it is indeed attached. */ ExpireMeta *hashGetExpireMeta(const eItem hash) { robj *hashObj = (robj *)hash; if (hashObj->encoding == OBJ_ENCODING_LISTPACK_EX) { listpackEx *lpt = hashObj->ptr; return &lpt->meta; } else if (hashObj->encoding == OBJ_ENCODING_HT) { dict *d = hashObj->ptr; htMetadataEx *dictExpireMeta = htGetMetadataEx(d); return &dictExpireMeta->expireMeta; } else { serverPanic("Unknown encoding: %d", hashObj->encoding); } } /* Generic structure to hold parsed arguments for all hash field commands */ typedef struct { /* FIELDS arguments */ int fieldsPos; /* Position of FIELDS keyword (-1 if not found) */ int numFieldsPos; /* Position of numfields argument */ int firstFieldPos; /* Position of first field */ int fieldCount; /* Number of fields */ /* HEXPIRE family arguments */ int expireTimePos; /* Position of expire time argument */ long long expireTime; /* Parsed expire time */ int expireCondition; /* HFE_NX, HFE_XX, HFE_GT, HFE_LT */ } HashCommandArgs; /* Parser for HEXPIRE family commands with flexible keyword ordering. * Returns C_OK on success, C_ERR on error (with reply sent). 
*/
/* Details:
 *  - argv[2] is always parsed as the expire time (via parseExpireTime with
 *    the given `unit` and `basetime`); the result is stored in
 *    args->expireTime and args->expireTimePos is set to 2.
 *  - From argv[3] on, the parser accepts, in any order, the condition
 *    keywords NX / XX / GT / LT and exactly one
 *    "FIELDS numfields field [field ...]" clause.
 *  - The FIELDS clause is mandatory; at most one condition may be given. */
static int parseHashCommandArgs(client *c, HashCommandArgs *args, long long basetime, int unit) {
    memset(args, 0, sizeof(*args));
    args->fieldsPos = -1;
    args->expireTimePos = 2;

    if (parseExpireTime(c, c->argv[2], unit, basetime, &args->expireTime) != C_OK) {
        return C_ERR;
    }

    /* Parse remaining arguments starting from position 3 */
    for (int i = 3; i < c->argc; i++) {
        char *arg = c->argv[i]->ptr;

        /* FIELDS keyword - supported by ALL hash field commands */
        if (!strcasecmp(arg, "FIELDS")) {
            if (args->fieldsPos != -1) {
                addReplyError(c, "FIELDS keyword specified multiple times");
                return C_ERR;
            }

            /* Need room for numfields plus at least one field after FIELDS. */
            if (i >= c->argc - 2) {
                addReplyError(c, "FIELDS requires at least numfields and one field argument");
                return C_ERR;
            }

            args->fieldsPos = i;
            args->numFieldsPos = i + 1;

            long numFields;
            if (getRangeLongFromObjectOrReply(c, c->argv[args->numFieldsPos], 1, INT_MAX, &numFields,
                                              "Parameter `numFields` should be greater than 0") != C_OK)
                return C_ERR;

            args->fieldCount = (int)numFields;
            args->firstFieldPos = i + 2;

            /* Check bounds - all announced fields must be present (other
             * keywords may still follow them). The comparison is written
             * as a subtraction on purpose: fieldCount can be as large as
             * INT_MAX, so `firstFieldPos + fieldCount` could overflow a
             * signed int. firstFieldPos is < argc here, so the right-hand
             * side is always positive. */
            if (args->fieldCount > c->argc - args->firstFieldPos) {
                addReplyError(c, "wrong number of arguments");
                return C_ERR;
            }

            /* Skip over the field arguments */
            i = args->firstFieldPos + args->fieldCount - 1;
            continue;
        }

        /* Expiration condition keywords. They are only accumulated here;
         * the "at most one" rule is validated once after the loop. */
        if (!strcasecmp(arg, "NX")) {
            args->expireCondition |= HFE_NX;
            continue;
        } else if (!strcasecmp(arg, "XX")) {
            args->expireCondition |= HFE_XX;
            continue;
        } else if (!strcasecmp(arg, "GT")) {
            args->expireCondition |= HFE_GT;
            continue;
        } else if (!strcasecmp(arg, "LT")) {
            args->expireCondition |= HFE_LT;
            continue;
        }

        addReplyErrorFormat(c, "unknown argument: %s", (char*) c->argv[i]->ptr);
        return C_ERR;
    }

    if (__builtin_popcount(args->expireCondition & (HFE_NX|HFE_XX|HFE_GT|HFE_LT)) > 1) {
        addReplyError(c, "Multiple condition flags specified");
        return C_ERR;
    }

    /* FIELDS is mandatory. Without this check a command made only of
     * condition keywords would be accepted with zero fields. */
    if (args->fieldsPos == -1) {
        addReplyError(c, "Mandatory argument FIELDS is missing or not at the right position");
        return C_ERR;
    }

    return C_OK;
}

/* Generic implementation of HTTL, HPTTL, HEXPIRETIME and HPEXPIRETIME:
 *
 *   <CMD> key FIELDS numfields field [field ...]
 *
 * c        - client; argv[2] must be FIELDS and argv[3] the number of fields.
 * cmd      - command name, currently unused.
 * basetime - value subtracted from the stored expiration time before it is
 *            reported: the callers pass the current time for the TTL
 *            variants and 0 for the absolute-time variants.
 * unit     - UNIT_SECONDS to report seconds (rounded up), otherwise the
 *            value is reported in milliseconds.
 *
 * The reply is an array with one integer per requested field:
 * HFE_GET_NO_FIELD when the field (or the key) does not exist or is already
 * expired, HFE_GET_NO_TTL when the field has no expiration, otherwise the
 * time value. A field counts as expired only when its expiration time is
 * strictly before the command time, for every encoding. */
static void httlGenericCommand(client *c, const char *cmd, long long basetime, int unit) {
    UNUSED(cmd);
    kvobj *hashObj;
    long numFields = 0, numFieldsAt = 3;

    /* Read the hash object */
    hashObj = lookupKeyRead(c->db, c->argv[1]);
    if (checkType(c, hashObj, OBJ_HASH))
        return;

    if (strcasecmp(c->argv[numFieldsAt-1]->ptr, "FIELDS")) {
        addReplyError(c, "Mandatory argument FIELDS is missing or not at the right position");
        return;
    }

    /* Read number of fields */
    if (getRangeLongFromObjectOrReply(c, c->argv[numFieldsAt], 1, LONG_MAX,
                                      &numFields, "Number of fields must be a positive integer") != C_OK)
        return;

    /* Verify `numFields` is consistent with number of arguments */
    if (numFields != (c->argc - numFieldsAt - 1)) {
        addReplyError(c, "The `numfields` parameter must match the number of arguments");
        return;
    }

    /* Non-existing keys and empty hashes are the same thing. It also means
     * fields in the command don't exist in the hash key. */
    if (!hashObj) {
        addReplyArrayLen(c, numFields);
        for (int i = 0; i < numFields; i++) {
            addReplyLongLong(c, HFE_GET_NO_FIELD);
        }
        return;
    }

    if (hashObj->encoding == OBJ_ENCODING_LISTPACK) {
        /* This branch never reports a TTL: a field is either missing or
         * has no expiration. */
        void *lp = hashObj->ptr;

        addReplyArrayLen(c, numFields);
        for (int i = 0 ; i < numFields ; i++) {
            sds field = c->argv[numFieldsAt+1+i]->ptr;
            void *fptr = lpFirst(lp);
            if (fptr != NULL)
                fptr = lpFind(lp, fptr, (unsigned char *) field, sdslen(field), 1);

            if (!fptr)
                addReplyLongLong(c, HFE_GET_NO_FIELD);
            else
                addReplyLongLong(c, HFE_GET_NO_TTL);
        }
        return;
    } else if (hashObj->encoding == OBJ_ENCODING_LISTPACK_EX) {
        listpackEx *lpt = hashObj->ptr;

        addReplyArrayLen(c, numFields);
        for (int i = 0 ; i < numFields ; i++) {
            long long expire;
            sds field = c->argv[numFieldsAt+1+i]->ptr;
            void *fptr = lpFirst(lpt->lp);
            if (fptr != NULL)
                fptr = lpFind(lpt->lp, fptr, (unsigned char *) field, sdslen(field), 2);

            if (!fptr) {
                addReplyLongLong(c, HFE_GET_NO_FIELD);
                continue;
            }

            /* Step from the field over its value to the expiration slot. */
            fptr = lpNext(lpt->lp, fptr);
            serverAssert(fptr);
            fptr = lpNext(lpt->lp, fptr);
            serverAssert(fptr && lpGetIntegerValue(fptr, &expire));

            if (expire == HASH_LP_NO_TTL) {
                addReplyLongLong(c, HFE_GET_NO_TTL);
                continue;
            }

            /* Already expired. Pretend there is no such field. Uses the
             * same strict comparison as the hashtable branch below, so a
             * field expiring exactly now is reported the same way for both
             * encodings. */
            if (expire < commandTimeSnapshot()) {
                addReplyLongLong(c, HFE_GET_NO_FIELD);
                continue;
            }

            if (unit == UNIT_SECONDS)
                addReplyLongLong(c, (expire + 999 - basetime) / 1000);
            else
                addReplyLongLong(c, (expire - basetime));
        }
        return;
    } else if (hashObj->encoding == OBJ_ENCODING_HT) {
        dict *d = hashObj->ptr;
        size_t oldsize = dictMemUsage(d);

        addReplyArrayLen(c, numFields);
        for (int i = 0 ; i < numFields ; i++) {
            sds field = c->argv[numFieldsAt+1+i]->ptr;
            dictEntry *de = dictFind(d, field);
            if (de == NULL) {
                addReplyLongLong(c, HFE_GET_NO_FIELD);
                continue;
            }

            Entry *entry = dictGetKey(de);
            uint64_t expire = entryGetExpiry(entry);
            if (expire == EB_EXPIRE_TIME_INVALID) {
                addReplyLongLong(c, HFE_GET_NO_TTL); /* no ttl */
                continue;
            }

            /* Already expired. Pretend there is no such field */
            if ( (long long) expire < commandTimeSnapshot()) {
                addReplyLongLong(c, HFE_GET_NO_FIELD);
                continue;
            }

            if (unit == UNIT_SECONDS)
                addReplyLongLong(c, (expire + 999 - basetime) / 1000);
            else
                addReplyLongLong(c, (expire - basetime));
        }
        if (server.memory_tracking_per_slot)
            updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, dictMemUsage(d));
        return;
    } else {
        serverPanic("Unknown encoding: %d", hashObj->encoding);
    }
}

/* This is the generic command implementation for HEXPIRE, HPEXPIRE, HEXPIREAT
 * and HPEXPIREAT. Because the command second argument may be relative or absolute
 * the "basetime" argument is used to signal what the base time is (either 0
 * for *AT variants of the command, or the current time for relative expires).
 *
 * unit is either UNIT_SECONDS or UNIT_MILLISECONDS, and is only used for
 * the argv[2] parameter. The basetime is always specified in milliseconds.
 *
 * PROPAGATE TO REPLICA:
 * The command will be translated into HPEXPIREAT and the expiration time will be
 * converted to absolute time in milliseconds.
*
 * As we need to propagate H(P)EXPIRE(AT) command to the replica, each field that
 * is mentioned in the command should be categorized into one of the four options:
 * 1. Field's expiration time updated successfully - Propagate it to replica as
 *    part of the HPEXPIREAT command.
 * 2. The field got deleted since the time is in the past - propagate also HDEL
 *    command to delete the field. Also remove the field from the propagated
 *    HPEXPIREAT command.
 * 3. Condition not met for the field - Remove the field from the propagated
 *    HPEXPIREAT command.
 * 4. Field doesn't exist - Remove the field from propagated HPEXPIREAT command.
 *
 * If none of the provided fields match option #1, that is provided time of the
 * command is in the past, then avoid propagating the HPEXPIREAT command to the
 * replica.
 *
 * This approach is aligned with existing EXPIRE command. If a given key has already
 * expired, then DEL will be propagated instead of EXPIRE command. If condition
 * not met, then command will be rejected. Otherwise, EXPIRE command will be
 * propagated for given key.
 */
static void hexpireGenericCommand(client *c, long long basetime, int unit) {
    HashCommandArgs args;
    int fieldsNotSet = 0, updated = 0, deleted = 0;
    int64_t oldlen, newlen;
    robj *keyArg = c->argv[1];
    size_t oldsize = 0;

    /* Read the hash object */
    kvobj *hashObj = lookupKeyWrite(c->db, keyArg);
    if (checkType(c, hashObj, OBJ_HASH))
        return;

    /* Parse arguments using flexible keyword-based parsing */
    if (parseHashCommandArgs(c, &args, basetime, unit) != C_OK)
        return;

    /* Non-existing keys and empty hashes are the same thing. It also means
     * fields in the command don't exist in the hash key. */
    if (!hashObj) {
        addReplyArrayLen(c, args.fieldCount);
        for (int i = 0; i < args.fieldCount; i++) {
            addReplyLongLong(c, HSETEX_NO_FIELD);
        }
        return;
    }

    /* Snapshot length and allocation size before any field is touched. */
    oldlen = hashTypeLength(hashObj, 0);
    if (server.memory_tracking_per_slot)
        oldsize = hashTypeAllocSize(hashObj);

    HashTypeSetEx exCtx;
    hashTypeSetExInit(keyArg, hashObj, c, c->db, args.expireCondition, &exCtx);
    addReplyArrayLen(c, args.fieldCount);

    /* Lazy allocation of fieldsToRemove - only allocate when failures occur */
    int *fieldsToRemove = NULL;
    int removeCount = 0;

    /* Apply the expiration field by field; the per-field result code is
     * also the value replied to the client. */
    for (int i = 0; i < args.fieldCount; i++) {
        int fieldPos = args.firstFieldPos + i;
        sds field = c->argv[fieldPos]->ptr;
        SetExRes res = hashTypeSetEx(hashObj, field, args.expireTime, &exCtx);

        updated += (res == HSETEX_OK);
        deleted += (res == HSETEX_DELETED);

        if (unlikely(res != HSETEX_OK)) {
            if (fieldsToRemove == NULL) {
                fieldsToRemove = zmalloc(sizeof(int) * (args.fieldCount > 0 ? args.fieldCount : 1));
            }
            /* Remember this field position for later removal from propagation */
            fieldsToRemove[removeCount++] = fieldPos;
            fieldsNotSet = 1;
        }

        addReplyLongLong(c, res);
    }

    hashTypeSetExDone(&exCtx);
    if (server.memory_tracking_per_slot)
        updateSlotAllocSize(c->db, getKeySlot(keyArg->ptr), oldsize, hashTypeAllocSize(hashObj));

    /* Anything updated or deleted counts as a modification of the key. */
    if (deleted + updated > 0) {
        server.dirty += deleted + updated;
        keyModified(c, c->db, keyArg, hashObj, 1);
        notifyKeyspaceEvent(NOTIFY_HASH, deleted ? "hdel" : "hexpire",
                            keyArg, c->db->id);
    }

    newlen = (int64_t) hashTypeLength(hashObj, 0);
    if (newlen == 0) {
        /* newlen of -1 is what gets passed to updateKeysizesHist() below
         * for a hash that was removed entirely. */
        newlen = -1;
        /* Del key but don't update KEYSIZES. Else it will decr wrong bin in histogram */
        dbDeleteSkipKeysizesUpdate(c->db, keyArg);
        notifyKeyspaceEvent(NOTIFY_GENERIC, "del", keyArg, c->db->id);
    }

    if (oldlen != newlen)
        updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH,
                           oldlen, newlen);

    /* Avoid propagating command if not even one field was updated (Either because
     * the time is in the past, and corresponding HDELs were sent, or conditions
     * not met) then it is useless and invalid to propagate command with no fields */
    if (updated == 0) {
        preventCommandPropagation(c);
        zfree(fieldsToRemove);
        return;
    }

    /* Handle propagation using command rewriting
     * Rewrite to canonical HPEXPIREAT command */
    if (c->cmd->proc != hpexpireatCommand) {
        rewriteClientCommandArgument(c, 0, shared.hpexpireat);

        robj *expireTimeObj = createStringObjectFromLongLong(args.expireTime);
        rewriteClientCommandArgument(c, args.expireTimePos, expireTimeObj);
        decrRefCount(expireTimeObj);
    }

    /* For partial failures, remove failed fields from the original command */
    if (fieldsNotSet) {
        /* Walk from the highest recorded position down, so the positions
         * still to be processed are not affected by earlier removals. */
        for (int i = removeCount - 1; i >= 0; i--) {
            rewriteClientCommandArgument(c, fieldsToRemove[i], NULL);
        }

        /* numfields must now match the number of fields left. */
        robj *newFieldCount = createStringObjectFromLongLong(updated);
        rewriteClientCommandArgument(c, args.fieldsPos + 1, newFieldCount);
        decrRefCount(newFieldCount);
    }

    if (fieldsToRemove) zfree(fieldsToRemove);
}

/* HPEXPIRE key milliseconds [ NX | XX | GT | LT] FIELDS numfields field [field ...] */
void hpexpireCommand(client *c) {
    hexpireGenericCommand(c,commandTimeSnapshot(),UNIT_MILLISECONDS);
}

/* HEXPIRE key seconds [NX | XX | GT | LT] FIELDS numfields field [field ...] */
void hexpireCommand(client *c) {
    hexpireGenericCommand(c,commandTimeSnapshot(),UNIT_SECONDS);
}

/* HEXPIREAT key unix-time-seconds [NX | XX | GT | LT] FIELDS numfields field [field ...] */
void hexpireatCommand(client *c) {
    hexpireGenericCommand(c,0,UNIT_SECONDS);
}

/* HPEXPIREAT key unix-time-milliseconds [NX | XX | GT | LT] FIELDS numfields field [field ...] */
void hpexpireatCommand(client *c) {
    hexpireGenericCommand(c,0,UNIT_MILLISECONDS);
}

/* for each
specified field: get the remaining time to live in seconds */
/* HTTL key FIELDS numfields field [field ...] */
void httlCommand(client *c) {
    httlGenericCommand(c, "httl", commandTimeSnapshot(), UNIT_SECONDS);
}

/* HPTTL key FIELDS numfields field [field ...]
 * Same as HTTL, reported in milliseconds. */
void hpttlCommand(client *c) {
    httlGenericCommand(c, "hpttl", commandTimeSnapshot(), UNIT_MILLISECONDS);
}

/* HEXPIRETIME key FIELDS numfields field [field ...]
 * Base time 0: the stored expiration time itself is reported, in seconds. */
void hexpiretimeCommand(client *c) {
    httlGenericCommand(c, "hexpiretime", 0, UNIT_SECONDS);
}

/* HPEXPIRETIME key FIELDS numfields field [field ...]
 * Base time 0: the stored expiration time itself is reported, in
 * milliseconds. */
void hpexpiretimeCommand(client *c) {
    /* Pass this command's own name (it used to pass "hexpiretime"). */
    httlGenericCommand(c, "hpexpiretime", 0, UNIT_MILLISECONDS);
}

/* HPERSIST key FIELDS numfields field [field ...]
 *
 * Remove the expiration from the given fields. The reply is an array with
 * one integer per field: HFE_PERSIST_NO_FIELD when the field (or the key)
 * does not exist or is already expired, HFE_PERSIST_NO_TTL when the field
 * has no expiration, HFE_PERSIST_OK when the expiration was removed.
 *
 * If at least one expiration was removed, an "hpersist" keyspace event is
 * generated, the key is flagged as modified and server.dirty is bumped. */
void hpersistCommand(client *c) {
    long numFields = 0, numFieldsAt = 3;
    int changed = 0; /* Used to determine whether to send a notification. */

    /* Read the hash object */
    kvobj *hashObj = lookupKeyWrite(c->db, c->argv[1]);
    if (checkType(c, hashObj, OBJ_HASH))
        return;

    if (strcasecmp(c->argv[numFieldsAt-1]->ptr, "FIELDS")) {
        addReplyError(c, "Mandatory argument FIELDS is missing or not at the right position");
        return;
    }

    /* Read number of fields */
    if (getRangeLongFromObjectOrReply(c, c->argv[numFieldsAt], 1, LONG_MAX,
                                      &numFields, "Number of fields must be a positive integer") != C_OK)
        return;

    /* Verify `numFields` is consistent with number of arguments */
    if (numFields != (c->argc - numFieldsAt - 1)) {
        addReplyError(c, "The `numfields` parameter must match the number of arguments");
        return;
    }

    /* Non-existing keys and empty hashes are the same thing. It also means
     * fields in the command don't exist in the hash key. */
    if (!hashObj) {
        addReplyArrayLen(c, numFields);
        for (int i = 0; i < numFields; i++) {
            addReplyLongLong(c, HFE_PERSIST_NO_FIELD);
        }
        return;
    }

    if (hashObj->encoding == OBJ_ENCODING_LISTPACK) {
        /* This branch never finds a TTL to remove: a field is either
         * missing or has no expiration. */
        addReplyArrayLen(c, numFields);
        for (int i = 0 ; i < numFields ; i++) {
            sds field = c->argv[numFieldsAt + 1 + i]->ptr;
            unsigned char *fptr, *zl = hashObj->ptr;

            fptr = lpFirst(zl);
            if (fptr != NULL)
                fptr = lpFind(zl, fptr, (unsigned char *) field, sdslen(field), 1);

            if (!fptr)
                addReplyLongLong(c, HFE_PERSIST_NO_FIELD);
            else
                addReplyLongLong(c, HFE_PERSIST_NO_TTL);
        }
        return;
    } else if (hashObj->encoding == OBJ_ENCODING_LISTPACK_EX) {
        long long prevExpire;
        unsigned char *fptr, *vptr, *tptr;
        listpackEx *lpt = hashObj->ptr;
        size_t oldsize = 0;

        addReplyArrayLen(c, numFields);
        for (int i = 0 ; i < numFields ; i++) {
            sds field = c->argv[numFieldsAt + 1 + i]->ptr;

            /* lpt->lp is read again on every iteration rather than cached
             * across the update call below. */
            fptr = lpFirst(lpt->lp);
            if (fptr != NULL)
                fptr = lpFind(lpt->lp, fptr, (unsigned char*)field, sdslen(field), 2);

            if (!fptr) {
                addReplyLongLong(c, HFE_PERSIST_NO_FIELD);
                continue;
            }

            /* Field is followed by its value and then its expiration. */
            vptr = lpNext(lpt->lp, fptr);
            serverAssert(vptr);
            tptr = lpNext(lpt->lp, vptr);
            serverAssert(tptr && lpGetIntegerValue(tptr, &prevExpire));

            if (prevExpire == HASH_LP_NO_TTL) {
                addReplyLongLong(c, HFE_PERSIST_NO_TTL);
                continue;
            }

            /* Already expired. Pretend there is no such field */
            if (prevExpire < commandTimeSnapshot()) {
                addReplyLongLong(c, HFE_PERSIST_NO_FIELD);
                continue;
            }

            if (server.memory_tracking_per_slot)
                oldsize = hashTypeAllocSize(hashObj);
            listpackExUpdateExpiry(hashObj, field, fptr, vptr, HASH_LP_NO_TTL);
            if (server.memory_tracking_per_slot)
                updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(hashObj));
            addReplyLongLong(c, HFE_PERSIST_OK);
            changed = 1;
        }
    } else if (hashObj->encoding == OBJ_ENCODING_HT) {
        dict *d = hashObj->ptr;
        size_t oldsize = dictMemUsage(d);

        addReplyArrayLen(c, numFields);
        for (int i = 0 ; i < numFields ; i++) {
            sds field = c->argv[numFieldsAt + 1 + i]->ptr;
            dictEntry *de = dictFind(d, field);
            if (de == NULL) {
                addReplyLongLong(c, HFE_PERSIST_NO_FIELD);
                continue;
            }

            Entry *entry = dictGetKey(de);
            uint64_t expire = entryGetExpiry(entry);
            if (expire == EB_EXPIRE_TIME_INVALID) {
                addReplyLongLong(c, HFE_PERSIST_NO_TTL);
                continue;
            }

            /* Already expired. Pretend there is no such field */
            if ( (long long) expire < commandTimeSnapshot()) {
                addReplyLongLong(c, HFE_PERSIST_NO_FIELD);
                continue;
            }

            hfieldPersist(hashObj, entry);
            addReplyLongLong(c, HFE_PERSIST_OK);
            changed = 1;
        }
        if (server.memory_tracking_per_slot)
            updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, dictMemUsage(d));
    } else {
        serverPanic("Unknown encoding: %d", hashObj->encoding);
    }

    /* Generates a hpersist event if the expiry time associated with any field
     * has been successfully deleted. */
    if (changed) {
        notifyKeyspaceEvent(NOTIFY_HASH, "hpersist", c->argv[1], c->db->id);
        keyModified(c, c->db, c->argv[1], hashObj, 1);
        server.dirty++;
    }
}