diff options
Diffstat (limited to 'examples/redis-unstable/src/rdb.c')
| -rw-r--r-- | examples/redis-unstable/src/rdb.c | 4483 |
1 files changed, 0 insertions, 4483 deletions
diff --git a/examples/redis-unstable/src/rdb.c b/examples/redis-unstable/src/rdb.c deleted file mode 100644 index 7dc0e8e..0000000 --- a/examples/redis-unstable/src/rdb.c +++ /dev/null @@ -1,4483 +0,0 @@ -/* - * Copyright (c) 2009-Present, Redis Ltd. - * All rights reserved. - * - * Copyright (c) 2024-present, Valkey contributors. - * All rights reserved. - * - * Licensed under your choice of (a) the Redis Source Available License 2.0 - * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the - * GNU Affero General Public License v3 (AGPLv3). - * - * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. - */ - -#include "server.h" -#include "lzf.h" /* LZF compression library */ -#include "zipmap.h" -#include "endianconv.h" -#include "fpconv_dtoa.h" -#include "stream.h" -#include "functions.h" -#include "intset.h" /* Compact integer set structure */ -#include "bio.h" -#include "cluster_asm.h" -#include "keymeta.h" - -#include <math.h> -#include <fcntl.h> -#include <sys/types.h> -#include <sys/time.h> -#include <sys/resource.h> -#include <sys/wait.h> -#include <arpa/inet.h> -#include <sys/stat.h> -#include <sys/param.h> - -/* This macro is called when the internal RDB structure is corrupt */ -#define rdbReportCorruptRDB(...) rdbReportError(1, __LINE__,__VA_ARGS__) -/* This macro is called when RDB read failed (possibly a short read) */ -#define rdbReportReadError(...) rdbReportError(0, __LINE__,__VA_ARGS__) - -/* This macro tells if we are in the context of a RESTORE command, and not loading an RDB or AOF. */ -#define isRestoreContext() \ - ((server.current_client == NULL || server.current_client->id == CLIENT_ID_AOF) ? 
0 : 1) - -char* rdbFileBeingLoaded = NULL; /* used for rdb checking on read error */ -extern int rdbCheckMode; -void rdbCheckError(const char *fmt, ...); -void rdbCheckSetError(const char *fmt, ...); - -#ifdef __GNUC__ -void rdbReportError(int corruption_error, int linenum, char *reason, ...) __attribute__ ((format (printf, 3, 4))); -#endif -void rdbReportError(int corruption_error, int linenum, char *reason, ...) { - va_list ap; - char msg[1024]; - int len; - - len = snprintf(msg,sizeof(msg), - "Internal error in RDB reading offset %llu, function at rdb.c:%d -> ", - (unsigned long long)server.loading_loaded_bytes, linenum); - va_start(ap,reason); - vsnprintf(msg+len,sizeof(msg)-len,reason,ap); - va_end(ap); - - if (isRestoreContext()) { - /* If we're in the context of a RESTORE command, just propagate the error. */ - /* log in VERBOSE, and return (don't exit). */ - serverLog(LL_VERBOSE, "%s", msg); - return; - } else if (rdbCheckMode) { - /* If we're inside the rdb checker, let it handle the error. */ - rdbCheckError("%s",msg); - } else if (rdbFileBeingLoaded) { - /* If we're loading an rdb file form disk, run rdb check (and exit) */ - serverLog(LL_WARNING, "%s", msg); - char *argv[2] = {"",rdbFileBeingLoaded}; - if (anetIsFifo(argv[1])) { - /* Cannot check RDB FIFO because we cannot reopen the FIFO and check already streamed data. */ - rdbCheckError("Cannot check RDB that is a FIFO: %s", argv[1]); - return; - } - redis_check_rdb_main(2,argv,NULL); - } else if (corruption_error) { - /* In diskless loading, in case of corrupt file, log and exit. */ - serverLog(LL_WARNING, "%s. Failure loading rdb format", msg); - } else { - /* In diskless loading, in case of a short read (not a corrupt - * file), log and proceed (don't exit). */ - serverLog(LL_WARNING, "%s. 
Failure loading rdb format from socket, assuming connection error, resuming operation.", msg); - return; - } - serverLog(LL_WARNING, "Terminating server after rdb file reading failure."); - exit(1); -} - -ssize_t rdbWriteRaw(rio *rdb, void *p, size_t len) { - if (rdb && rioWrite(rdb,p,len) == 0) - return -1; - return len; -} - -int rdbSaveType(rio *rdb, unsigned char type) { - return rdbWriteRaw(rdb,&type,1); -} - -/* Load a "type" in RDB format, that is a one byte unsigned integer. - * This function is not only used to load object types, but also special - * "types" like the end-of-file type, the EXPIRE type, and so forth. */ -int rdbLoadType(rio *rdb) { - unsigned char type; - if (rioRead(rdb,&type,1) == 0) return -1; - return type; -} - -/* This is only used to load old databases stored with the RDB_OPCODE_EXPIRETIME - * opcode. New versions of Redis store using the RDB_OPCODE_EXPIRETIME_MS - * opcode. On error -1 is returned, however this could be a valid time, so - * to check for loading errors the caller should call rioGetReadError() after - * calling this function. */ -time_t rdbLoadTime(rio *rdb) { - int32_t t32; - if (rioRead(rdb,&t32,4) == 0) return -1; - return (time_t)t32; -} - -ssize_t rdbSaveMillisecondTime(rio *rdb, long long t) { - int64_t t64 = (int64_t) t; - memrev64ifbe(&t64); /* Store in little endian. */ - return rdbWriteRaw(rdb,&t64,8); -} - -/* This function loads a time from the RDB file. It gets the version of the - * RDB because, unfortunately, before Redis 5 (RDB version 9), the function - * failed to convert data to/from little endian, so RDB files with keys having - * expires could not be shared between big endian and little endian systems - * (because the expire time will be totally wrong). 
The fix for this is just - * to call memrev64ifbe(), however if we fix this for all the RDB versions, - * this call will introduce an incompatibility for big endian systems: - * after upgrading to Redis version 5 they will no longer be able to load their - * own old RDB files. Because of that, we instead fix the function only for new - * RDB versions, and load older RDB versions as we used to do in the past, - * allowing big endian systems to load their own old RDB files. - * - * On I/O error the function returns LLONG_MAX, however if this is also a - * valid stored value, the caller should use rioGetReadError() to check for - * errors after calling this function. */ -long long rdbLoadMillisecondTime(rio *rdb, int rdbver) { - int64_t t64; - if (rioRead(rdb,&t64,8) == 0) return LLONG_MAX; - if (rdbver >= 9) /* Check the top comment of this function. */ - memrev64ifbe(&t64); /* Convert in big endian if the system is BE. */ - return (long long)t64; -} - -/* Saves an encoded length. The first two bits in the first byte are used to - * hold the encoding type. See the RDB_* definitions for more information - * on the types of encoding. 
*/ -int rdbSaveLen(rio *rdb, uint64_t len) { - unsigned char buf[2]; - size_t nwritten; - - if (len < (1<<6)) { - /* Save a 6 bit len */ - buf[0] = (len&0xFF)|(RDB_6BITLEN<<6); - if (rdbWriteRaw(rdb,buf,1) == -1) return -1; - nwritten = 1; - } else if (len < (1<<14)) { - /* Save a 14 bit len */ - buf[0] = ((len>>8)&0xFF)|(RDB_14BITLEN<<6); - buf[1] = len&0xFF; - if (rdbWriteRaw(rdb,buf,2) == -1) return -1; - nwritten = 2; - } else if (len <= UINT32_MAX) { - /* Save a 32 bit len */ - buf[0] = RDB_32BITLEN; - if (rdbWriteRaw(rdb,buf,1) == -1) return -1; - uint32_t len32 = htonl(len); - if (rdbWriteRaw(rdb,&len32,4) == -1) return -1; - nwritten = 1+4; - } else { - /* Save a 64 bit len */ - buf[0] = RDB_64BITLEN; - if (rdbWriteRaw(rdb,buf,1) == -1) return -1; - len = htonu64(len); - if (rdbWriteRaw(rdb,&len,8) == -1) return -1; - nwritten = 1+8; - } - return nwritten; -} - - -/* Load an encoded length. If the loaded length is a normal length as stored - * with rdbSaveLen(), the read length is set to '*lenptr'. If instead the - * loaded length describes a special encoding that follows, then '*isencoded' - * is set to 1 and the encoding format is stored at '*lenptr'. - * - * See the RDB_ENC_* definitions in rdb.h for more information on special - * encodings. - * - * The function returns -1 on error, 0 on success. */ -int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr) { - unsigned char buf[2]; - int type; - - if (isencoded) *isencoded = 0; - if (rioRead(rdb,buf,1) == 0) return -1; - type = (buf[0]&0xC0)>>6; - if (type == RDB_ENCVAL) { - /* Read a 6 bit encoding type. */ - if (isencoded) *isencoded = 1; - *lenptr = buf[0]&0x3F; - } else if (type == RDB_6BITLEN) { - /* Read a 6 bit len. */ - *lenptr = buf[0]&0x3F; - } else if (type == RDB_14BITLEN) { - /* Read a 14 bit len. */ - if (rioRead(rdb,buf+1,1) == 0) return -1; - *lenptr = ((buf[0]&0x3F)<<8)|buf[1]; - } else if (buf[0] == RDB_32BITLEN) { - /* Read a 32 bit len. 
*/ - uint32_t len; - if (rioRead(rdb,&len,4) == 0) return -1; - *lenptr = ntohl(len); - } else if (buf[0] == RDB_64BITLEN) { - /* Read a 64 bit len. */ - uint64_t len; - if (rioRead(rdb,&len,8) == 0) return -1; - *lenptr = ntohu64(len); - } else { - rdbReportCorruptRDB( - "Unknown length encoding %d in rdbLoadLen()",type); - return -1; /* Never reached. */ - } - return 0; -} - -/* This is like rdbLoadLenByRef() but directly returns the value read - * from the RDB stream, signaling an error by returning RDB_LENERR - * (since it is a too large count to be applicable in any Redis data - * structure). */ -uint64_t rdbLoadLen(rio *rdb, int *isencoded) { - uint64_t len; - - if (rdbLoadLenByRef(rdb,isencoded,&len) == -1) return RDB_LENERR; - return len; -} - -/* Encodes the "value" argument as integer when it fits in the supported ranges - * for encoded types. If the function successfully encodes the integer, the - * representation is stored in the buffer pointer to by "enc" and the string - * length is returned. Otherwise 0 is returned. */ -int rdbEncodeInteger(long long value, unsigned char *enc) { - if (value >= -(1<<7) && value <= (1<<7)-1) { - enc[0] = (RDB_ENCVAL<<6)|RDB_ENC_INT8; - enc[1] = value&0xFF; - return 2; - } else if (value >= -(1<<15) && value <= (1<<15)-1) { - enc[0] = (RDB_ENCVAL<<6)|RDB_ENC_INT16; - enc[1] = value&0xFF; - enc[2] = (value>>8)&0xFF; - return 3; - } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) { - enc[0] = (RDB_ENCVAL<<6)|RDB_ENC_INT32; - enc[1] = value&0xFF; - enc[2] = (value>>8)&0xFF; - enc[3] = (value>>16)&0xFF; - enc[4] = (value>>24)&0xFF; - return 5; - } else { - return 0; - } -} - -/* Loads an integer-encoded object with the specified encoding type "enctype". - * The returned value changes according to the flags, see - * rdbGenericLoadStringObject() for more info. 
*/ -void *rdbLoadIntegerObject(rio *rdb, int enctype, int flags, size_t *lenptr, size_t *usable) { - int plainFlag = flags & RDB_LOAD_PLAIN; - int sdsFlag = flags & RDB_LOAD_SDS; - int encode = flags & RDB_LOAD_ENC; - unsigned char enc[4]; - long long val; - - if (enctype == RDB_ENC_INT8) { - if (rioRead(rdb,enc,1) == 0) return NULL; - val = (signed char)enc[0]; - } else if (enctype == RDB_ENC_INT16) { - uint16_t v; - if (rioRead(rdb,enc,2) == 0) return NULL; - v = ((uint32_t)enc[0])| - ((uint32_t)enc[1]<<8); - val = (int16_t)v; - } else if (enctype == RDB_ENC_INT32) { - uint32_t v; - if (rioRead(rdb,enc,4) == 0) return NULL; - v = ((uint32_t)enc[0])| - ((uint32_t)enc[1]<<8)| - ((uint32_t)enc[2]<<16)| - ((uint32_t)enc[3]<<24); - val = (int32_t)v; - } else { - rdbReportCorruptRDB("Unknown RDB integer encoding type %d",enctype); - return NULL; /* Never reached. */ - } - if (plainFlag || sdsFlag) { - char buf[LONG_STR_SIZE], *p; - int len = ll2string(buf,sizeof(buf),val); - if (lenptr) *lenptr = len; - if (plainFlag) { - p = zmalloc_usable(len, usable); - } else { - debugServerAssert(sdsFlag); - p = sdsnewlen(SDS_NOINIT,len); - if (usable) *usable = sdsAllocSize(p); - } - - memcpy(p,buf,len); - return p; - } else if (encode) { - return createStringObjectFromLongLongForValue(val); - } else { - return createStringObjectFromLongLongWithSds(val); - } -} - -/* String objects in the form "2391" "-100" without any space and with a - * range of values that can fit in an 8, 16 or 32 bit signed value can be - * encoded as integers to save space */ -int rdbTryIntegerEncoding(char *s, size_t len, unsigned char *enc) { - long long value; - if (string2ll(s, len, &value)) { - return rdbEncodeInteger(value, enc); - } else { - return 0; - } -} - -ssize_t rdbSaveLzfBlob(rio *rdb, void *data, size_t compress_len, - size_t original_len) { - unsigned char byte; - ssize_t n, nwritten = 0; - - /* Data compressed! 
Let's save it on disk */ - byte = (RDB_ENCVAL<<6)|RDB_ENC_LZF; - if ((n = rdbWriteRaw(rdb,&byte,1)) == -1) goto writeerr; - nwritten += n; - - if ((n = rdbSaveLen(rdb,compress_len)) == -1) goto writeerr; - nwritten += n; - - if ((n = rdbSaveLen(rdb,original_len)) == -1) goto writeerr; - nwritten += n; - - if ((n = rdbWriteRaw(rdb,data,compress_len)) == -1) goto writeerr; - nwritten += n; - - return nwritten; - -writeerr: - return -1; -} - -ssize_t rdbSaveLzfStringObject(rio *rdb, unsigned char *s, size_t len) { - size_t comprlen, outlen; - void *out; - - /* We require at least four bytes compression for this to be worth it */ - if (len <= 4) return 0; - outlen = len-4; - if ((out = zmalloc(outlen+1)) == NULL) return 0; - comprlen = lzf_compress(s, len, out, outlen); - if (comprlen == 0) { - zfree(out); - return 0; - } - ssize_t nwritten = rdbSaveLzfBlob(rdb, out, comprlen, len); - zfree(out); - return nwritten; -} - -/* Load an LZF compressed string in RDB format. The returned value - * changes according to 'flags'. For more info check the - * rdbGenericLoadStringObject() function. */ -void *rdbLoadLzfStringObject(rio *rdb, int flags, size_t *lenptr, size_t *usable) { - int plainFlag = flags & RDB_LOAD_PLAIN; - int sdsFlag = flags & RDB_LOAD_SDS; - int robjFlag = !(plainFlag || sdsFlag); /* not plain/sds */ - - uint64_t len, clen; - unsigned char *c = NULL; - char *val = NULL; - - if ((clen = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL; - if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL; - if ((c = ztrymalloc(clen)) == NULL) { - serverLog(isRestoreContext()? LL_VERBOSE: LL_WARNING, "rdbLoadLzfStringObject failed allocating %llu bytes", (unsigned long long)clen); - goto err; - } - - /* Allocate our target according to the uncompressed size. 
*/ - if (plainFlag) { - val = ztrymalloc_usable(len, usable); - } else { - debugServerAssert(sdsFlag || robjFlag); - val = sdstrynewlen(SDS_NOINIT,len); - if (usable) *usable = sdsAllocSize(val); - } - - if (!val) { - serverLog(isRestoreContext()? LL_VERBOSE: LL_WARNING, "rdbLoadLzfStringObject failed allocating %llu bytes", (unsigned long long)len); - goto err; - } - - if (lenptr) *lenptr = len; - - /* Load the compressed representation and uncompress it to target. */ - if (rioRead(rdb,c,clen) == 0) goto err; - if (lzf_decompress(c,clen,val,len) != len) { - rdbReportCorruptRDB("Invalid LZF compressed string"); - goto err; - } - zfree(c); - - return (robjFlag) ? createObject(OBJ_STRING,val) : (void *) val; - -err: - zfree(c); - if (plainFlag) { - zfree(val); - } else { - debugServerAssert(sdsFlag || robjFlag); - sdsfree(val); - } - return NULL; -} - -/* Save a string object as [len][data] on disk. If the object is a string - * representation of an integer value we try to save it in a special form */ -ssize_t rdbSaveRawString(rio *rdb, unsigned char *s, size_t len) { - int enclen; - ssize_t n, nwritten = 0; - - /* Try integer encoding */ - if (len <= 11) { - unsigned char buf[5]; - if ((enclen = rdbTryIntegerEncoding((char*)s,len,buf)) > 0) { - if (rdbWriteRaw(rdb,buf,enclen) == -1) return -1; - return enclen; - } - } - - /* Try LZF compression - under 20 bytes it's unable to compress even - * aaaaaaaaaaaaaaaaaa so skip it */ - if (server.rdb_compression && len > 20) { - n = rdbSaveLzfStringObject(rdb,s,len); - if (n == -1) return -1; - if (n > 0) return n; - /* Return value of 0 means data can't be compressed, save the old way */ - } - - /* Store verbatim */ - if ((n = rdbSaveLen(rdb,len)) == -1) return -1; - nwritten += n; - if (len > 0) { - if (rdbWriteRaw(rdb,s,len) == -1) return -1; - nwritten += len; - } - return nwritten; -} - -/* Save a long long value as either an encoded string or a string. 
*/ -ssize_t rdbSaveLongLongAsStringObject(rio *rdb, long long value) { - unsigned char buf[32]; - ssize_t n, nwritten = 0; - int enclen = rdbEncodeInteger(value,buf); - if (enclen > 0) { - return rdbWriteRaw(rdb,buf,enclen); - } else { - /* Encode as string */ - enclen = ll2string((char*)buf,32,value); - serverAssert(enclen < 32); - if ((n = rdbSaveLen(rdb,enclen)) == -1) return -1; - nwritten += n; - if ((n = rdbWriteRaw(rdb,buf,enclen)) == -1) return -1; - nwritten += n; - } - return nwritten; -} - -/* Like rdbSaveRawString() gets a Redis object instead. */ -ssize_t rdbSaveStringObject(rio *rdb, robj *obj) { - /* Avoid to decode the object, then encode it again, if the - * object is already integer encoded. */ - if (obj->encoding == OBJ_ENCODING_INT) { - return rdbSaveLongLongAsStringObject(rdb,(long)obj->ptr); - } else { - serverAssertWithInfo(NULL,obj,sdsEncodedObject(obj)); - return rdbSaveRawString(rdb,obj->ptr,sdslen(obj->ptr)); - } -} - -/* Load a string object from an RDB file according to flags: - * - * RDB_LOAD_NONE (no flags): load an RDB object, unencoded. - * RDB_LOAD_ENC: If the returned type is a Redis object, try to - * encode it in a special way to be more memory - * efficient. When this flag is passed the function - * no longer guarantees that obj->ptr is an SDS string. - * RDB_LOAD_PLAIN: Return a plain string allocated with zmalloc() - * instead of a Redis object with an sds in it. - * RDB_LOAD_SDS: Return an SDS string instead of a Redis object. - * - * On I/O error NULL is returned. 
- */ -void *rdbGenericLoadStringObjectUsable(rio *rdb, int flags, size_t *lenptr, size_t *usable) { - void *buf; - int plainFlag = flags & RDB_LOAD_PLAIN; - int sdsFlag = flags & RDB_LOAD_SDS; - int robjFlag = !(plainFlag || sdsFlag); /* not plain/sds */ - - int isencoded; - unsigned long long len; - - len = rdbLoadLen(rdb,&isencoded); - if (len == RDB_LENERR) return NULL; - - if (isencoded) { - switch(len) { - case RDB_ENC_INT8: - case RDB_ENC_INT16: - case RDB_ENC_INT32: - return rdbLoadIntegerObject(rdb,len,flags,lenptr,usable); - case RDB_ENC_LZF: - return rdbLoadLzfStringObject(rdb,flags,lenptr,usable); - default: - rdbReportCorruptRDB("Unknown RDB string encoding type %llu",len); - return NULL; - } - } - - /* return robj */ - if (robjFlag) { - if (usable) *usable = 0; - robj *o = tryCreateStringObject(SDS_NOINIT,len); - if (!o) { - serverLog(isRestoreContext()? LL_VERBOSE: LL_WARNING, "rdbGenericLoadStringObject failed allocating %llu bytes", len); - return NULL; - } - if (len && rioRead(rdb,o->ptr,len) == 0) { - decrRefCount(o); - return NULL; - } - return o; - } - - /* plain/sds */ - if (plainFlag) { - buf = ztrymalloc_usable(len, usable); - } else { - debugServerAssert(sdsFlag); - buf = sdstrynewlen(SDS_NOINIT,len); - if (usable) *usable = sdsAllocSize(buf); - } - - if (!buf) { - serverLog(isRestoreContext()? LL_VERBOSE: LL_WARNING, "rdbGenericLoadStringObject failed allocating %llu bytes", len); - return NULL; - } - - if (lenptr) *lenptr = len; - if (len && rioRead(rdb,buf,len) == 0) { - if (plainFlag) - zfree(buf); - else - sdsfree(buf); - return NULL; - } - return buf; -} - -void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) { - return rdbGenericLoadStringObjectUsable(rdb,flags,lenptr,NULL); -} - -robj *rdbLoadStringObject(rio *rdb) { - return rdbGenericLoadStringObject(rdb,RDB_LOAD_NONE,NULL); -} - -robj *rdbLoadEncodedStringObject(rio *rdb) { - return rdbGenericLoadStringObject(rdb,RDB_LOAD_ENC,NULL); -} - -/* Save a double value. 
Doubles are saved as strings prefixed by an unsigned - * 8 bit integer specifying the length of the representation. - * This 8 bit integer has special values in order to specify the following - * conditions: - * 253: not a number - * 254: + inf - * 255: - inf - */ -ssize_t rdbSaveDoubleValue(rio *rdb, double val) { - unsigned char buf[128]; - int len; - - if (isnan(val)) { - buf[0] = 253; - len = 1; - } else if (!isfinite(val)) { - len = 1; - buf[0] = (val < 0) ? 255 : 254; - } else { - long long lvalue; - /* Integer printing function is much faster, check if we can safely use it. */ - if (double2ll(val, &lvalue)) - ll2string((char*)buf+1,sizeof(buf)-1,lvalue); - else { - const int dlen = fpconv_dtoa(val, (char*)buf+1); - buf[dlen+1] = '\0'; - } - buf[0] = strlen((char*)buf+1); - len = buf[0]+1; - } - return rdbWriteRaw(rdb,buf,len); -} - -/* For information about double serialization check rdbSaveDoubleValue() */ -int rdbLoadDoubleValue(rio *rdb, double *val) { - char buf[256]; - unsigned char len; - - if (rioRead(rdb,&len,1) == 0) return -1; - switch(len) { - case 255: *val = R_NegInf; return 0; - case 254: *val = R_PosInf; return 0; - case 253: *val = R_Nan; return 0; - default: - if (rioRead(rdb,buf,len) == 0) return -1; - buf[len] = '\0'; - if (sscanf(buf, "%lg", val)!=1) return -1; - return 0; - } -} - -/* Saves a double for RDB 8 or greater, where IE754 binary64 format is assumed. - * We just make sure the integer is always stored in little endian, otherwise - * the value is copied verbatim from memory to disk. - * - * Return -1 on error, the size of the serialized value on success. */ -int rdbSaveBinaryDoubleValue(rio *rdb, double val) { - memrev64ifbe(&val); - return rdbWriteRaw(rdb,&val,sizeof(val)); -} - -/* Loads a double from RDB 8 or greater. See rdbSaveBinaryDoubleValue() for - * more info. On error -1 is returned, otherwise 0. 
*/ -int rdbLoadBinaryDoubleValue(rio *rdb, double *val) { - if (rioRead(rdb,val,sizeof(*val)) == 0) return -1; - memrev64ifbe(val); - return 0; -} - -/* Like rdbSaveBinaryDoubleValue() but single precision. */ -int rdbSaveBinaryFloatValue(rio *rdb, float val) { - memrev32ifbe(&val); - return rdbWriteRaw(rdb,&val,sizeof(val)); -} - -/* Like rdbLoadBinaryDoubleValue() but single precision. */ -int rdbLoadBinaryFloatValue(rio *rdb, float *val) { - if (rioRead(rdb,val,sizeof(*val)) == 0) return -1; - memrev32ifbe(val); - return 0; -} - -/* Save the object type of object "o". */ -int rdbSaveObjectType(rio *rdb, robj *o) { - switch (o->type) { - case OBJ_STRING: - return rdbSaveType(rdb,RDB_TYPE_STRING); - case OBJ_LIST: - if (o->encoding == OBJ_ENCODING_QUICKLIST || o->encoding == OBJ_ENCODING_LISTPACK) - return rdbSaveType(rdb, RDB_TYPE_LIST_QUICKLIST_2); - else - serverPanic("Unknown list encoding"); - case OBJ_SET: - if (o->encoding == OBJ_ENCODING_INTSET) - return rdbSaveType(rdb,RDB_TYPE_SET_INTSET); - else if (o->encoding == OBJ_ENCODING_HT) - return rdbSaveType(rdb,RDB_TYPE_SET); - else if (o->encoding == OBJ_ENCODING_LISTPACK) - return rdbSaveType(rdb,RDB_TYPE_SET_LISTPACK); - else - serverPanic("Unknown set encoding"); - case OBJ_ZSET: - if (o->encoding == OBJ_ENCODING_LISTPACK) - return rdbSaveType(rdb,RDB_TYPE_ZSET_LISTPACK); - else if (o->encoding == OBJ_ENCODING_SKIPLIST) - return rdbSaveType(rdb,RDB_TYPE_ZSET_2); - else - serverPanic("Unknown sorted set encoding"); - case OBJ_HASH: - if (o->encoding == OBJ_ENCODING_LISTPACK) - return rdbSaveType(rdb,RDB_TYPE_HASH_LISTPACK); - else if (o->encoding == OBJ_ENCODING_LISTPACK_EX) - return rdbSaveType(rdb,RDB_TYPE_HASH_LISTPACK_EX); - else if (o->encoding == OBJ_ENCODING_HT) { - if (hashTypeGetMinExpire(o, /*accurate*/ 1) == EB_EXPIRE_TIME_INVALID) - return rdbSaveType(rdb,RDB_TYPE_HASH); - else - return rdbSaveType(rdb,RDB_TYPE_HASH_METADATA); - } else - serverPanic("Unknown hash encoding"); - case OBJ_STREAM: 
- return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS_4); - case OBJ_MODULE: - return rdbSaveType(rdb,RDB_TYPE_MODULE_2); - default: - serverPanic("Unknown object type"); - } - return -1; /* avoid warning */ -} - -/* Use rdbLoadType() to load a TYPE in RDB format, but returns -1 if the - * type is not specifically a valid Object Type. */ -int rdbLoadObjectType(rio *rdb) { - int type; - if ((type = rdbLoadType(rdb)) == -1) return -1; - if (!rdbIsObjectType(type)) return -1; - return type; -} - -/* This helper function serializes a consumer group Pending Entries List (PEL) - * into the RDB file. The 'nacks' argument tells the function if also persist - * the information about the not acknowledged message, or if to persist - * just the IDs: this is useful because for the global consumer group PEL - * we serialized the NACKs as well, but when serializing the local consumer - * PELs we just add the ID, that will be resolved inside the global PEL to - * put a reference to the same structure. */ -ssize_t rdbSaveStreamPEL(rio *rdb, rax *pel, int nacks) { - ssize_t n, nwritten = 0; - - /* Number of entries in the PEL. */ - if ((n = rdbSaveLen(rdb,raxSize(pel))) == -1) return -1; - nwritten += n; - - /* Save each entry. */ - raxIterator ri; - raxStart(&ri,pel); - raxSeek(&ri,"^",NULL,0); - while(raxNext(&ri)) { - /* We store IDs in raw form as 128 big big endian numbers, like - * they are inside the radix tree key. */ - if ((n = rdbWriteRaw(rdb,ri.key,sizeof(streamID))) == -1) { - raxStop(&ri); - return -1; - } - nwritten += n; - - if (nacks) { - streamNACK *nack = ri.data; - if ((n = rdbSaveMillisecondTime(rdb,nack->delivery_time)) == -1) { - raxStop(&ri); - return -1; - } - nwritten += n; - if ((n = rdbSaveLen(rdb,nack->delivery_count)) == -1) { - raxStop(&ri); - return -1; - } - nwritten += n; - /* We don't save the consumer name: we'll save the pending IDs - * for each consumer in the consumer PEL, and resolve the consumer - * at loading time. 
*/ - } - } - raxStop(&ri); - return nwritten; -} - -/* Serialize the IDMP entries for a stream into the RDB file. - * This saves all the idempotent producer tracking entries (IID -> stream ID mappings). - * Format: num_producers, then for each producer: pid, num_entries, entries... */ -ssize_t rdbSaveStreamIdmpEntries(rio *rdb, stream *s) { - ssize_t n, nwritten = 0; - - /* Save the number of producers. */ - size_t num_producers = s->idmp_producers ? raxSize(s->idmp_producers) : 0; - if ((n = rdbSaveLen(rdb,num_producers)) == -1) return -1; - nwritten += n; - - if (num_producers == 0) return nwritten; - - /* Iterate through all producers. */ - raxIterator ri; - raxStart(&ri, s->idmp_producers); - raxSeek(&ri, "^", NULL, 0); - while (raxNext(&ri)) { - idmpProducer *producer = ri.data; - - /* Save the producer ID (pid). */ - if ((n = rdbSaveRawString(rdb, ri.key, ri.key_len)) == -1) { - raxStop(&ri); - return -1; - } - nwritten += n; - - /* Save the number of entries for this producer. */ - size_t count = dictSize(producer->idmp_dict); - if ((n = rdbSaveLen(rdb, count)) == -1) { - raxStop(&ri); - return -1; - } - nwritten += n; - - /* Iterate through the linked list and save each entry in insertion order. */ - idmpEntry *entry = producer->idmp_head; - while (entry != NULL) { - /* Save the IID string (length + data). */ - if ((n = rdbSaveRawString(rdb,(unsigned char *)entry->iid,entry->iid_len)) == -1) { - raxStop(&ri); - return -1; - } - nwritten += n; - - /* Save the associated stream ID. */ - if ((n = rdbSaveLen(rdb,entry->id.ms)) == -1) { - raxStop(&ri); - return -1; - } - nwritten += n; - if ((n = rdbSaveLen(rdb,entry->id.seq)) == -1) { - raxStop(&ri); - return -1; - } - nwritten += n; - - entry = entry->next; - } - } - raxStop(&ri); - return nwritten; -} - -/* Load IDMP entries for a stream from the RDB file. - * This loads all the idempotent producer tracking entries (IID -> stream ID mappings) - * and inserts them into the stream's idmp_producers rax tree. 
- * Format: num_producers, then for each producer: pid, num_entries, entries... */ -int rdbLoadStreamIdmpEntries(rio *rdb, stream *s) { - /* Load the number of producers. */ - uint64_t num_producers = rdbLoadLen(rdb, NULL); - if (num_producers == RDB_LENERR) { - return -1; - } - - if (num_producers == 0) return 0; - - /* Create the producers rax tree. */ - s->idmp_producers = raxNew(); - if (s->idmp_producers == NULL) { - return -1; - } - - /* Load each producer. */ - for (uint64_t p = 0; p < num_producers; p++) { - /* Load the producer ID (pid). */ - size_t pid_len; - char *pid = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, &pid_len); - if (pid == NULL) goto cleanup; - - /* Load the number of entries for this producer. */ - uint64_t count = rdbLoadLen(rdb, NULL); - if (count == RDB_LENERR) { - sdsfree(pid); - goto cleanup; - } - - /* Create the producer. */ - idmpProducer *producer = idmpProducerCreate(&s->alloc_size); - - /* Insert producer into rax tree. */ - int inserted = raxTryInsert(s->idmp_producers, (unsigned char *)pid, pid_len, producer, NULL); - sdsfree(pid); - if (!inserted) { - idmpProducerFree(producer, &s->alloc_size); - goto cleanup; - } - - /* Load each entry for this producer. */ - for (uint64_t i = 0; i < count; i++) { - /* Load the IID string. */ - size_t iid_len; - char *iid = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, &iid_len); - if (iid == NULL) goto cleanup; - - /* Load the associated stream ID. */ - streamID id; - id.ms = rdbLoadLen(rdb, NULL); - id.seq = rdbLoadLen(rdb, NULL); - if (rioGetReadError(rdb)) { - sdsfree(iid); - goto cleanup; - } - - /* Create the idmpEntry. */ - idmpEntry *entry = idmpEntryCreate(iid, iid_len, &s->alloc_size); - sdsfree(iid); /* idmpEntryCreate makes a copy */ - - /* Set the stream ID. */ - entry->id = id; - entry->next = NULL; - - /* Insert into dict. If insertion fails (e.g., duplicate), skip. 
*/ - int ret = dictAdd(producer->idmp_dict, entry, NULL); - if (ret != DICT_OK) { - /* Insertion failed (duplicate). For RDB loading, we'll just skip - * duplicates rather than failing the entire load. */ - idmpEntryFree(entry, &s->alloc_size); - } else { - /* Add to linked list tail. */ - if (producer->idmp_tail == NULL) { - producer->idmp_head = producer->idmp_tail = entry; - } else { - producer->idmp_tail->next = entry; - producer->idmp_tail = entry; - } - } - } - } - return 0; - -cleanup: - /* Clean up partially constructed producers tree on error. - * This prevents use-after-free when the stream is later freed. */ - if (s->idmp_producers) { - raxFreeWithCbAndContext(s->idmp_producers, streamFreeIdmpProducerGeneric, s); - s->idmp_producers = NULL; - } - return -1; -} - -/* Serialize the consumers of a stream consumer group into the RDB. Helper - * function for the stream data type serialization. What we do here is to - * persist the consumer metadata, and it's PEL, for each consumer. */ -size_t rdbSaveStreamConsumers(rio *rdb, streamCG *cg) { - ssize_t n, nwritten = 0; - - /* Number of consumers in this consumer group. */ - if ((n = rdbSaveLen(rdb,raxSize(cg->consumers))) == -1) return -1; - nwritten += n; - - /* Save each consumer. */ - raxIterator ri; - raxStart(&ri,cg->consumers); - raxSeek(&ri,"^",NULL,0); - while(raxNext(&ri)) { - streamConsumer *consumer = ri.data; - - /* Consumer name. */ - if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) { - raxStop(&ri); - return -1; - } - nwritten += n; - - /* Seen time. */ - if ((n = rdbSaveMillisecondTime(rdb,consumer->seen_time)) == -1) { - raxStop(&ri); - return -1; - } - nwritten += n; - - /* Active time. 
*/ - if ((n = rdbSaveMillisecondTime(rdb,consumer->active_time)) == -1) { - raxStop(&ri); - return -1; - } - nwritten += n; - - /* Consumer PEL, without the ACKs (see last parameter of the function - * passed with value of 0), at loading time we'll lookup the ID - * in the consumer group global PEL and will put a reference in the - * consumer local PEL. */ - if ((n = rdbSaveStreamPEL(rdb,consumer->pel,0)) == -1) { - raxStop(&ri); - return -1; - } - nwritten += n; - } - raxStop(&ri); - return nwritten; -} - -/* Save a Redis object. - * Returns -1 on error, number of bytes written on success. */ -ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) { - ssize_t n = 0, nwritten = 0; - - if (o->type == OBJ_STRING) { - /* Save a string value */ - if ((n = rdbSaveStringObject(rdb,o)) == -1) return -1; - nwritten += n; - } else if (o->type == OBJ_LIST) { - /* Save a list value */ - if (o->encoding == OBJ_ENCODING_QUICKLIST) { - quicklist *ql = o->ptr; - quicklistNode *node = ql->head; - - if ((n = rdbSaveLen(rdb,ql->len)) == -1) return -1; - nwritten += n; - - while(node) { - if ((n = rdbSaveLen(rdb,node->container)) == -1) return -1; - nwritten += n; - - if (quicklistNodeIsCompressed(node)) { - void *data; - size_t compress_len = quicklistGetLzf(node, &data); - if ((n = rdbSaveLzfBlob(rdb,data,compress_len,node->sz)) == -1) return -1; - nwritten += n; - } else { - if ((n = rdbSaveRawString(rdb,node->entry,node->sz)) == -1) return -1; - nwritten += n; - } - node = node->next; - } - } else if (o->encoding == OBJ_ENCODING_LISTPACK) { - unsigned char *lp = o->ptr; - - /* Save list listpack as a fake quicklist that only has a single node. 
*/ - if ((n = rdbSaveLen(rdb,1)) == -1) return -1; - nwritten += n; - if ((n = rdbSaveLen(rdb,QUICKLIST_NODE_CONTAINER_PACKED)) == -1) return -1; - nwritten += n; - if ((n = rdbSaveRawString(rdb,lp,lpBytes(lp))) == -1) return -1; - nwritten += n; - } else { - serverPanic("Unknown list encoding"); - } - } else if (o->type == OBJ_SET) { - /* Save a set value */ - if (o->encoding == OBJ_ENCODING_HT) { - dict *set = o->ptr; - dictIterator di; - dictEntry *de; - - if ((n = rdbSaveLen(rdb,dictSize(set))) == -1) { - return -1; - } - nwritten += n; - - dictInitIterator(&di, set); - while((de = dictNext(&di)) != NULL) { - sds ele = dictGetKey(de); - if ((n = rdbSaveRawString(rdb,(unsigned char*)ele,sdslen(ele))) - == -1) - { - dictResetIterator(&di); - return -1; - } - nwritten += n; - } - dictResetIterator(&di); - } else if (o->encoding == OBJ_ENCODING_INTSET) { - size_t l = intsetBlobLen((intset*)o->ptr); - - if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1; - nwritten += n; - } else if (o->encoding == OBJ_ENCODING_LISTPACK) { - size_t l = lpBytes((unsigned char *)o->ptr); - if ((n = rdbSaveRawString(rdb, o->ptr, l)) == -1) return -1; - nwritten += n; - } else { - serverPanic("Unknown set encoding"); - } - } else if (o->type == OBJ_ZSET) { - /* Save a sorted set value */ - if (o->encoding == OBJ_ENCODING_LISTPACK) { - size_t l = lpBytes((unsigned char*)o->ptr); - - if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1; - nwritten += n; - } else if (o->encoding == OBJ_ENCODING_SKIPLIST) { - zset *zs = o->ptr; - zskiplist *zsl = zs->zsl; - - if ((n = rdbSaveLen(rdb,zsl->length)) == -1) return -1; - nwritten += n; - - /* We save the skiplist elements from the greatest to the smallest - * (that's trivial since the elements are already ordered in the - * skiplist): this improves the load process, since the next loaded - * element will always be the smaller, so adding to the skiplist - * will always immediately stop at the head, making the insertion - * O(1) 
instead of O(log(N)). */ - zskiplistNode *zn = zsl->tail; - while (zn != NULL) { - sds ele = zslGetNodeElement(zn); - if ((n = rdbSaveRawString(rdb, - (unsigned char*)ele,sdslen(ele))) == -1) - { - return -1; - } - nwritten += n; - if ((n = rdbSaveBinaryDoubleValue(rdb,zn->score)) == -1) - return -1; - nwritten += n; - zn = zn->backward; - } - } else { - serverPanic("Unknown sorted set encoding"); - } - } else if (o->type == OBJ_HASH) { - /* Save a hash value */ - if ((o->encoding == OBJ_ENCODING_LISTPACK) || - (o->encoding == OBJ_ENCODING_LISTPACK_EX)) - { - /* Save min/next HFE expiration time if needed */ - if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { - uint64_t minExpire = hashTypeGetMinExpire(o, 0); - /* if invalid time then save 0 */ - if (minExpire == EB_EXPIRE_TIME_INVALID) - minExpire = 0; - if (rdbSaveMillisecondTime(rdb, minExpire) == -1) - return -1; - } - unsigned char *lp_ptr = hashTypeListpackGetLp(o); - size_t l = lpBytes(lp_ptr); - - if ((n = rdbSaveRawString(rdb,lp_ptr,l)) == -1) return -1; - nwritten += n; - } else if (o->encoding == OBJ_ENCODING_HT) { - int hashWithMeta = 0; /* RDB_TYPE_HASH_METADATA */ - dictIterator di; - dictEntry *de; - /* Determine the hash layout to use based on the presence of at least - * one field with a valid TTL. If such a field exists, employ the - * RDB_TYPE_HASH_METADATA layout, including tuples of [ttl][field][value]. - * Otherwise, use the standard RDB_TYPE_HASH layout containing only - * the tuples [field][value]. 
*/ - uint64_t minExpire = hashTypeGetMinExpire(o, 1); - - /* if RDB_TYPE_HASH_METADATA (Can have TTLs on fields) */ - if (minExpire != EB_EXPIRE_TIME_INVALID) { - hashWithMeta = 1; - /* Save next field expire time of hash */ - if (rdbSaveMillisecondTime(rdb, minExpire) == -1) { - return -1; - } - } - - /* save number of fields in hash */ - if ((n = rdbSaveLen(rdb,dictSize((dict*)o->ptr))) == -1) { - return -1; - } - nwritten += n; - - /* save all hash fields */ - dictInitIterator(&di, o->ptr); - while((de = dictNext(&di)) != NULL) { - Entry *entry = dictGetKey(de); - sds field = entryGetField(entry); - sds value = entryGetValue(entry); - - /* save the TTL */ - if (hashWithMeta) { - uint64_t ttl, expiryTime= entryGetExpiry(entry); - - /* Saved TTL value: - * - 0: Indicates no TTL. This is common case so we keep it small. - * - Otherwise: TTL is relative to minExpire (with +1 to avoid 0 that already taken) - */ - ttl = (expiryTime == EB_EXPIRE_TIME_INVALID) ? 0 : expiryTime - minExpire + 1; - if ((n = rdbSaveLen(rdb, ttl)) == -1) { - dictResetIterator(&di); - return -1; - } - nwritten += n; - } - - /* save the key */ - if ((n = rdbSaveRawString(rdb,(unsigned char*)field, - sdslen(field))) == -1) - { - dictResetIterator(&di); - return -1; - } - nwritten += n; - - /* save the value */ - if ((n = rdbSaveRawString(rdb,(unsigned char*)value, - sdslen(value))) == -1) - { - dictResetIterator(&di); - return -1; - } - nwritten += n; - } - dictResetIterator(&di); - } else { - serverPanic("Unknown hash encoding"); - } - } else if (o->type == OBJ_STREAM) { - /* Store how many listpacks we have inside the radix tree. */ - stream *s = o->ptr; - rax *rax = s->rax; - if ((n = rdbSaveLen(rdb,raxSize(rax))) == -1) return -1; - nwritten += n; - - /* Serialize all the listpacks inside the radix tree as they are, - * when loading back, we'll use the first entry of each listpack - * to insert it back into the radix tree. 
*/ - raxIterator ri; - raxStart(&ri,rax); - raxSeek(&ri,"^",NULL,0); - while (raxNext(&ri)) { - unsigned char *lp = ri.data; - size_t lp_bytes = lpBytes(lp); - if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) { - raxStop(&ri); - return -1; - } - nwritten += n; - if ((n = rdbSaveRawString(rdb,lp,lp_bytes)) == -1) { - raxStop(&ri); - return -1; - } - nwritten += n; - } - raxStop(&ri); - - /* Save the number of elements inside the stream. We cannot obtain - * this easily later, since our macro nodes should be checked for - * number of items: not a great CPU / space tradeoff. */ - if ((n = rdbSaveLen(rdb,s->length)) == -1) return -1; - nwritten += n; - /* Save the last entry ID. */ - if ((n = rdbSaveLen(rdb,s->last_id.ms)) == -1) return -1; - nwritten += n; - if ((n = rdbSaveLen(rdb,s->last_id.seq)) == -1) return -1; - nwritten += n; - /* Save the first entry ID. */ - if ((n = rdbSaveLen(rdb,s->first_id.ms)) == -1) return -1; - nwritten += n; - if ((n = rdbSaveLen(rdb,s->first_id.seq)) == -1) return -1; - nwritten += n; - /* Save the maximal tombstone ID. */ - if ((n = rdbSaveLen(rdb,s->max_deleted_entry_id.ms)) == -1) return -1; - nwritten += n; - if ((n = rdbSaveLen(rdb,s->max_deleted_entry_id.seq)) == -1) return -1; - nwritten += n; - /* Save the offset. */ - if ((n = rdbSaveLen(rdb,s->entries_added)) == -1) return -1; - nwritten += n; - - /* The consumer groups and their clients are part of the stream - * type, so serialize every consumer group. */ - - /* Save the number of groups. */ - size_t num_cgroups = s->cgroups ? raxSize(s->cgroups) : 0; - if ((n = rdbSaveLen(rdb,num_cgroups)) == -1) return -1; - nwritten += n; - - if (num_cgroups) { - /* Serialize each consumer group. */ - raxStart(&ri,s->cgroups); - raxSeek(&ri,"^",NULL,0); - while(raxNext(&ri)) { - streamCG *cg = ri.data; - - /* Save the group name. */ - if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) { - raxStop(&ri); - return -1; - } - nwritten += n; - - /* Last ID. 
*/ - if ((n = rdbSaveLen(rdb,cg->last_id.ms)) == -1) { - raxStop(&ri); - return -1; - } - nwritten += n; - if ((n = rdbSaveLen(rdb,cg->last_id.seq)) == -1) { - raxStop(&ri); - return -1; - } - nwritten += n; - - /* Save the group's logical reads counter. */ - if ((n = rdbSaveLen(rdb,cg->entries_read)) == -1) { - raxStop(&ri); - return -1; - } - nwritten += n; - - /* Save the global PEL. */ - if ((n = rdbSaveStreamPEL(rdb,cg->pel,1)) == -1) { - raxStop(&ri); - return -1; - } - nwritten += n; - - /* Save the consumers of this group. */ - if ((n = rdbSaveStreamConsumers(rdb,cg)) == -1) { - raxStop(&ri); - return -1; - } - nwritten += n; - } - raxStop(&ri); - } - - /* Save IDMP (Idempotent Message Producer) configuration and entries. */ - - /* Save IDMP duration (in seconds). */ - if ((n = rdbSaveLen(rdb,s->idmp_duration)) == -1) return -1; - nwritten += n; - - /* Save IDMP max entries. */ - if ((n = rdbSaveLen(rdb,s->idmp_max_entries)) == -1) return -1; - nwritten += n; - - /* Save all IDMP entries. */ - if ((n = rdbSaveStreamIdmpEntries(rdb,s)) == -1) return -1; - nwritten += n; - - /* Save the all-time count of IIDs added. */ - if ((n = rdbSaveLen(rdb,s->iids_added)) == -1) return -1; - nwritten += n; - - /* Save the all-time count of duplicate IIDs detected. */ - if ((n = rdbSaveLen(rdb,s->iids_duplicates)) == -1) return -1; - nwritten += n; - } else if (o->type == OBJ_MODULE) { - /* Save a module-specific value. */ - RedisModuleIO io; - moduleValue *mv = o->ptr; - moduleType *mt = mv->type; - - /* Write the "module" identifier as prefix, so that we'll be able - * to call the right module during loading. */ - int retval = rdbSaveLen(rdb,mt->entity.id); - if (retval == -1) return -1; - moduleInitIOContext(&io, &mt->entity, rdb, key, dbid); - io.bytes += retval; - - /* Then write the module-specific representation + EOF marker. 
*/ - mt->rdb_save(&io,mv->value); - retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF); - if (retval == -1) - io.error = 1; - else - io.bytes += retval; - - if (io.ctx) { - moduleFreeContext(io.ctx); - zfree(io.ctx); - } - return io.error ? -1 : (ssize_t)io.bytes; - } else { - serverPanic("Unknown object type"); - } - return nwritten; -} - -/* Return the length the object will have on disk if saved with - * the rdbSaveObject() function. Currently we use a trick to get - * this length with very little changes to the code. In the future - * we could switch to a faster solution. */ -size_t rdbSavedObjectLen(robj *o, robj *key, int dbid) { - ssize_t len = rdbSaveObject(NULL,o,key,dbid); - serverAssertWithInfo(NULL,o,len != -1); - return len; -} - -/* Save a key-value pair, with expire time, type, key, value. - * On error -1 is returned. - * On success if the key was actually saved 1 is returned. */ -int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, int dbid) { - int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU; - int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU; - - /* Save the expire time */ - if (expiretime != -1) { - if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1; - if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1; - } - - /* Save the LRU info. */ - if (savelru) { - uint64_t idletime = estimateObjectIdleTime(val); - idletime /= 1000; /* Using seconds is enough and requires less space.*/ - if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1; - if (rdbSaveLen(rdb,idletime) == -1) return -1; - } - - /* Save the LFU info. */ - if (savelfu) { - uint8_t buf[1]; - buf[0] = LFUDecrAndReturn(val); - /* We can encode this in exactly two bytes: the opcode and an 8 - * bit counter, since the frequency is logarithmic with a 0-255 range. - * Note that we do not store the halving time because to reset it - * a single time when loading does not affect the frequency much. 
*/ - if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1) return -1; - if (rdbWriteRaw(rdb,buf,1) == -1) return -1; - } - - /* if needed save key metadata */ - if (getModuleMetaBits(val->metabits)) { - if (rdbSaveKeyMetadata(rdb, key, val, dbid) == -1) - return -1; - } - - /* Save type, key, value */ - if (rdbSaveObjectType(rdb,val) == -1) return -1; - if (rdbSaveStringObject(rdb,key) == -1) return -1; - if (rdbSaveObject(rdb,val,key,dbid) == -1) return -1; - - /* Delay return if required (for testing) */ - if (server.rdb_key_save_delay) - debugDelay(server.rdb_key_save_delay); - - return 1; -} - -/* Save an AUX field. */ -ssize_t rdbSaveAuxField(rio *rdb, void *key, size_t keylen, void *val, size_t vallen) { - ssize_t ret, len = 0; - if ((ret = rdbSaveType(rdb,RDB_OPCODE_AUX)) == -1) return -1; - len += ret; - if ((ret = rdbSaveRawString(rdb,key,keylen)) == -1) return -1; - len += ret; - if ((ret = rdbSaveRawString(rdb,val,vallen)) == -1) return -1; - len += ret; - return len; -} - -/* Wrapper for rdbSaveAuxField() used when key/val length can be obtained - * with strlen(). */ -ssize_t rdbSaveAuxFieldStrStr(rio *rdb, char *key, char *val) { - return rdbSaveAuxField(rdb,key,strlen(key),val,strlen(val)); -} - -/* Wrapper for strlen(key) + integer type (up to long long range). */ -ssize_t rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) { - char buf[LONG_STR_SIZE]; - int vlen = ll2string(buf,sizeof(buf),val); - return rdbSaveAuxField(rdb,key,strlen(key),buf,vlen); -} - -/* Save a few default AUX fields with information about the RDB generated. */ -int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { - int redis_bits = (sizeof(void*) == 8) ? 64 : 32; - int aof_base = (rdbflags & RDBFLAGS_AOF_PREAMBLE) != 0; - - /* Add a few fields about the state when the RDB was created. 
*/ - if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1; - if (rdbSaveAuxFieldStrInt(rdb,"redis-bits",redis_bits) == -1) return -1; - if (rdbSaveAuxFieldStrInt(rdb,"ctime",time(NULL)) == -1) return -1; - if (rdbSaveAuxFieldStrInt(rdb,"used-mem",zmalloc_used_memory()) == -1) return -1; - - /* Handle saving options that generate aux fields. */ - if (rsi) { - if (rdbSaveAuxFieldStrInt(rdb,"repl-stream-db",rsi->repl_stream_db) - == -1) return -1; - if (rdbSaveAuxFieldStrStr(rdb,"repl-id",server.replid) - == -1) return -1; - if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",server.master_repl_offset) - == -1) return -1; - } - if (rdbSaveAuxFieldStrInt(rdb, "aof-base", aof_base) == -1) return -1; - - /* Save the active import ASM task if cluster is enabled. */ - if (server.cluster_enabled) { - sds task_info = asmDumpActiveImportTask(); - int ret = rdbSaveAuxFieldStrStr(rdb, "cluster-asm-task", - task_info ? task_info : ""); - if (task_info) sdsfree(task_info); - if (ret == -1) return -1; - } - - return 1; -} - -ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) { - /* Save a module-specific aux value. */ - RedisModuleIO io; - int retval = 0; - moduleInitIOContext(&io, &mt->entity, rdb, NULL, -1); - - /* We save the AUX field header in a temporary buffer so we can support aux_save2 API. - * If aux_save2 is used the buffer will be flushed at the first time the module will perform - * a write operation to the RDB and will be ignored is case there was no writes. */ - rio aux_save_headers_rio; - rioInitWithBuffer(&aux_save_headers_rio, sdsempty()); - - if (rdbSaveType(&aux_save_headers_rio, RDB_OPCODE_MODULE_AUX) == -1) goto error; - - /* Write the "module" identifier as prefix, so that we'll be able - * to call the right module during loading. */ - if (rdbSaveLen(&aux_save_headers_rio,mt->entity.id) == -1) goto error; - - /* write the 'when' so that we can provide it on loading. 
add a UINT opcode - * for backwards compatibility, everything after the MT needs to be prefixed - * by an opcode. */ - if (rdbSaveLen(&aux_save_headers_rio,RDB_MODULE_OPCODE_UINT) == -1) goto error; - if (rdbSaveLen(&aux_save_headers_rio,when) == -1) goto error; - - /* Then write the module-specific representation + EOF marker. */ - if (mt->aux_save2) { - io.pre_flush_buffer = aux_save_headers_rio.io.buffer.ptr; - mt->aux_save2(&io,when); - if (io.pre_flush_buffer) { - /* aux_save did not save any data to the RDB. - * We will avoid saving any data related to this aux type - * to allow loading this RDB if the module is not present. */ - sdsfree(io.pre_flush_buffer); - io.pre_flush_buffer = NULL; - return 0; - } - } else { - /* Write headers now, aux_save does not do lazy saving of the headers. */ - retval = rdbWriteRaw(rdb, aux_save_headers_rio.io.buffer.ptr, sdslen(aux_save_headers_rio.io.buffer.ptr)); - if (retval == -1) goto error; - io.bytes += retval; - sdsfree(aux_save_headers_rio.io.buffer.ptr); - mt->aux_save(&io,when); - } - retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF); - serverAssert(!io.pre_flush_buffer); - if (retval == -1) - io.error = 1; - else - io.bytes += retval; - - if (io.ctx) { - moduleFreeContext(io.ctx); - zfree(io.ctx); - } - if (io.error) - return -1; - return io.bytes; -error: - sdsfree(aux_save_headers_rio.io.buffer.ptr); - return -1; -} - -ssize_t rdbSaveFunctions(rio *rdb) { - dict *functions = functionsLibGet(); - dictIterator iter; - dictEntry *entry = NULL; - ssize_t written = 0; - ssize_t ret; - - dictInitIterator(&iter, functions); - while ((entry = dictNext(&iter))) { - if ((ret = rdbSaveType(rdb, RDB_OPCODE_FUNCTION2)) < 0) goto werr; - written += ret; - functionLibInfo *li = dictGetVal(entry); - if ((ret = rdbSaveRawString(rdb, (unsigned char *) li->code, sdslen(li->code))) < 0) goto werr; - written += ret; - } - dictResetIterator(&iter); - return written; - -werr: - dictResetIterator(&iter); - return -1; -} - -ssize_t 
rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter, unsigned long long *skipped) { - dictEntry *de; - ssize_t written = 0; - ssize_t res; - size_t oldsize = 0; - kvstoreIterator kvs_it; - static long long info_updated_time = 0; - char *pname = (rdbflags & RDBFLAGS_AOF_PREAMBLE) ? "AOF rewrite" : "RDB"; - - redisDb *db = server.db + dbid; - unsigned long long int db_size = kvstoreSize(db->keys); - if (db_size == 0) return 0; - - /* Write the SELECT DB opcode */ - if ((res = rdbSaveType(rdb,RDB_OPCODE_SELECTDB)) < 0) goto werr; - written += res; - if ((res = rdbSaveLen(rdb, dbid)) < 0) goto werr; - written += res; - - /* Write the RESIZE DB opcode. */ - unsigned long long expires_size = kvstoreSize(db->expires); - if ((res = rdbSaveType(rdb,RDB_OPCODE_RESIZEDB)) < 0) goto werr; - written += res; - if ((res = rdbSaveLen(rdb,db_size)) < 0) goto werr; - written += res; - if ((res = rdbSaveLen(rdb,expires_size)) < 0) goto werr; - written += res; - - kvstoreIteratorInit(&kvs_it, db->keys); - int last_slot = -1; - /* Iterate this DB writing every entry */ - while ((de = kvstoreIteratorNext(&kvs_it)) != NULL) { - int curr_slot = kvstoreIteratorGetCurrentDictIndex(&kvs_it); - /* Save slot info. 
*/ - if (server.cluster_enabled && curr_slot != last_slot) { - if ((res = rdbSaveType(rdb, RDB_OPCODE_SLOT_INFO)) < 0) goto werr2; - written += res; - if ((res = rdbSaveLen(rdb, curr_slot)) < 0) goto werr2; - written += res; - if ((res = rdbSaveLen(rdb, kvstoreDictSize(db->keys, curr_slot))) < 0) goto werr2; - written += res; - if ((res = rdbSaveLen(rdb, kvstoreDictSize(db->expires, curr_slot))) < 0) goto werr2; - written += res; - last_slot = curr_slot; - } - kvobj *kv = dictGetKV(de); - robj key; - long long expire; - size_t rdb_bytes_before_key = rdb->processed_bytes; - - /* Skip keys that are being trimmed */ - if (server.cluster_enabled && isSlotInTrimJob(curr_slot)) { - (*skipped)++; - continue; - } - - initStaticStringObject(key,kvobjGetKey(kv)); - expire = kvobjGetExpire(kv); - if (server.memory_tracking_per_slot) - oldsize = kvobjAllocSize(kv); - res = rdbSaveKeyValuePair(rdb, &key, kv, expire, dbid); - if (server.memory_tracking_per_slot) - updateSlotAllocSize(db, curr_slot, oldsize, kvobjAllocSize(kv)); - if (res < 0) goto werr2; - written += res; - - /* In fork child process, we can try to release memory back to the - * OS and possibly avoid or decrease COW. We give the dismiss - * mechanism a hint about an estimated size of the object we stored. */ - size_t dump_size = rdb->processed_bytes - rdb_bytes_before_key; - if (server.in_fork_child) dismissObject(kv, dump_size); - - /* Update child info every 1 second (approximately). 
- * in order to avoid calling mstime() on each iteration, we will - * check the diff every 1024 keys */ - if (((*key_counter)++ & 1023) == 0) { - long long now = mstime(); - if (now - info_updated_time >= 1000) { - sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, *key_counter, pname); - info_updated_time = now; - } - } - } - kvstoreIteratorReset(&kvs_it); - return written; - -werr2: - kvstoreIteratorReset(&kvs_it); -werr: - return -1; -} - -/* Produces a dump of the database in RDB format sending it to the specified - * Redis I/O channel. On success C_OK is returned, otherwise C_ERR - * is returned and part of the output, or all the output, can be - * missing because of I/O errors. - * - * When the function returns C_ERR and if 'error' is not NULL, the - * integer pointed by 'error' is set to the value of errno just after the I/O - * error. */ -int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { - char magic[10]; - uint64_t cksum; - long key_counter = 0; - unsigned long long skipped = 0; - int j; - - if (server.rdb_checksum) - rdb->update_cksum = rioGenericUpdateChecksum; - snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION); - if (rdbWriteRaw(rdb,magic,9) == -1) goto werr; - if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr; - if (!(req & SLAVE_REQ_RDB_EXCLUDE_DATA) && rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr; - - /* save functions */ - if (!(req & SLAVE_REQ_RDB_EXCLUDE_FUNCTIONS) && rdbSaveFunctions(rdb) == -1) goto werr; - - /* save all databases, skip this if we're in functions-only mode */ - if (!(req & SLAVE_REQ_RDB_EXCLUDE_DATA)) { - for (j = 0; j < server.dbnum; j++) { - if (rdbSaveDb(rdb, j, rdbflags, &key_counter, &skipped) == -1) goto werr; - } - } - - if (!(req & SLAVE_REQ_RDB_EXCLUDE_DATA) && rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr; - - /* EOF opcode */ - if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr; - - /* CRC64 checksum. 
It will be zero if checksum computation is disabled, the - * loading code skips the check in this case. */ - cksum = rdb->cksum; - memrev64ifbe(&cksum); - if (rioWrite(rdb,&cksum,8) == 0) goto werr; - serverLog(LL_NOTICE, "BGSAVE done, %ld keys saved, %llu keys skipped, %zu bytes written.", key_counter, skipped, rdb->processed_bytes); - return C_OK; - -werr: - if (error) *error = errno; - return C_ERR; -} - -/* This helper function is only used for diskless replication. - * This is just a wrapper to rdbSaveRio() that additionally adds a prefix - * and a suffix to the generated RDB dump. The prefix is: - * - * $EOF:<40 bytes unguessable hex string>\r\n - * - * While the suffix is the 40 bytes hex string we announced in the prefix. - * This way processes receiving the payload can understand when it ends - * without doing any processing of the content. */ -int rdbSaveRioWithEOFMark(int req, rio *rdb, int *error, rdbSaveInfo *rsi) { - char eofmark[RDB_EOF_MARK_SIZE]; - - startSaving(RDBFLAGS_REPLICATION); - getRandomHexChars(eofmark,RDB_EOF_MARK_SIZE); - if (error) *error = 0; - if (rioWrite(rdb,"$EOF:",5) == 0) goto werr; - if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr; - if (rioWrite(rdb,"\r\n",2) == 0) goto werr; - if (rdbSaveRio(req,rdb,error,RDBFLAGS_REPLICATION,rsi) == C_ERR) goto werr; - if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr; - stopSaving(1); - return C_OK; - -werr: /* Write error. */ - /* Set 'error' only if not already set by rdbSaveRio() call. */ - if (error && *error == 0) *error = errno; - stopSaving(0); - return C_ERR; -} - -static int rdbSaveInternal(int req, const char *filename, rdbSaveInfo *rsi, int rdbflags) { - char cwd[MAXPATHLEN]; /* Current working dir path for error messages. 
*/ - rio rdb; - int error = 0; - int saved_errno; - char *err_op; /* For a detailed log */ - - FILE *fp = fopen(filename,"w"); - if (!fp) { - saved_errno = errno; - char *str_err = strerror(errno); - char *cwdp = getcwd(cwd,MAXPATHLEN); - serverLog(LL_WARNING, - "Failed opening the temp RDB file %s (in server root dir %s) " - "for saving: %s", - filename, - cwdp ? cwdp : "unknown", - str_err); - errno = saved_errno; - return C_ERR; - } - - rioInitWithFile(&rdb,fp); - - if (server.rdb_save_incremental_fsync) { - rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES); - if (!(rdbflags & RDBFLAGS_KEEP_CACHE)) rioSetReclaimCache(&rdb,1); - } - - if (rdbSaveRio(req,&rdb,&error,rdbflags,rsi) == C_ERR) { - errno = error; - err_op = "rdbSaveRio"; - goto werr; - } - - /* Make sure data will not remain on the OS's output buffers */ - if (fflush(fp)) { err_op = "fflush"; goto werr; } - if (fsync(fileno(fp))) { err_op = "fsync"; goto werr; } - if (!(rdbflags & RDBFLAGS_KEEP_CACHE) && reclaimFilePageCache(fileno(fp), 0, 0) == -1) { - serverLog(LL_NOTICE,"Unable to reclaim cache after saving RDB: %s", strerror(errno)); - } - if (fclose(fp)) { fp = NULL; err_op = "fclose"; goto werr; } - - return C_OK; - -werr: - saved_errno = errno; - serverLog(LL_WARNING,"Write error while saving DB to the disk(%s): %s", err_op, strerror(errno)); - if (fp) fclose(fp); - unlink(filename); - errno = saved_errno; - return C_ERR; -} - -/* Save DB to the file. Similar to rdbSave() but this function won't use a - * temporary file and won't update the metrics. */ -int rdbSaveToFile(const char *filename) { - startSaving(RDBFLAGS_NONE); - - if (rdbSaveInternal(SLAVE_REQ_NONE,filename,NULL,RDBFLAGS_NONE) != C_OK) { - int saved_errno = errno; - stopSaving(0); - errno = saved_errno; - return C_ERR; - } - - stopSaving(1); - return C_OK; -} - -/* Save the DB on disk. Return C_ERR on error, C_OK on success. 
*/ -int rdbSave(int req, char *filename, rdbSaveInfo *rsi, int rdbflags) { - char tmpfile[256]; - char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */ - - startSaving(rdbflags); - snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid()); - - if (rdbSaveInternal(req,tmpfile,rsi,rdbflags) != C_OK) { - stopSaving(0); - return C_ERR; - } - - /* Use RENAME to make sure the DB file is changed atomically only - * if the generate DB file is ok. */ - if (rename(tmpfile,filename) == -1) { - char *str_err = strerror(errno); - char *cwdp = getcwd(cwd,MAXPATHLEN); - serverLog(LL_WARNING, - "Error moving temp DB file %s on the final " - "destination %s (in server root dir %s): %s", - tmpfile, - filename, - cwdp ? cwdp : "unknown", - str_err); - unlink(tmpfile); - stopSaving(0); - return C_ERR; - } - if (fsyncFileDir(filename) != 0) { - serverLog(LL_WARNING, - "Failed to fsync directory while saving DB: %s", strerror(errno)); - stopSaving(0); - return C_ERR; - } - - serverLog(LL_NOTICE,"DB saved on disk"); - server.dirty = 0; - server.lastsave = time(NULL); - server.lastbgsave_status = C_OK; - stopSaving(1); - return C_OK; -} - -int rdbSaveBackground(int req, char *filename, rdbSaveInfo *rsi, int rdbflags) { - pid_t childpid; - - if (hasActiveChildProcess()) return C_ERR; - server.stat_rdb_saves++; - - server.dirty_before_bgsave = server.dirty; - server.lastbgsave_try = time(NULL); - - if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) { - int retval; - - /* Child */ - redisSetProcTitle("redis-rdb-bgsave"); - redisSetCpuAffinity(server.bgsave_cpulist); - retval = rdbSave(req, filename,rsi,rdbflags); - if (retval == C_OK) { - sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB"); - } - exitFromChild((retval == C_OK) ? 
0 : 1, 0); - } else { - /* Parent */ - if (childpid == -1) { - server.lastbgsave_status = C_ERR; - serverLog(LL_WARNING,"Can't save in background: fork: %s", - strerror(errno)); - return C_ERR; - } - serverLog(LL_NOTICE,"Background saving started by pid %ld",(long) childpid); - server.rdb_save_time_start = time(NULL); - server.rdb_child_type = RDB_CHILD_TYPE_DISK; - return C_OK; - } - return C_OK; /* unreached */ -} - -/* Note that we may call this function in signal handle 'sigShutdownHandler', - * so we need guarantee all functions we call are async-signal-safe. - * If we call this function from signal handle, we won't call bg_unlink that - * is not async-signal-safe. */ -void rdbRemoveTempFile(pid_t childpid, int from_signal) { - char tmpfile[256]; - char pid[32]; - - /* Generate temp rdb file name using async-signal safe functions. */ - ll2string(pid, sizeof(pid), childpid); - redis_strlcpy(tmpfile, "temp-", sizeof(tmpfile)); - redis_strlcat(tmpfile, pid, sizeof(tmpfile)); - redis_strlcat(tmpfile, ".rdb", sizeof(tmpfile)); - - if (from_signal) { - /* bg_unlink is not async-signal-safe, but in this case we don't really - * need to close the fd, it'll be released when the process exists. */ - int fd = open(tmpfile, O_RDONLY|O_NONBLOCK); - UNUSED(fd); - unlink(tmpfile); - } else { - bg_unlink(tmpfile); - } -} - -/* This function is called by rdbLoadObject() when the code is in RDB-check - * mode and we find a module value of type 2 that can be parsed without - * the need of the actual module. The value is parsed for errors, finally - * a dummy redis object is returned just to conform to the API. 
*/ -robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename) { - uint64_t opcode; - while((opcode = rdbLoadLen(rdb,NULL)) != RDB_MODULE_OPCODE_EOF) { - if (opcode == RDB_MODULE_OPCODE_SINT || - opcode == RDB_MODULE_OPCODE_UINT) - { - uint64_t len; - if (rdbLoadLenByRef(rdb,NULL,&len) == -1) { - rdbReportCorruptRDB( - "Error reading integer from module %s value", modulename); - } - } else if (opcode == RDB_MODULE_OPCODE_STRING) { - robj *o = rdbGenericLoadStringObject(rdb,RDB_LOAD_NONE,NULL); - if (o == NULL) { - rdbReportCorruptRDB( - "Error reading string from module %s value", modulename); - } - decrRefCount(o); - } else if (opcode == RDB_MODULE_OPCODE_FLOAT) { - float val; - if (rdbLoadBinaryFloatValue(rdb,&val) == -1) { - rdbReportCorruptRDB( - "Error reading float from module %s value", modulename); - } - } else if (opcode == RDB_MODULE_OPCODE_DOUBLE) { - double val; - if (rdbLoadBinaryDoubleValue(rdb,&val) == -1) { - rdbReportCorruptRDB( - "Error reading double from module %s value", modulename); - } - } - } - return createStringObject("module-dummy-value",18); -} - -/* Load object type and optional key metadata (into `keymeta`) from RDB stream. - * This function handles the RDB_OPCODE_KEY_META opcode that may appear before - * the actual object type in RDB streams (both regular RDB files and DUMP payloads). - * The `type` parameter is updated with the actual object type. 
- * - * Returns: 0 on success, -1 on error - */ -int rdbResolveKeyType(rio *rdb, int *type, int dbid, KeyMetaSpec *keymeta) { - if (*type == RDB_OPCODE_KEY_META) { - /* Load key metadata from RDB */ - uint64_t numClasses; - if ((numClasses = rdbLoadLen(rdb, NULL)) == RDB_LENERR) { - return -1; - } - if (rdbLoadKeyMetadata(rdb, dbid, numClasses, keymeta) == -1) { - return -1; - } - /* Read the actual object type after metadata */ - *type = rdbLoadObjectType(rdb); - if (*type == -1) { - keyMetaSpecCleanup(keymeta); - return -1; - } - } else if (!rdbIsObjectType(*type)) { - /* Not metadata and not a valid object type */ - return -1; - } - - return 0; -} - -/* callback for hashZiplistConvertAndValidateIntegrity. - * Check that the ziplist doesn't have duplicate hash field names. - * The ziplist element pointed by 'p' will be converted and stored into listpack. */ -static int _ziplistPairsEntryConvertAndValidate(unsigned char *p, unsigned int head_count, void *userdata) { - unsigned char *str; - unsigned int slen; - long long vll; - - struct { - long count; - dict *fields; - unsigned char **lp; - } *data = userdata; - - if (data->fields == NULL) { - data->fields = dictCreate(&hashDictType); - dictExpand(data->fields, head_count/2); - } - - if (!ziplistGet(p, &str, &slen, &vll)) - return 0; - - /* Even records are field names, add to dict and check that's not a dup */ - if (((data->count) & 1) == 0) { - sds field = str? sdsnewlen(str, slen): sdsfromlonglong(vll); - if (dictAdd(data->fields, field, NULL) != DICT_OK) { - /* Duplicate, return an error */ - sdsfree(field); - return 0; - } - } - - if (str) { - *(data->lp) = lpAppend(*(data->lp), (unsigned char*)str, slen); - } else { - *(data->lp) = lpAppendInteger(*(data->lp), vll); - } - - (data->count)++; - return 1; -} - -/* Validate the integrity of the data structure while converting it to - * listpack and storing it at 'lp'. 
- * The function is safe to call on non-validated ziplists, it returns 0 - * when encounter an integrity validation issue. */ -int ziplistPairsConvertAndValidateIntegrity(unsigned char *zl, size_t size, unsigned char **lp) { - /* Keep track of the field names to locate duplicate ones */ - struct { - long count; - dict *fields; /* Initialisation at the first callback. */ - unsigned char **lp; - } data = {0, NULL, lp}; - - int ret = ziplistValidateIntegrity(zl, size, 1, _ziplistPairsEntryConvertAndValidate, &data); - - /* make sure we have an even number of records. */ - if (data.count & 1) - ret = 0; - - if (data.fields) dictRelease(data.fields); - return ret; -} - -/* callback for ziplistValidateIntegrity. - * The ziplist element pointed by 'p' will be converted and stored into listpack. */ -static int _ziplistEntryConvertAndValidate(unsigned char *p, unsigned int head_count, void *userdata) { - UNUSED(head_count); - unsigned char *str; - unsigned int slen; - long long vll; - unsigned char **lp = (unsigned char**)userdata; - - if (!ziplistGet(p, &str, &slen, &vll)) return 0; - - if (str) - *lp = lpAppend(*lp, (unsigned char*)str, slen); - else - *lp = lpAppendInteger(*lp, vll); - - return 1; -} - -/* callback for ziplistValidateIntegrity. - * The ziplist element pointed by 'p' will be converted and stored into quicklist. 
*/ -static int _listZiplistEntryConvertAndValidate(unsigned char *p, unsigned int head_count, void *userdata) { - UNUSED(head_count); - unsigned char *str; - unsigned int slen; - long long vll; - char longstr[32] = {0}; - quicklist *ql = (quicklist*)userdata; - - if (!ziplistGet(p, &str, &slen, &vll)) return 0; - if (!str) { - /* Write the longval as a string so we can re-add it */ - slen = ll2string(longstr, sizeof(longstr), vll); - str = (unsigned char *)longstr; - } - quicklistPushTail(ql, str, slen); - return 1; -} - -/* callback for to check the listpack doesn't have duplicate records */ -static int _lpEntryValidation(unsigned char *p, unsigned int head_count, void *userdata) { - struct { - int tuple_len; - long count; - dict *fields; - long long last_expireat; - } *data = userdata; - - if (data->fields == NULL) { - data->fields = dictCreate(&hashDictType); - dictExpand(data->fields, head_count/data->tuple_len); - } - - /* If we're checking pairs, then even records are field names. Otherwise - * we're checking all elements. Add to dict and check that's not a dup */ - if (data->count % data->tuple_len == 0) { - unsigned char *str; - int64_t slen; - unsigned char buf[LP_INTBUF_SIZE]; - - str = lpGet(p, &slen, buf); - sds field = sdsnewlen(str, slen); - if (dictAdd(data->fields, field, NULL) != DICT_OK) { - /* Duplicate, return an error */ - sdsfree(field); - return 0; - } - } - - /* Validate TTL field, only for listpackex. */ - if (data->count % data->tuple_len == 2) { - long long expire_at; - /* Must be an integer. */ - if (!lpGetIntegerValue(p, &expire_at)) return 0; - /* Must be less than EB_EXPIRE_TIME_MAX. */ - if (expire_at < 0 || (unsigned long long)expire_at > EB_EXPIRE_TIME_MAX) return 0; - /* TTL fields are ordered. If the current field has TTL, the previous field must - * also have one, and the current TTL must be greater than the previous one. 
*/ - if (expire_at != 0 && (data->last_expireat == 0 || expire_at < data->last_expireat)) return 0; - data->last_expireat = expire_at; - } - - (data->count)++; - return 1; -} - -/* Validate the integrity of the listpack structure. - * when `deep` is 0, only the integrity of the header is validated. - * when `deep` is 1, we scan all the entries one by one. - * tuple_len indicates what is a logical entry tuple size. - * Whether tuple is of size 1 (set), 2 (feild-value) or 3 (field-value[-ttl]), - * first element in the tuple must be unique */ -int lpValidateIntegrityAndDups(unsigned char *lp, size_t size, int deep, int tuple_len) { - if (!deep) - return lpValidateIntegrity(lp, size, 0, NULL, NULL); - - /* Keep track of the field names to locate duplicate ones */ - struct { - int tuple_len; - long count; - dict *fields; /* Initialisation at the first callback. */ - long long last_expireat; /* Last field's expiry time to ensure order in TTL fields. */ - } data = {tuple_len, 0, NULL, -1}; - - int ret = lpValidateIntegrity(lp, size, 1, _lpEntryValidation, &data); - - /* the number of records should be a multiple of the tuple length */ - if (data.count % tuple_len != 0) - ret = 0; - - if (data.fields) dictRelease(data.fields); - return ret; -} - -/* Load a Redis object of the specified type from the specified file. - * On success a newly allocated object is returned, otherwise NULL. - * - * error - When the function returns NULL and if 'error' is not NULL, the - * integer pointed by 'error' is set to the type of error that occurred - * minExpiredField - If loading a hash with expiration on fields, then this value - * will be set to the minimum expire time found in the hash fields. If there are - * no fields with expiration or it is not a hash, then it will set be to - * EB_EXPIRE_TIME_INVALID. 
- */ -robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) -{ - robj *o = NULL, *ele, *dec; - uint64_t len; - unsigned int i; - - /* Set default error of load object, it will be set to 0 on success. */ - if (error) *error = RDB_LOAD_ERR_OTHER; - - int deep_integrity_validation = server.sanitize_dump_payload == SANITIZE_DUMP_YES; - if (server.sanitize_dump_payload == SANITIZE_DUMP_CLIENTS) { - /* Skip sanitization when loading (an RDB), or getting a RESTORE command - * from either the master or a client using an ACL user with the skip-sanitize-payload flag. */ - int skip = server.loading || - (server.current_client && (server.current_client->flags & CLIENT_MASTER)); - if (!skip && server.current_client && server.current_client->user) - skip = !!(server.current_client->user->flags & USER_FLAG_SANITIZE_PAYLOAD_SKIP); - deep_integrity_validation = !skip; - } - - if (rdbtype == RDB_TYPE_STRING) { - /* Read string value */ - if ((o = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL; - o = tryObjectEncodingEx(o, 0); - } else if (rdbtype == RDB_TYPE_LIST) { - /* Read list value */ - if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL; - if (len == 0) goto emptykey; - - o = createQuicklistObject(server.list_max_listpack_size, server.list_compress_depth); - - /* Load every single element of the list */ - while(len--) { - if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) { - decrRefCount(o); - return NULL; - } - dec = getDecodedObject(ele); - size_t len = sdslen(dec->ptr); - quicklistPushTail(o->ptr, dec->ptr, len); - decrRefCount(dec); - decrRefCount(ele); - } - - listTypeTryConversion(o, LIST_CONV_AUTO, NULL, NULL); - } else if (rdbtype == RDB_TYPE_SET) { - /* Read Set value */ - if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL; - if (len == 0) goto emptykey; - - /* Use a regular set when there are too many entries. 
*/ - size_t max_entries = server.set_max_intset_entries; - if (max_entries >= 1<<30) max_entries = 1<<30; - if (len > max_entries) { - o = createSetObject(); - /* It's faster to expand the dict to the right size asap in order - * to avoid rehashing */ - if (len > DICT_HT_INITIAL_SIZE && dictTryExpand(o->ptr, len) != DICT_OK) { - rdbReportCorruptRDB("OOM in dictTryExpand %llu", (unsigned long long)len); - decrRefCount(o); - return NULL; - } - } else { - o = createIntsetObject(); - } - - /* Load every single element of the set */ - size_t maxelelen = 0, sumelelen = 0; - for (i = 0; i < len; i++) { - long long llval; - sds sdsele; - - if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) { - decrRefCount(o); - return NULL; - } - size_t elelen = sdslen(sdsele); - sumelelen += elelen; - if (elelen > maxelelen) maxelelen = elelen; - - if (o->encoding == OBJ_ENCODING_INTSET) { - /* Fetch integer value from element. */ - if (isSdsRepresentableAsLongLong(sdsele,&llval) == C_OK) { - uint8_t success; - o->ptr = intsetAdd(o->ptr, llval, &success); - if (!success) { - rdbReportCorruptRDB("Duplicate set members detected"); - decrRefCount(o); - sdsfree(sdsele); - return NULL; - } - } else if (setTypeSize(o) < server.set_max_listpack_entries && - maxelelen <= server.set_max_listpack_value && - lpSafeToAdd(NULL, sumelelen)) - { - /* We checked if it's safe to add one large element instead - * of many small ones. It's OK since lpSafeToAdd doesn't - * care about individual elements, only the total size. */ - setTypeConvert(o, OBJ_ENCODING_LISTPACK); - } else if (setTypeConvertAndExpand(o, OBJ_ENCODING_HT, len, 0) != C_OK) { - rdbReportCorruptRDB("OOM in dictTryExpand %llu", (unsigned long long)len); - sdsfree(sdsele); - decrRefCount(o); - return NULL; - } - } - - /* This will also be called when the set was just converted - * to a listpack encoded set. 
*/ - if (o->encoding == OBJ_ENCODING_LISTPACK) { - if (setTypeSize(o) < server.set_max_listpack_entries && - elelen <= server.set_max_listpack_value && - lpSafeToAdd(o->ptr, elelen)) - { - unsigned char *p = lpFirst(o->ptr); - if (p && lpFind(o->ptr, p, (unsigned char*)sdsele, elelen, 0)) { - rdbReportCorruptRDB("Duplicate set members detected"); - decrRefCount(o); - sdsfree(sdsele); - return NULL; - } - o->ptr = lpAppend(o->ptr, (unsigned char *)sdsele, elelen); - } else if (setTypeConvertAndExpand(o, OBJ_ENCODING_HT, len, 0) != C_OK) { - rdbReportCorruptRDB("OOM in dictTryExpand %llu", - (unsigned long long)len); - sdsfree(sdsele); - decrRefCount(o); - return NULL; - } - } - - /* This will also be called when the set was just converted - * to a regular hash table encoded set. */ - if (o->encoding == OBJ_ENCODING_HT) { - if (dictAdd((dict*)o->ptr, sdsele, NULL) != DICT_OK) { - rdbReportCorruptRDB("Duplicate set members detected"); - decrRefCount(o); - sdsfree(sdsele); - return NULL; - } - *htGetMetadataSize(o->ptr) += sdsAllocSize(sdsele); - } else { - sdsfree(sdsele); - } - } - } else if (rdbtype == RDB_TYPE_ZSET_2 || rdbtype == RDB_TYPE_ZSET) { - /* Read sorted set value. */ - uint64_t zsetlen; - size_t maxelelen = 0, totelelen = 0; - zset *zs; - - if ((zsetlen = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL; - if (zsetlen == 0) goto emptykey; - - o = createZsetObject(); - zs = o->ptr; - - if (zsetlen > DICT_HT_INITIAL_SIZE && dictTryExpand(zs->dict,zsetlen) != DICT_OK) { - rdbReportCorruptRDB("OOM in dictTryExpand %llu", (unsigned long long)zsetlen); - decrRefCount(o); - return NULL; - } - - /* Load every single element of the sorted set. 
*/ - while(zsetlen--) { - sds sdsele; - double score; - zskiplistNode *znode; - - if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) { - decrRefCount(o); - return NULL; - } - - if (rdbtype == RDB_TYPE_ZSET_2) { - if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) { - decrRefCount(o); - sdsfree(sdsele); - return NULL; - } - } else { - if (rdbLoadDoubleValue(rdb,&score) == -1) { - decrRefCount(o); - sdsfree(sdsele); - return NULL; - } - } - - if (isnan(score)) { - rdbReportCorruptRDB("Zset with NAN score detected"); - decrRefCount(o); - sdsfree(sdsele); - return NULL; - } - - /* Don't care about integer-encoded strings. */ - if (sdslen(sdsele) > maxelelen) maxelelen = sdslen(sdsele); - totelelen += sdslen(sdsele); - - znode = zslInsert(zs->zsl,score,sdsele); - if (dictAdd(zs->dict, znode, NULL) != DICT_OK) { - rdbReportCorruptRDB("Duplicate zset fields detected"); - decrRefCount(o); - sdsfree(sdsele); /* zslInsert copies the sds, so we need to free the original */ - return NULL; - } - sdsfree(sdsele); /* zslInsert copies the sds into the node, so free the original */ - } - - /* Convert *after* loading, since sorted sets are not stored ordered. */ - if (zsetLength(o) <= server.zset_max_listpack_entries && - maxelelen <= server.zset_max_listpack_value && - lpSafeToAdd(NULL, totelelen)) - { - zsetConvert(o, OBJ_ENCODING_LISTPACK); - } - } else if (rdbtype == RDB_TYPE_HASH) { - uint64_t len, original_len; - int ret; - sds value; - Entry *entry; - dict *dupSearchDict = NULL; - - len = rdbLoadLen(rdb, NULL); - if (len == RDB_LENERR) return NULL; - original_len = len; - if (len == 0) goto emptykey; - - o = createHashObject(); - - /* Too many entries? Use a hash table right from the start. */ - if (len > server.hash_max_listpack_entries) - hashTypeConvert(NULL, o, OBJ_ENCODING_HT); - else if (deep_integrity_validation) { - /* In this mode, we need to guarantee that the server won't crash - * later when the ziplist is converted to a dict. 
- * Create a set (dict with no values) to for a dup search. - * We can dismiss it as soon as we convert the ziplist to a hash. */ - dupSearchDict = dictCreate(&hashDictType); - } - - /* Load every field and value into the listpack */ - while (o->encoding == OBJ_ENCODING_LISTPACK && len > 0) { - len--; - /* Load raw strings - load field as SDS first */ - size_t usable; - sds fieldSds; - if ((fieldSds = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) { - decrRefCount(o); - if (dupSearchDict) dictRelease(dupSearchDict); - return NULL; - } - if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) { - sdsfree(fieldSds); - decrRefCount(o); - if (dupSearchDict) dictRelease(dupSearchDict); - return NULL; - } - - /* Create entry with field and value - take ownership of value */ - entry = entryCreate(fieldSds, value, ENTRY_TAKE_VALUE, &usable); - sdsfree(fieldSds); /* entryCreate() doesn't take ownership of field */ - - if (dupSearchDict) { - sds field_dup = sdsdup(entryGetField(entry)); - - if (dictAdd(dupSearchDict, field_dup, NULL) != DICT_OK) { - rdbReportCorruptRDB("Hash with dup elements"); - dictRelease(dupSearchDict); - decrRefCount(o); - sdsfree(field_dup); - entryFree(entry, NULL); - return NULL; - } - } - - /* Convert to hash table if size threshold is exceeded */ - if (entryFieldLen(entry) > server.hash_max_listpack_value || - sdslen(entryGetValue(entry)) > server.hash_max_listpack_value || - !lpSafeToAdd(o->ptr, entryFieldLen(entry) + sdslen(entryGetValue(entry)))) - { - hashTypeConvert(NULL, o, OBJ_ENCODING_HT); - ret = dictAdd((dict*)o->ptr, entry, NULL); /* no_value=1 */ - if (ret == DICT_ERR) { - rdbReportCorruptRDB("Duplicate hash fields detected"); - if (dupSearchDict) dictRelease(dupSearchDict); - entryFree(entry, NULL); - decrRefCount(o); - return NULL; - } - *htGetMetadataSize(o->ptr) += usable; - break; - } - - /* Add pair to listpack */ - o->ptr = lpAppend(o->ptr, (unsigned char*)entryGetField(entry), 
entryFieldLen(entry)); - o->ptr = lpAppend(o->ptr, (unsigned char*)entryGetValue(entry), sdslen(entryGetValue(entry))); - - entryFree(entry, NULL); - } - - if (dupSearchDict) { - /* We no longer need this, from now on the entries are added - * to a dict so the check is performed implicitly. */ - dictRelease(dupSearchDict); - dupSearchDict = NULL; - } - - if (o->encoding == OBJ_ENCODING_HT && original_len > DICT_HT_INITIAL_SIZE) { - if (dictTryExpand(o->ptr, original_len) != DICT_OK) { - rdbReportCorruptRDB("OOM in dictTryExpand %llu", (unsigned long long)original_len); - decrRefCount(o); - return NULL; - } - } - - /* Load remaining fields and values into the hash table */ - while (o->encoding == OBJ_ENCODING_HT && len > 0) { - len--; - /* Load encoded strings - load field as SDS first */ - size_t usable; - sds fieldSds; - if ((fieldSds = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) { - decrRefCount(o); - return NULL; - } - if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) { - sdsfree(fieldSds); - decrRefCount(o); - return NULL; - } - - /* Create entry with field and value - take ownership of value */ - entry = entryCreate(fieldSds, value, ENTRY_TAKE_VALUE, &usable); - sdsfree(fieldSds); /* entryCreate() doesn't take ownership of field */ - - /* Add entry to hash table */ - dict *d = o->ptr; - ret = dictAdd(d, entry, NULL); /* no_value=1 */ - if (ret == DICT_ERR) { - rdbReportCorruptRDB("Duplicate hash fields detected"); - entryFree(entry, NULL); - decrRefCount(o); - return NULL; - } - *htGetMetadataSize(o->ptr) += usable; - } - - /* All pairs should be read by now */ - serverAssert(len == 0); - } else if (rdbtype == RDB_TYPE_HASH_METADATA || rdbtype == RDB_TYPE_HASH_METADATA_PRE_GA) { - sds value; - Entry *entry; - uint64_t ttl, expireAt, minExpire = EB_EXPIRE_TIME_INVALID; - uint64_t original_len; - dict *dupSearchDict = NULL; - - /* If hash with TTLs, load next/min expiration time - * - * - This value is serialized for future 
use-case of streaming the object - * directly to FLASH (while keeping in mem its next expiration time). - * - It is also being used to keep only relative TTL for fields in RDB file. - */ - if (rdbtype == RDB_TYPE_HASH_METADATA) { - minExpire = rdbLoadMillisecondTime(rdb, RDB_VERSION); - if (rioGetReadError(rdb)) { - rdbReportReadError("Hash failed loading minExpire"); - return NULL; - } - if (minExpire > EB_EXPIRE_TIME_INVALID) { - rdbReportCorruptRDB("Hash read invalid minExpire value"); - } - } - - len = rdbLoadLen(rdb, NULL); - if (len == RDB_LENERR) return NULL; - original_len = len; - if (len == 0) goto emptykey; - /* TODO: create listpackEx or HT directly*/ - o = createHashObject(); - /* Too many entries? Use a hash table right from the start. */ - if (len > server.hash_max_listpack_entries) { - hashTypeConvert(NULL, o, OBJ_ENCODING_HT); - dictTypeAddMeta((dict**)&o->ptr, &entryHashDictTypeWithHFE); - initDictExpireMetadata(o); - } else { - hashTypeConvert(NULL, o, OBJ_ENCODING_LISTPACK_EX); - if (deep_integrity_validation) { - /* In this mode, we need to guarantee that the server won't crash - * later when the listpack is converted to a dict. - * Create a set (dict with no values) for dup search. - * We can dismiss it as soon as we convert the listpack to a hash. */ - dupSearchDict = dictCreate(&hashDictType); - } - } - - while (len > 0) { - len--; - - /* read the TTL */ - if (rdbLoadLenByRef(rdb, NULL, &ttl) == -1) { - serverLog(LL_WARNING, "failed reading hash TTL"); - decrRefCount(o); - if (dupSearchDict != NULL) dictRelease(dupSearchDict); - return NULL; - } - - - if (rdbtype == RDB_TYPE_HASH_METADATA) { - /* Loaded TTL value: - * - 0: Indicates no TTL. This is common case so we keep it small. - * - Otherwise: TTL is relative to minExpire (with +1 to avoid 0 that already taken) - */ - expireAt = (ttl != 0) ? 
(ttl + minExpire - 1) : 0; - } else { /* RDB_TYPE_HASH_METADATA_PRE_GA */ - expireAt = ttl; /* Value is absolute */ - } - - if (expireAt > EB_EXPIRE_TIME_MAX) { - rdbReportCorruptRDB("invalid expireAt time: %llu", - (unsigned long long) expireAt); - decrRefCount(o); - if (dupSearchDict != NULL) dictRelease(dupSearchDict); - return NULL; - } - - /* Load field and value as SDS first */ - size_t usable; - sds fieldSds = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL); - - if (fieldSds == NULL) { - serverLog(LL_WARNING, "failed reading hash field"); - decrRefCount(o); - if (dupSearchDict != NULL) dictRelease(dupSearchDict); - return NULL; - } - - /* read the value */ - if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) { - serverLog(LL_WARNING, "failed reading hash value"); - decrRefCount(o); - if (dupSearchDict != NULL) dictRelease(dupSearchDict); - sdsfree(fieldSds); - return NULL; - } - - /* Create entry with field, value, and optional expiration */ - uint32_t entryFlags = ENTRY_TAKE_VALUE | ((expireAt != 0) ? 
ENTRY_HAS_EXPIRY : 0); - entry = entryCreate(fieldSds, value, entryFlags, &usable); - sdsfree(fieldSds); /* entryCreate() doesn't take ownership of field */ - - sds field = entryGetField(entry); - size_t flen = sdslen(field); - sds value = entryGetValue(entry); - size_t vlen = sdslen(value); - - /* store the values read - either to listpack or dict */ - if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { - /* integrity - check for key duplication (if required) */ - if (dupSearchDict) { - sds field_dup = sdsdup(field); - - if (dictAdd(dupSearchDict, field_dup, NULL) != DICT_OK) { - rdbReportCorruptRDB("Hash with dup elements"); - dictRelease(dupSearchDict); - decrRefCount(o); - sdsfree(field_dup); - entryFree(entry, NULL); - return NULL; - } - } - - /* check if the values can be saved to listpack (or should convert to dict encoding) */ - if (flen > server.hash_max_listpack_value || - vlen > server.hash_max_listpack_value || - !lpSafeToAdd(((listpackEx*)o->ptr)->lp, flen + vlen + lpEntrySizeInteger(expireAt))) - { - /* convert to hash */ - hashTypeConvert(NULL, o, OBJ_ENCODING_HT); - - if (original_len > DICT_HT_INITIAL_SIZE) { - if (dictTryExpand(o->ptr, original_len) != DICT_OK) { - rdbReportCorruptRDB("OOM in dictTryExpand %llu", (unsigned long long)original_len); - decrRefCount(o); - if (dupSearchDict != NULL) dictRelease(dupSearchDict); - entryFree(entry, NULL); - return NULL; - } - } - - /* don't add the values to the new hash: the next if will catch and the values will be added there */ - } else { - listpackExAddNew(o, field, flen, value, vlen, expireAt); - entryFree(entry, NULL); - } - } - - if (o->encoding == OBJ_ENCODING_HT) { - /* Add entry to hash table */ - dict *d = o->ptr; - int ret = dictAdd(d, entry, NULL); /* no_value=1 */ - - /* Attach expiry to the hash field and register in hash private HFE DS */ - if ((ret != DICT_ERR) && expireAt) { - htMetadataEx *m = htGetMetadataEx(d); - ret = ebAdd(&m->hfe, &hashFieldExpireBucketsType, entry, expireAt); - } - - 
if (ret == DICT_ERR) { - rdbReportCorruptRDB("Duplicate hash fields detected"); - entryFree(entry, NULL); - decrRefCount(o); - return NULL; - } - *htGetMetadataSize(d) += usable; - } - } - - if (dupSearchDict != NULL) dictRelease(dupSearchDict); - - } else if (rdbtype == RDB_TYPE_LIST_QUICKLIST || rdbtype == RDB_TYPE_LIST_QUICKLIST_2) { - if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL; - if (len == 0) goto emptykey; - - o = createQuicklistObject(server.list_max_listpack_size, server.list_compress_depth); - uint64_t container = QUICKLIST_NODE_CONTAINER_PACKED; - while (len--) { - unsigned char *lp; - size_t encoded_len; - - if (rdbtype == RDB_TYPE_LIST_QUICKLIST_2) { - if ((container = rdbLoadLen(rdb,NULL)) == RDB_LENERR) { - decrRefCount(o); - return NULL; - } - - if (container != QUICKLIST_NODE_CONTAINER_PACKED && container != QUICKLIST_NODE_CONTAINER_PLAIN) { - rdbReportCorruptRDB("Quicklist integrity check failed."); - decrRefCount(o); - return NULL; - } - } - - unsigned char *data = - rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,&encoded_len); - if (data == NULL || (encoded_len == 0)) { - zfree(data); - decrRefCount(o); - return NULL; - } - - if (container == QUICKLIST_NODE_CONTAINER_PLAIN) { - quicklistAppendPlainNode(o->ptr, data, encoded_len); - continue; - } - - if (rdbtype == RDB_TYPE_LIST_QUICKLIST_2) { - lp = data; - if (deep_integrity_validation) server.stat_dump_payload_sanitizations++; - if (!lpValidateIntegrity(lp, encoded_len, deep_integrity_validation, NULL, NULL)) { - rdbReportCorruptRDB("Listpack integrity check failed."); - decrRefCount(o); - zfree(lp); - return NULL; - } - } else { - lp = lpNew(encoded_len); - if (!ziplistValidateIntegrity(data, encoded_len, 1, - _ziplistEntryConvertAndValidate, &lp)) - { - rdbReportCorruptRDB("Ziplist integrity check failed."); - decrRefCount(o); - zfree(data); - zfree(lp); - return NULL; - } - zfree(data); - lp = lpShrinkToFit(lp); - } - - /* Silently skip empty ziplists, if we'll end up with 
empty quicklist we'll fail later. */ - if (lpLength(lp) == 0) { - zfree(lp); - continue; - } else { - quicklistAppendListpack(o->ptr, lp); - } - } - - if (quicklistCount(o->ptr) == 0) { - decrRefCount(o); - goto emptykey; - } - - listTypeTryConversion(o, LIST_CONV_AUTO, NULL, NULL); - } else if (rdbtype == RDB_TYPE_HASH_ZIPMAP || - rdbtype == RDB_TYPE_LIST_ZIPLIST || - rdbtype == RDB_TYPE_SET_INTSET || - rdbtype == RDB_TYPE_SET_LISTPACK || - rdbtype == RDB_TYPE_ZSET_ZIPLIST || - rdbtype == RDB_TYPE_ZSET_LISTPACK || - rdbtype == RDB_TYPE_HASH_ZIPLIST || - rdbtype == RDB_TYPE_HASH_LISTPACK || - rdbtype == RDB_TYPE_HASH_LISTPACK_EX_PRE_GA || - rdbtype == RDB_TYPE_HASH_LISTPACK_EX) - { - size_t encoded_len; - - /* If Hash TTLs, Load next/min expiration time before the `encoded` */ - if (rdbtype == RDB_TYPE_HASH_LISTPACK_EX) { - uint64_t minExpire = rdbLoadMillisecondTime(rdb, RDB_VERSION); - /* This value was serialized for future use-case of streaming the object - * directly to FLASH (while keeping in mem its next expiration time) */ - UNUSED(minExpire); - if (rioGetReadError(rdb)) { - rdbReportReadError( "Short read of listpackex min expiration time."); - return NULL; - } - } - - unsigned char *encoded = - rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,&encoded_len); - if (encoded == NULL) return NULL; - - o = createObject(OBJ_STRING, encoded); /* Obj type fixed below. */ - - /* Fix the object encoding, and make sure to convert the encoded - * data type into the base type if accordingly to the current - * configuration there are too many elements in the encoded data - * type. Note that we only check the length and not max element - * size as this is an O(N) scan. Eventually everything will get - * converted. */ - switch(rdbtype) { - case RDB_TYPE_HASH_ZIPMAP: - /* Since we don't keep zipmaps anymore, the rdb loading for these - * is O(n) anyway, use `deep` validation. 
*/ - if (!zipmapValidateIntegrity(encoded, encoded_len, 1)) { - rdbReportCorruptRDB("Zipmap integrity check failed."); - zfree(encoded); - o->ptr = NULL; - decrRefCount(o); - return NULL; - } - /* Convert to ziplist encoded hash. This must be deprecated - * when loading dumps created by Redis 2.4 gets deprecated. */ - { - unsigned char *lp = lpNew(0); - unsigned char *zi = zipmapRewind(o->ptr); - unsigned char *fstr, *vstr; - unsigned int flen, vlen; - unsigned int maxlen = 0; - dict *dupSearchDict = dictCreate(&hashDictType); - - while ((zi = zipmapNext(zi, &fstr, &flen, &vstr, &vlen)) != NULL) { - if (flen > maxlen) maxlen = flen; - if (vlen > maxlen) maxlen = vlen; - - /* search for duplicate records */ - sds field = sdstrynewlen(fstr, flen); - if (!field || dictAdd(dupSearchDict, field, NULL) != DICT_OK || - !lpSafeToAdd(lp, (size_t)flen + vlen)) { - rdbReportCorruptRDB("Hash zipmap with dup elements, or big length (%u)", flen); - dictRelease(dupSearchDict); - sdsfree(field); - zfree(encoded); - o->ptr = NULL; - decrRefCount(o); - return NULL; - } - - lp = lpAppend(lp, fstr, flen); - lp = lpAppend(lp, vstr, vlen); - } - - dictRelease(dupSearchDict); - zfree(o->ptr); - o->ptr = lp; - o->type = OBJ_HASH; - o->encoding = OBJ_ENCODING_LISTPACK; - - if (hashTypeLength(o, 0) > server.hash_max_listpack_entries || - maxlen > server.hash_max_listpack_value) - { - hashTypeConvert(NULL, o, OBJ_ENCODING_HT); - } - } - break; - case RDB_TYPE_LIST_ZIPLIST: - { - quicklist *ql = quicklistNew(server.list_max_listpack_size, - server.list_compress_depth); - - if (!ziplistValidateIntegrity(encoded, encoded_len, 1, - _listZiplistEntryConvertAndValidate, ql)) - { - rdbReportCorruptRDB("List ziplist integrity check failed."); - zfree(encoded); - o->ptr = NULL; - decrRefCount(o); - quicklistRelease(ql); - return NULL; - } - - if (ql->len == 0) { - zfree(encoded); - o->ptr = NULL; - decrRefCount(o); - quicklistRelease(ql); - goto emptykey; - } - - zfree(encoded); - o->type = OBJ_LIST; 
- o->ptr = ql; - o->encoding = OBJ_ENCODING_QUICKLIST; - break; - } - case RDB_TYPE_SET_INTSET: - if (deep_integrity_validation) server.stat_dump_payload_sanitizations++; - if (!intsetValidateIntegrity(encoded, encoded_len, deep_integrity_validation)) { - rdbReportCorruptRDB("Intset integrity check failed."); - zfree(encoded); - o->ptr = NULL; - decrRefCount(o); - return NULL; - } - o->type = OBJ_SET; - o->encoding = OBJ_ENCODING_INTSET; - if (intsetLen(o->ptr) > server.set_max_intset_entries) - setTypeConvert(o, OBJ_ENCODING_HT); - break; - case RDB_TYPE_SET_LISTPACK: - if (deep_integrity_validation) server.stat_dump_payload_sanitizations++; - if (!lpValidateIntegrityAndDups(encoded, encoded_len, deep_integrity_validation, 1)) { - rdbReportCorruptRDB("Set listpack integrity check failed."); - zfree(encoded); - o->ptr = NULL; - decrRefCount(o); - return NULL; - } - o->type = OBJ_SET; - o->encoding = OBJ_ENCODING_LISTPACK; - - if (setTypeSize(o) == 0) { - zfree(encoded); - o->ptr = NULL; - decrRefCount(o); - goto emptykey; - } - if (setTypeSize(o) > server.set_max_listpack_entries) - setTypeConvert(o, OBJ_ENCODING_HT); - break; - case RDB_TYPE_ZSET_ZIPLIST: - { - unsigned char *lp = lpNew(encoded_len); - if (!ziplistPairsConvertAndValidateIntegrity(encoded, encoded_len, &lp)) { - rdbReportCorruptRDB("Zset ziplist integrity check failed."); - zfree(lp); - zfree(encoded); - o->ptr = NULL; - decrRefCount(o); - return NULL; - } - - zfree(o->ptr); - o->type = OBJ_ZSET; - o->ptr = lp; - o->encoding = OBJ_ENCODING_LISTPACK; - if (zsetLength(o) == 0) { - decrRefCount(o); - goto emptykey; - } - - if (zsetLength(o) > server.zset_max_listpack_entries) - zsetConvert(o, OBJ_ENCODING_SKIPLIST); - else - o->ptr = lpShrinkToFit(o->ptr); - break; - } - case RDB_TYPE_ZSET_LISTPACK: - if (deep_integrity_validation) server.stat_dump_payload_sanitizations++; - if (!lpValidateIntegrityAndDups(encoded, encoded_len, deep_integrity_validation, 2)) { - rdbReportCorruptRDB("Zset listpack 
integrity check failed."); - zfree(encoded); - o->ptr = NULL; - decrRefCount(o); - return NULL; - } - o->type = OBJ_ZSET; - o->encoding = OBJ_ENCODING_LISTPACK; - if (zsetLength(o) == 0) { - decrRefCount(o); - goto emptykey; - } - - if (zsetLength(o) > server.zset_max_listpack_entries) - zsetConvert(o, OBJ_ENCODING_SKIPLIST); - break; - case RDB_TYPE_HASH_ZIPLIST: - { - unsigned char *lp = lpNew(encoded_len); - if (!ziplistPairsConvertAndValidateIntegrity(encoded, encoded_len, &lp)) { - rdbReportCorruptRDB("Hash ziplist integrity check failed."); - zfree(lp); - zfree(encoded); - o->ptr = NULL; - decrRefCount(o); - return NULL; - } - - zfree(o->ptr); - o->ptr = lp; - o->type = OBJ_HASH; - o->encoding = OBJ_ENCODING_LISTPACK; - if (hashTypeLength(o, 0) == 0) { - decrRefCount(o); - goto emptykey; - } - - if (hashTypeLength(o, 0) > server.hash_max_listpack_entries) - hashTypeConvert(NULL, o, OBJ_ENCODING_HT); - else - o->ptr = lpShrinkToFit(o->ptr); - break; - } - case RDB_TYPE_HASH_LISTPACK: - case RDB_TYPE_HASH_LISTPACK_EX_PRE_GA: - case RDB_TYPE_HASH_LISTPACK_EX: - /* listpack-encoded hash with TTL requires its own struct - * pointed to by o->ptr */ - o->type = OBJ_HASH; - if ( (rdbtype == RDB_TYPE_HASH_LISTPACK_EX) || - (rdbtype == RDB_TYPE_HASH_LISTPACK_EX_PRE_GA) ) { - listpackEx *lpt = listpackExCreate(); - lpt->lp = encoded; - o->ptr = lpt; - o->encoding = OBJ_ENCODING_LISTPACK_EX; - } else - o->encoding = OBJ_ENCODING_LISTPACK; - - /* tuple_len is the number of elements for each key: - * key + value for simple hash, key + value + tll for hash with TTL*/ - int tuple_len = (rdbtype == RDB_TYPE_HASH_LISTPACK ? 
2 : 3); - /* validate read data */ - if (deep_integrity_validation) server.stat_dump_payload_sanitizations++; - if (!lpValidateIntegrityAndDups(encoded, encoded_len, - deep_integrity_validation, tuple_len)) { - rdbReportCorruptRDB("Hash listpack integrity check failed."); - decrRefCount(o); - return NULL; - } - - /* if listpack is empty, delete it */ - if (hashTypeLength(o, 0) == 0) { - decrRefCount(o); - goto emptykey; - } - - /* Convert listpack to hash table without registering in global HFE DS, - * if has HFEs, since the listpack is not connected yet to the DB */ - if (hashTypeLength(o, 0) > server.hash_max_listpack_entries) - hashTypeConvert(NULL /*db*/, o, OBJ_ENCODING_HT); - - break; - default: - /* totally unreachable */ - rdbReportCorruptRDB("Unknown RDB encoding type %d",rdbtype); - break; - } - } else if (rdbtype == RDB_TYPE_STREAM_LISTPACKS || - rdbtype == RDB_TYPE_STREAM_LISTPACKS_2 || - rdbtype == RDB_TYPE_STREAM_LISTPACKS_3 || - rdbtype == RDB_TYPE_STREAM_LISTPACKS_4) - { - o = createStreamObject(); - stream *s = o->ptr; - uint64_t listpacks = rdbLoadLen(rdb,NULL); - if (listpacks == RDB_LENERR) { - rdbReportReadError("Stream listpacks len loading failed."); - decrRefCount(o); - return NULL; - } - - while(listpacks--) { - /* Get the master ID, the one we'll use as key of the radix tree - * node: the entries inside the listpack itself are delta-encoded - * relatively to this ID. */ - sds nodekey = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL); - if (nodekey == NULL) { - rdbReportReadError("Stream master ID loading failed: invalid encoding or I/O error."); - decrRefCount(o); - return NULL; - } - if (sdslen(nodekey) != sizeof(streamID)) { - rdbReportCorruptRDB("Stream node key entry is not the " - "size of a stream ID"); - sdsfree(nodekey); - decrRefCount(o); - return NULL; - } - - /* Load the listpack. 
*/ - size_t lp_size; - unsigned char *lp = - rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,&lp_size); - if (lp == NULL) { - rdbReportReadError("Stream listpacks loading failed."); - sdsfree(nodekey); - decrRefCount(o); - return NULL; - } - if (deep_integrity_validation) server.stat_dump_payload_sanitizations++; - if (!streamValidateListpackIntegrity(lp, lp_size, deep_integrity_validation)) { - rdbReportCorruptRDB("Stream listpack integrity check failed."); - sdsfree(nodekey); - decrRefCount(o); - zfree(lp); - return NULL; - } - - unsigned char *first = lpFirst(lp); - if (first == NULL) { - /* Serialized listpacks should never be empty, since on - * deletion we should remove the radix tree key if the - * resulting listpack is empty. */ - rdbReportCorruptRDB("Empty listpack inside stream"); - sdsfree(nodekey); - decrRefCount(o); - zfree(lp); - return NULL; - } - - /* Insert the key in the radix tree. */ - int retval = raxTryInsert(s->rax, - (unsigned char*)nodekey,sizeof(streamID),lp,NULL); - sdsfree(nodekey); - if (!retval) { - rdbReportCorruptRDB("Listpack re-added with existing key"); - decrRefCount(o); - zfree(lp); - return NULL; - } - s->alloc_size += lpBytes(lp); - } - /* Load total number of items inside the stream. */ - s->length = rdbLoadLen(rdb,NULL); - - /* Load the last entry ID. */ - s->last_id.ms = rdbLoadLen(rdb,NULL); - s->last_id.seq = rdbLoadLen(rdb,NULL); - - if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_2) { - /* Load the first entry ID. */ - s->first_id.ms = rdbLoadLen(rdb,NULL); - s->first_id.seq = rdbLoadLen(rdb,NULL); - - /* Load the maximal deleted entry ID. */ - s->max_deleted_entry_id.ms = rdbLoadLen(rdb,NULL); - s->max_deleted_entry_id.seq = rdbLoadLen(rdb,NULL); - - /* Load the offset. */ - s->entries_added = rdbLoadLen(rdb,NULL); - } else { - /* During migration the offset can be initialized to the stream's - * length. At this point, we also don't care about tombstones - * because CG offsets will be later initialized as well. 
*/ - s->max_deleted_entry_id.ms = 0; - s->max_deleted_entry_id.seq = 0; - s->entries_added = s->length; - - /* Since the rax is already loaded, we can find the first entry's - * ID. */ - streamGetEdgeID(s,1,1,&s->first_id); - } - - if (rioGetReadError(rdb)) { - rdbReportReadError("Stream object metadata loading failed."); - decrRefCount(o); - return NULL; - } - - if (s->length && !raxSize(s->rax)) { - rdbReportCorruptRDB("Stream length inconsistent with rax entries"); - decrRefCount(o); - return NULL; - } - - /* Consumer groups loading */ - uint64_t cgroups_count = rdbLoadLen(rdb,NULL); - if (cgroups_count == RDB_LENERR) { - rdbReportReadError("Stream cgroup count loading failed."); - decrRefCount(o); - return NULL; - } - while(cgroups_count--) { - /* Get the consumer group name and ID. We can then create the - * consumer group ASAP and populate its structure as - * we read more data. */ - streamID cg_id; - sds cgname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL); - if (cgname == NULL) { - rdbReportReadError( - "Error reading the consumer group name from Stream"); - decrRefCount(o); - return NULL; - } - - cg_id.ms = rdbLoadLen(rdb,NULL); - cg_id.seq = rdbLoadLen(rdb,NULL); - if (rioGetReadError(rdb)) { - rdbReportReadError("Stream cgroup ID loading failed."); - sdsfree(cgname); - decrRefCount(o); - return NULL; - } - - /* Load group offset. 
*/ - uint64_t cg_offset; - if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_2) { - cg_offset = rdbLoadLen(rdb,NULL); - if (rioGetReadError(rdb)) { - rdbReportReadError("Stream cgroup offset loading failed."); - sdsfree(cgname); - decrRefCount(o); - return NULL; - } - } else { - cg_offset = streamEstimateDistanceFromFirstEverEntry(s,&cg_id); - } - - streamCG *cgroup = streamCreateCG(s,cgname,sdslen(cgname),&cg_id,cg_offset); - if (cgroup == NULL) { - rdbReportCorruptRDB("Duplicated consumer group name %s", - cgname); - decrRefCount(o); - sdsfree(cgname); - return NULL; - } - sdsfree(cgname); - - /* Load the global PEL for this consumer group, however we'll - * not yet populate the NACK structures with the message - * owner, since consumers for this group and their messages will - * be read as a next step. So for now leave them not resolved - * and later populate it. */ - uint64_t pel_size = rdbLoadLen(rdb,NULL); - if (pel_size == RDB_LENERR) { - rdbReportReadError("Stream PEL size loading failed."); - decrRefCount(o); - return NULL; - } - while(pel_size--) { - unsigned char rawid[sizeof(streamID)]; - if (rioRead(rdb,rawid,sizeof(rawid)) == 0) { - rdbReportReadError("Stream PEL ID loading failed."); - decrRefCount(o); - return NULL; - } - streamNACK *nack = streamCreateNACK(s, NULL); - nack->delivery_time = rdbLoadMillisecondTime(rdb,RDB_VERSION); - nack->delivery_count = rdbLoadLen(rdb,NULL); - nack->cgroup_ref_node = streamLinkCGroupToEntry(s, cgroup, rawid); - if (rioGetReadError(rdb)) { - rdbReportReadError("Stream PEL NACK loading failed."); - streamFreeNACK(s, nack); - decrRefCount(o); - return NULL; - } - if (!raxTryInsert(cgroup->pel,rawid,sizeof(rawid),nack,NULL)) { - rdbReportCorruptRDB("Duplicated global PEL entry " - "loading stream consumer group"); - streamFreeNACK(s, nack); - decrRefCount(o); - return NULL; - } - - streamID id; - streamDecodeID(rawid, &id); - raxInsertPelByTime(cgroup->pel_by_time, nack->delivery_time, &id); - } - - /* Now that we loaded our 
global PEL, we need to load the - * consumers and their local PELs. */ - uint64_t consumers_num = rdbLoadLen(rdb,NULL); - if (consumers_num == RDB_LENERR) { - rdbReportReadError("Stream consumers num loading failed."); - decrRefCount(o); - return NULL; - } - while(consumers_num--) { - sds cname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL); - if (cname == NULL) { - rdbReportReadError( - "Error reading the consumer name from Stream group."); - decrRefCount(o); - return NULL; - } - streamConsumer *consumer = streamCreateConsumer(s,cgroup,cname,NULL,0, - SCC_NO_NOTIFY|SCC_NO_DIRTIFY); - sdsfree(cname); - if (!consumer) { - rdbReportCorruptRDB("Duplicate stream consumer detected."); - decrRefCount(o); - return NULL; - } - - consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION); - if (rioGetReadError(rdb)) { - rdbReportReadError("Stream short read reading seen time."); - decrRefCount(o); - return NULL; - } - - if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_3) { - consumer->active_time = rdbLoadMillisecondTime(rdb,RDB_VERSION); - if (rioGetReadError(rdb)) { - rdbReportReadError("Stream short read reading active time."); - decrRefCount(o); - return NULL; - } - } else { - /* That's the best estimate we got */ - consumer->active_time = consumer->seen_time; - } - - /* Load the PEL about entries owned by this specific - * consumer. 
*/ - pel_size = rdbLoadLen(rdb,NULL); - if (pel_size == RDB_LENERR) { - rdbReportReadError( - "Stream consumer PEL num loading failed."); - decrRefCount(o); - return NULL; - } - while(pel_size--) { - unsigned char rawid[sizeof(streamID)]; - if (rioRead(rdb,rawid,sizeof(rawid)) == 0) { - rdbReportReadError( - "Stream short read reading PEL streamID."); - decrRefCount(o); - return NULL; - } - void *result; - if (!raxFind(cgroup->pel,rawid,sizeof(rawid),&result)) { - rdbReportCorruptRDB("Consumer entry not found in " - "group global PEL"); - decrRefCount(o); - return NULL; - } - streamNACK *nack = result; - - /* Set the NACK consumer, that was left to NULL when - * loading the global PEL. Then set the same shared - * NACK structure also in the consumer-specific PEL. */ - nack->consumer = consumer; - if (!raxTryInsert(consumer->pel,rawid,sizeof(rawid),nack,NULL)) { - rdbReportCorruptRDB("Duplicated consumer PEL entry " - " loading a stream consumer " - "group"); - streamFreeNACK(s, nack); - decrRefCount(o); - return NULL; - } - } - } - - /* Verify that each PEL eventually got a consumer assigned to it. */ - if (deep_integrity_validation) { - raxIterator ri_cg_pel; - raxStart(&ri_cg_pel,cgroup->pel); - raxSeek(&ri_cg_pel,"^",NULL,0); - while(raxNext(&ri_cg_pel)) { - streamNACK *nack = ri_cg_pel.data; - if (!nack->consumer) { - raxStop(&ri_cg_pel); - rdbReportCorruptRDB("Stream CG PEL entry without consumer"); - decrRefCount(o); - return NULL; - } - } - raxStop(&ri_cg_pel); - } - } - - /* Load IDMP (Idempotent Message Producer) configuration and entries - * for RDB_TYPE_STREAM_LISTPACKS_4 and above. */ - if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_4) { - /* Load IDMP duration. 
*/ - s->idmp_duration = rdbLoadLen(rdb,NULL); - if (rioGetReadError(rdb)) { - rdbReportReadError("Stream IDMP duration loading failed."); - decrRefCount(o); - return NULL; - } - if (s->idmp_duration < CONFIG_STREAM_IDMP_MIN_DURATION || - s->idmp_duration > CONFIG_STREAM_IDMP_MAX_DURATION) { - rdbReportCorruptRDB("Stream IDMP duration out of range"); - decrRefCount(o); - return NULL; - } - - /* Load IDMP max entries. */ - s->idmp_max_entries = rdbLoadLen(rdb,NULL); - if (rioGetReadError(rdb)) { - rdbReportReadError("Stream IDMP max entries loading failed."); - decrRefCount(o); - return NULL; - } - if (s->idmp_max_entries < CONFIG_STREAM_IDMP_MIN_MAXSIZE || - s->idmp_max_entries > CONFIG_STREAM_IDMP_MAX_MAXSIZE) { - rdbReportCorruptRDB("Stream IDMP max entries out of range"); - decrRefCount(o); - return NULL; - } - - /* Load all IDMP entries. */ - if (rdbLoadStreamIdmpEntries(rdb,s) == -1) { - rdbReportReadError("Stream IDMP entries loading failed."); - decrRefCount(o); - return NULL; - } - - /* Load all-time count of IIDs added. */ - s->iids_added = rdbLoadLen(rdb,NULL); - if (rioGetReadError(rdb)) { - rdbReportReadError("Stream iids_added loading failed."); - decrRefCount(o); - return NULL; - } - - /* Load all-time count of duplicate IIDs detected. 
*/ - s->iids_duplicates = rdbLoadLen(rdb,NULL); - if (rioGetReadError(rdb)) { - rdbReportReadError("Stream iids_duplicates loading failed."); - decrRefCount(o); - return NULL; - } - } - } else if (rdbtype == RDB_TYPE_MODULE_PRE_GA) { - rdbReportCorruptRDB("Pre-release module format not supported"); - return NULL; - } else if (rdbtype == RDB_TYPE_MODULE_2) { - uint64_t moduleid = rdbLoadLen(rdb,NULL); - if (rioGetReadError(rdb)) { - rdbReportReadError("Short read module id"); - return NULL; - } - moduleType *mt = moduleTypeLookupModuleByID(moduleid); - - if (rdbCheckMode) { - char name[10]; - moduleTypeNameByID(name,moduleid); - return rdbLoadCheckModuleValue(rdb,name); - } - - if (mt == NULL) { - char name[10]; - moduleTypeNameByID(name,moduleid); - rdbReportCorruptRDB("The RDB file contains module data I can't load: no matching module type '%s'", name); - return NULL; - } - RedisModuleIO io; - robj keyobj; - initStaticStringObject(keyobj,key); - moduleInitIOContext(&io, &mt->entity, rdb, &keyobj, dbid); - /* Call the rdb_load method of the module providing the 10 bit - * encoding version in the lower 10 bits of the module ID. */ - void *ptr = mt->rdb_load(&io,moduleid&1023); - if (io.ctx) { - moduleFreeContext(io.ctx); - zfree(io.ctx); - } - - /* Module v2 serialization has an EOF mark at the end. 
*/ - uint64_t eof = rdbLoadLen(rdb,NULL); - if (eof == RDB_LENERR) { - if (ptr) { - o = createModuleObject(mt, ptr); /* creating just in order to easily destroy */ - decrRefCount(o); - } - return NULL; - } - if (eof != RDB_MODULE_OPCODE_EOF) { - rdbReportCorruptRDB("The RDB file contains module data for the module '%s' that is not terminated by " - "the proper module value EOF marker", moduleTypeModuleName(mt)); - if (ptr) { - o = createModuleObject(mt, ptr); /* creating just in order to easily destroy */ - decrRefCount(o); - } - return NULL; - } - - if (ptr == NULL) { - rdbReportCorruptRDB("The RDB file contains module data for the module type '%s', that the responsible " - "module is not able to load. Check for modules log above for additional clues.", - moduleTypeModuleName(mt)); - return NULL; - } - o = createModuleObject(mt, ptr); - } else { - rdbReportReadError("Unknown RDB encoding type %d",rdbtype); - return NULL; - } - - if (error) *error = 0; - return o; - -emptykey: - if (error) *error = RDB_LOAD_ERR_EMPTY_KEY; - return NULL; -} - -/* Mark that we are loading in the global state and setup the fields - * needed to provide loading stats. */ -void startLoading(size_t size, int rdbflags, int async) { - loadingSetFlags(NULL, size, async); - loadingFireEvent(rdbflags); -} - -/* Initialize stats, set loading flags and filename if provided. */ -void loadingSetFlags(char *filename, size_t size, int async) { - rdbFileBeingLoaded = filename; - server.loading = 1; - if (async == 1) server.async_loading = 1; - server.loading_start_time = time(NULL); - server.loading_loaded_bytes = 0; - server.loading_total_bytes = size; - server.loading_rdb_used_mem = 0; - server.rdb_last_load_keys_expired = 0; - server.rdb_last_load_keys_loaded = 0; - blockingOperationStarts(); -} - -void loadingFireEvent(int rdbflags) { - /* Fire the loading modules start event. 
*/ - int subevent; - if (rdbflags & RDBFLAGS_AOF_PREAMBLE) - subevent = REDISMODULE_SUBEVENT_LOADING_AOF_START; - else if(rdbflags & RDBFLAGS_REPLICATION) - subevent = REDISMODULE_SUBEVENT_LOADING_REPL_START; - else - subevent = REDISMODULE_SUBEVENT_LOADING_RDB_START; - moduleFireServerEvent(REDISMODULE_EVENT_LOADING,subevent,NULL); -} - -/* Mark that we are loading in the global state and setup the fields - * needed to provide loading stats. - * 'filename' is optional and used for rdb-check on error */ -void startLoadingFile(size_t size, char* filename, int rdbflags) { - loadingSetFlags(filename, size, 0); - loadingFireEvent(rdbflags); -} - -/* Refresh the absolute loading progress info */ -void loadingAbsProgress(off_t pos) { - server.loading_loaded_bytes = pos; - updatePeakMemory(); -} - -/* Refresh the incremental loading progress info */ -void loadingIncrProgress(off_t size) { - server.loading_loaded_bytes += size; - updatePeakMemory(); -} - -/* Update the file name currently being loaded */ -void updateLoadingFileName(char* filename) { - rdbFileBeingLoaded = filename; -} - -/* Loading finished */ -void stopLoading(int success) { - server.loading = 0; - server.async_loading = 0; - blockingOperationEnds(); - rdbFileBeingLoaded = NULL; - - /* Fire the loading modules end event. */ - moduleFireServerEvent(REDISMODULE_EVENT_LOADING, - success? - REDISMODULE_SUBEVENT_LOADING_ENDED: - REDISMODULE_SUBEVENT_LOADING_FAILED, - NULL); -} - -void startSaving(int rdbflags) { - /* Fire the persistence modules start event. 
*/ - int subevent; - if (rdbflags & RDBFLAGS_AOF_PREAMBLE && getpid() != server.pid) - subevent = REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START; - else if (rdbflags & RDBFLAGS_AOF_PREAMBLE) - subevent = REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_AOF_START; - else if (getpid()!=server.pid) - subevent = REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START; - else - subevent = REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START; - moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE,subevent,NULL); -} - -void stopSaving(int success) { - /* Fire the persistence modules end event. */ - moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE, - success? - REDISMODULE_SUBEVENT_PERSISTENCE_ENDED: - REDISMODULE_SUBEVENT_PERSISTENCE_FAILED, - NULL); -} - -/* Track loading progress in order to serve client's from time to time - and if needed calculate rdb checksum */ -void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { - if (server.rdb_checksum) - rioGenericUpdateChecksum(r, buf, len); - if (server.loading_process_events_interval_bytes && - (r->processed_bytes + len)/server.loading_process_events_interval_bytes > r->processed_bytes/server.loading_process_events_interval_bytes) - { - if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER) - replicationSendNewlineToMaster(); - loadingAbsProgress(r->processed_bytes); - processEventsWhileBlocked(); - processModuleLoadingProgressEvent(0); - } - if (server.repl_state == REPL_STATE_TRANSFER && rioCheckType(r) == RIO_TYPE_CONN) { - atomicIncr(server.stat_net_repl_input_bytes, len); - } -} - -/* Save the given functions_ctx to the rdb. - * The err output parameter is optional and will be set with relevant error - * message on failure, it is the caller responsibility to free the error - * message on failure. - * - * The lib_ctx argument is also optional. If NULL is given, only verify rdb - * structure with out performing the actual functions loading. 
*/ -int rdbFunctionLoad(rio *rdb, int ver, functionsLibCtx* lib_ctx, int rdbflags, sds *err) { - UNUSED(ver); - sds error = NULL; - sds final_payload = NULL; - int res = C_ERR; - if (!(final_payload = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL))) { - error = sdsnew("Failed loading library payload"); - goto done; - } - - if (lib_ctx) { - sds library_name = NULL; - if (!(library_name = functionsCreateWithLibraryCtx(final_payload, rdbflags & RDBFLAGS_ALLOW_DUP, &error, lib_ctx, 0))) { - if (!error) { - error = sdsnew("Failed creating the library"); - } - goto done; - } - sdsfree(library_name); - } - - res = C_OK; - -done: - if (final_payload) sdsfree(final_payload); - if (error) { - if (err) { - *err = error; - } else { - serverLog(LL_WARNING, "Failed creating function, %s", error); - sdsfree(error); - } - } - return res; -} - -/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, - * otherwise C_ERR is returned and 'errno' is set accordingly. */ -int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { - functionsLibCtx* functions_lib_ctx = functionsLibCtxGetCurrent(); - rdbLoadingCtx loading_ctx = { .dbarray = server.db, .functions_lib_ctx = functions_lib_ctx }; - int retval = rdbLoadRioWithLoadingCtx(rdb,rdbflags,rsi,&loading_ctx); - return retval; -} - -/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, - * otherwise C_ERR is returned. - * The rdb_loading_ctx argument holds objects to which the rdb will be loaded to, - * currently it only allow to set db object and functionLibCtx to which the data - * will be loaded (in the future it might contains more such objects). 
*/ -int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx) { - uint64_t dbid = 0; - int type, rdbver; - uint64_t db_size = 0, expires_size = 0; - int should_expand_db = 0; - redisDb *db = rdb_loading_ctx->dbarray+0; - char buf[1024]; - int error; - long long empty_keys_skipped = 0; - - rdb->update_cksum = rdbLoadProgressCallback; - rdb->max_processing_chunk = server.loading_process_events_interval_bytes; - if (rioRead(rdb,buf,9) == 0) goto eoferr; - buf[9] = '\0'; - if (memcmp(buf,"REDIS",5) != 0) { - serverLog(LL_WARNING,"Wrong signature trying to load DB from file"); - return C_ERR; - } - rdbver = atoi(buf+5); - if (rdbver < 1 || rdbver > RDB_VERSION) { - serverLog(LL_WARNING,"Can't handle RDB format version %d",rdbver); - return C_ERR; - } - - /* Key-specific attributes, set by opcodes before the key type. */ - long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime(); - long long lru_clock = LRU_CLOCK(); - KeyMetaSpec keyMeta; /* Updated by OPCODE_KEY_META and OPCODE_EXPIRETIME */ - keyMetaSpecInit(&keyMeta); - - while(1) { - sds key; - robj *val; - - /* Read type. */ - if ((type = rdbLoadType(rdb)) == -1) goto eoferr; - - /* Handle special types. */ - if (type == RDB_OPCODE_EXPIRETIME) { - /* EXPIRETIME: load an expire associated with the next key - * to load. Note that after loading an expire we need to - * load the actual type, and continue. */ - expiretime = rdbLoadTime(rdb); - expiretime *= 1000; - keyMetaSpecAdd(&keyMeta, KEY_META_ID_EXPIRE, expiretime); - if (rioGetReadError(rdb)) goto eoferr; - continue; /* Read next opcode. */ - } else if (type == RDB_OPCODE_EXPIRETIME_MS) { - /* EXPIRETIME_MS: milliseconds precision expire times introduced - * with RDB v3. Like EXPIRETIME but no with more precision. */ - expiretime = rdbLoadMillisecondTime(rdb,rdbver); - keyMetaSpecAdd(&keyMeta, KEY_META_ID_EXPIRE, expiretime); - if (rioGetReadError(rdb)) goto eoferr; - continue; /* Read next opcode. 
*/ - } else if (type == RDB_OPCODE_FREQ) { - /* FREQ: LFU frequency. */ - uint8_t byte; - if (rioRead(rdb,&byte,1) == 0) goto eoferr; - lfu_freq = byte; - continue; /* Read next opcode. */ - } else if (type == RDB_OPCODE_IDLE) { - /* IDLE: LRU idle time. */ - uint64_t qword; - if ((qword = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr; - lru_idle = qword; - continue; /* Read next opcode. */ - } else if (type == RDB_OPCODE_EOF) { - /* EOF: End of file, exit the main loop. */ - break; - } else if (type == RDB_OPCODE_SELECTDB) { - /* SELECTDB: Select the specified database. */ - if ((dbid = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr; - if (dbid >= (unsigned)server.dbnum) { - serverLog(LL_WARNING, - "FATAL: Data file was created with a Redis " - "server configured to handle more than %d " - "databases. Exiting\n", server.dbnum); - exit(1); - } - db = rdb_loading_ctx->dbarray+dbid; - continue; /* Read next opcode. */ - } else if (type == RDB_OPCODE_RESIZEDB) { - /* RESIZEDB: Hint about the size of the keys in the currently - * selected data base, in order to avoid useless rehashing. */ - if ((db_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR) - goto eoferr; - if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR) - goto eoferr; - should_expand_db = 1; - continue; /* Read next opcode. */ - } else if (type == RDB_OPCODE_SLOT_INFO) { - uint64_t slot_id, slot_size, expires_slot_size; - if ((slot_id = rdbLoadLen(rdb,NULL)) == RDB_LENERR) - goto eoferr; - if ((slot_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR) - goto eoferr; - if ((expires_slot_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR) - goto eoferr; - if (!server.cluster_enabled) { - continue; /* Ignore gracefully. */ - } - /* In cluster mode we resize individual slot specific dictionaries based on the number of keys that slot holds. */ - kvstoreDictExpand(db->keys, slot_id, slot_size); - kvstoreDictExpand(db->expires, slot_id, expires_slot_size); - should_expand_db = 0; - continue; /* Read next opcode. 
*/ - } else if (type == RDB_OPCODE_AUX) { - /* AUX: generic string-string fields. Use to add state to RDB - * which is backward compatible. Implementations of RDB loading - * are required to skip AUX fields they don't understand. - * - * An AUX field is composed of two strings: key and value. */ - robj *auxkey, *auxval; - if ((auxkey = rdbLoadStringObject(rdb)) == NULL) goto eoferr; - if ((auxval = rdbLoadStringObject(rdb)) == NULL) { - decrRefCount(auxkey); - goto eoferr; - } - - if (((char*)auxkey->ptr)[0] == '%') { - /* All the fields with a name staring with '%' are considered - * information fields and are logged at startup with a log - * level of NOTICE. */ - serverLog(LL_NOTICE,"RDB '%s': %s", - (char*)auxkey->ptr, - (char*)auxval->ptr); - } else if (!strcasecmp(auxkey->ptr,"repl-stream-db")) { - if (rsi) rsi->repl_stream_db = atoi(auxval->ptr); - } else if (!strcasecmp(auxkey->ptr,"repl-id")) { - if (rsi && sdslen(auxval->ptr) == CONFIG_RUN_ID_SIZE) { - memcpy(rsi->repl_id,auxval->ptr,CONFIG_RUN_ID_SIZE+1); - rsi->repl_id_is_set = 1; - } - } else if (!strcasecmp(auxkey->ptr,"repl-offset")) { - if (rsi) rsi->repl_offset = strtoll(auxval->ptr,NULL,10); - } else if (!strcasecmp(auxkey->ptr,"lua")) { - /* Won't load the script back in memory anymore. 
*/ - } else if (!strcasecmp(auxkey->ptr,"redis-ver")) { - serverLog(LL_NOTICE,"Loading RDB produced by version %s", - (char*)auxval->ptr); - } else if (!strcasecmp(auxkey->ptr,"ctime")) { - time_t age = time(NULL)-strtol(auxval->ptr,NULL,10); - if (age < 0) age = 0; - serverLog(LL_NOTICE,"RDB age %ld seconds", - (unsigned long) age); - } else if (!strcasecmp(auxkey->ptr,"used-mem")) { - long long usedmem = strtoll(auxval->ptr,NULL,10); - serverLog(LL_NOTICE,"RDB memory usage when created %.2f Mb", - (double) usedmem / (1024*1024)); - server.loading_rdb_used_mem = usedmem; - } else if (!strcasecmp(auxkey->ptr,"aof-preamble")) { - long long haspreamble = strtoll(auxval->ptr,NULL,10); - if (haspreamble) serverLog(LL_NOTICE,"RDB has an AOF tail"); - } else if (!strcasecmp(auxkey->ptr, "aof-base")) { - long long isbase = strtoll(auxval->ptr, NULL, 10); - if (isbase) serverLog(LL_NOTICE, "RDB is base AOF"); - } else if (!strcasecmp(auxkey->ptr,"cluster-asm-task")) { - asmReplicaHandleMasterTask(auxval->ptr); - } else if (!strcasecmp(auxkey->ptr,"redis-bits")) { - /* Just ignored. */ - } else { - /* We ignore fields we don't understand, as by AUX field - * contract. */ - serverLog(LL_DEBUG,"Unrecognized RDB AUX field: '%s'", - (char*)auxkey->ptr); - } - - decrRefCount(auxkey); - decrRefCount(auxval); - continue; /* Read type again. */ - } else if (type == RDB_OPCODE_MODULE_AUX) { - /* Load module data that is not related to the Redis key space. - * Such data can be potentially be stored both before and after the - * RDB keys-values section. 
*/ - uint64_t moduleid = rdbLoadLen(rdb,NULL); - int when_opcode = rdbLoadLen(rdb,NULL); - int when = rdbLoadLen(rdb,NULL); - if (rioGetReadError(rdb)) goto eoferr; - if (when_opcode != RDB_MODULE_OPCODE_UINT) { - rdbReportReadError("bad when_opcode"); - goto eoferr; - } - moduleType *mt = moduleTypeLookupModuleByID(moduleid); - char name[10]; - moduleTypeNameByID(name,moduleid); - - if (!rdbCheckMode && mt == NULL) { - /* Unknown module. */ - serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load: no matching module '%s'", name); - exit(1); - } else if (!rdbCheckMode && mt != NULL) { - if (!mt->aux_load) { - /* Module doesn't support AUX. */ - serverLog(LL_WARNING,"The RDB file contains module AUX data, but the module '%s' doesn't seem to support it.", name); - exit(1); - } - - RedisModuleIO io; - moduleInitIOContext(&io, &mt->entity, rdb, NULL, -1); - /* Call the rdb_load method of the module providing the 10 bit - * encoding version in the lower 10 bits of the module ID. */ - int rc = mt->aux_load(&io,moduleid&1023, when); - if (io.ctx) { - moduleFreeContext(io.ctx); - zfree(io.ctx); - } - if (rc != REDISMODULE_OK || io.error) { - moduleTypeNameByID(name,moduleid); - serverLog(LL_WARNING,"The RDB file contains module AUX data for the module type '%s', that the responsible module is not able to load. Check for modules log above for additional clues.", name); - goto eoferr; - } - uint64_t eof = rdbLoadLen(rdb,NULL); - if (eof != RDB_MODULE_OPCODE_EOF) { - serverLog(LL_WARNING,"The RDB file contains module AUX data for the module '%s' that is not terminated by the proper module value EOF marker", name); - goto eoferr; - } - continue; - } else { - /* RDB check mode. */ - robj *aux = rdbLoadCheckModuleValue(rdb,name); - decrRefCount(aux); - continue; /* Read next opcode. 
*/ - } - } else if (type == RDB_OPCODE_FUNCTION_PRE_GA) { - rdbReportCorruptRDB("Pre-release function format not supported."); - exit(1); - } else if (type == RDB_OPCODE_FUNCTION2) { - sds err = NULL; - if (rdbFunctionLoad(rdb, rdbver, rdb_loading_ctx->functions_lib_ctx, rdbflags, &err) != C_OK) { - serverLog(LL_WARNING,"Failed loading library, %s", err); - sdsfree(err); - goto eoferr; - } - continue; - } - - /* If there is no slot info, it means that it's either not cluster mode or we are trying to load legacy RDB file. - * In this case we want to estimate number of keys per slot and resize accordingly. */ - if (should_expand_db) { - dbExpand(db, db_size, 0); - dbExpandExpires(db, expires_size, 0); - should_expand_db = 0; - } - - /* With metadata, type = RDB_OPCODE_KEY_META. Layout: [<META>,]<TYPE>,<KEY>,<VALUE> */ - if (rdbResolveKeyType(rdb, &type, dbid, &keyMeta) == -1) - goto eoferr; - - /* Read key */ - if ((key = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) { - keyMetaSpecCleanup(&keyMeta); - goto eoferr; - } - /* Read value */ - val = rdbLoadObject(type,rdb,key,db->id,&error); - - /* Check if the key already expired. This function is used when loading - * an RDB file from disk, either at startup, or when an RDB was - * received from the master. In the latter case, the master is - * responsible for key expiry. If we would expire keys here, the - * snapshot taken by the master may not be reflected on the slave. - * Similarly, if the base AOF is RDB format, we want to load all - * the keys they are, since the log of operations in the incr AOF - * is assumed to work in the exact keyspace state. */ - if (val == NULL) { - keyMetaSpecCleanup(&keyMeta); - /* Since we used to have bug that could lead to empty keys - * (See #8453), we rather not fail when empty key is encountered - * in an RDB file, instead we will silently discard it and - * continue loading. 
*/ - if (error == RDB_LOAD_ERR_EMPTY_KEY) { - if(empty_keys_skipped++ < 10) - serverLog(LL_NOTICE, "rdbLoadObject skipping empty key: %s", key); - sdsfree(key); - } else { - sdsfree(key); - goto eoferr; - } - } else if (iAmMaster() && - !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && - expiretime != -1 && expiretime < now) - { - if (rdbflags & RDBFLAGS_FEED_REPL) { - /* Caller should have created replication backlog, - * and now this path only works when rebooting, - * so we don't have replicas yet. */ - serverAssert(server.repl_backlog != NULL && listLength(server.slaves) == 0); - robj keyobj; - initStaticStringObject(keyobj,key); - robj *argv[2]; - argv[0] = server.lazyfree_lazy_expire ? shared.unlink : shared.del; - argv[1] = &keyobj; - replicationFeedSlaves(server.slaves,dbid,argv,2); - } - sdsfree(key); - decrRefCount(val); - keyMetaSpecCleanup(&keyMeta); - server.rdb_last_load_keys_expired++; - } else { - robj keyobj; - initStaticStringObject(keyobj,key); - - /* Add the new object in the hash table */ - kvobj *kv = dbAddRDBLoad(db, key, &val, &keyMeta); - server.rdb_last_load_keys_loaded++; - if (!kv) { - if (rdbflags & RDBFLAGS_ALLOW_DUP) { - /* This flag is useful for DEBUG RELOAD special modes. - * When it's set we allow new keys to replace the current - * keys with the same name. */ - dbSyncDelete(db,&keyobj); - kv = dbAddRDBLoad(db, key, &val, &keyMeta); - serverAssert(kv != NULL); - } else { - serverLog(LL_WARNING, - "RDB has duplicated key '%s' in DB %d",key,db->id); - serverPanic("Duplicated key found in RDB file"); - } - } - - /* If minExpiredField was set, then the object is hash with expiration - * on fields and need to register it in global HFE DS */ - if (kv->type == OBJ_HASH) { - uint64_t minExpiredField = hashTypeGetMinExpire(kv, 1); - if (minExpiredField != EB_EXPIRE_TIME_INVALID) - estoreAdd(db->subexpires, getKeySlot(key), kv, minExpiredField); - } - - /* Set usage information (for eviction). 
*/ - objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock,1000); - - /* call key space notification on key loaded for modules only */ - moduleNotifyKeyspaceEvent(NOTIFY_LOADED, "loaded", &keyobj, db->id); - - /* Release key (sds), dictEntry stores a copy of it in embedded data */ - sdsfree(key); - } - - /* Loading the database more slowly is useful in order to test - * certain edge cases. */ - if (server.key_load_delay) - debugDelay(server.key_load_delay); - - /* Reset the state that is key-specified and is populated by - * opcodes before the key, so that we start from scratch again. */ - expiretime = -1; - lfu_freq = -1; - lru_idle = -1; - keyMetaSpecInit(&keyMeta); - } - /* Verify the checksum if RDB version is >= 5 */ - if (rdbver >= 5) { - uint64_t cksum, expected = rdb->cksum; - - if (rioRead(rdb,&cksum,8) == 0) goto eoferr; - if (server.rdb_checksum && !server.skip_checksum_validation) { - memrev64ifbe(&cksum); - if (cksum == 0) { - serverLog(LL_NOTICE,"RDB file was saved with checksum disabled: no check performed."); - } else if (cksum != expected) { - serverLog(LL_WARNING,"Wrong RDB checksum expected: (%llx) but " - "got (%llx). 
Aborting now.", - (unsigned long long)expected, - (unsigned long long)cksum); - rdbReportCorruptRDB("RDB CRC error"); - return C_ERR; - } - } - } - - if (empty_keys_skipped) { - serverLog(LL_NOTICE, - "Done loading RDB, keys loaded: %lld, keys expired: %lld, empty keys skipped: %lld.", - server.rdb_last_load_keys_loaded, server.rdb_last_load_keys_expired, empty_keys_skipped); - } else { - serverLog(LL_NOTICE, - "Done loading RDB, keys loaded: %lld, keys expired: %lld.", - server.rdb_last_load_keys_loaded, server.rdb_last_load_keys_expired); - } - return C_OK; - - /* Unexpected end of file is handled here calling rdbReportReadError(): - * this will in turn either abort Redis in most cases, or if we are loading - * the RDB file from a socket during initial SYNC (diskless replica mode), - * we'll report the error to the caller, so that we can retry. */ -eoferr: - serverLog(LL_WARNING, - "Short read or OOM loading DB. Unrecoverable error, aborting now."); - rdbReportReadError("Unexpected EOF reading RDB file"); - return C_ERR; -} - -int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) { - return rdbLoadWithEmptyFunc(filename, rsi, rdbflags, NULL); -} - -int slotSnapshotSaveRio(int req, rio *rdb, int *error); - -/* Like rdbLoadRio() but takes a filename instead of a rio stream. The - * filename is open for reading and a rio stream object created in order - * to do the actual loading. Moreover the ETA displayed in the INFO - * output is initialized and finalized. - * - * If you pass an 'rsi' structure initialized with RDB_SAVE_INFO_INIT, the - * loading code will fill the information fields in the structure. - * - * If emptyDbFunc is not NULL, it will be called to flush old db or to - * discard partial db on error. 
*/ -int rdbLoadWithEmptyFunc(char *filename, rdbSaveInfo *rsi, int rdbflags, void (*emptyDbFunc)(void)) { - FILE *fp; - rio rdb; - int retval; - struct stat sb; - int rdb_fd; - - fp = fopen(filename, "r"); - if (fp == NULL) { - if (errno == ENOENT) return RDB_NOT_EXIST; - - serverLog(LL_WARNING,"Fatal error: can't open the RDB file %s for reading: %s", filename, strerror(errno)); - return RDB_FAILED; - } - - if (fstat(fileno(fp), &sb) == -1) - sb.st_size = 0; - - loadingSetFlags(filename, sb.st_size, 0); - /* Note that inside loadingSetFlags(), server.loading is set. - * emptyDbCallback() may yield back to event-loop to reply -LOADING. */ - if (emptyDbFunc) - emptyDbFunc(); /* Flush existing db. */ - loadingFireEvent(rdbflags); - rioInitWithFile(&rdb,fp); - - retval = rdbLoadRio(&rdb,rdbflags,rsi); - - fclose(fp); - if (retval != C_OK && emptyDbFunc) - emptyDbFunc(); /* Clean up partial db. */ - - stopLoading(retval==C_OK); - /* Reclaim the cache backed by rdb */ - if (retval == C_OK && !(rdbflags & RDBFLAGS_KEEP_CACHE)) { - /* TODO: maybe we could combine the fopen and open into one in the future */ - rdb_fd = open(filename, O_RDONLY); - if (rdb_fd >= 0) bioCreateCloseJob(rdb_fd, 0, 1); - } - return (retval==C_OK) ? RDB_OK : RDB_FAILED; -} - -/* A background saving child (BGSAVE) terminated its work. Handle this. - * This function covers the case of actual BGSAVEs. 
*/
static void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal, time_t save_end) {
    if (!bysignal && exitcode == 0) {
        /* Clean child exit: commit the save bookkeeping. */
        serverLog(LL_NOTICE,
            "Background saving terminated with success");
        server.dirty = server.dirty - server.dirty_before_bgsave;
        server.lastsave = save_end;
        server.lastbgsave_status = C_OK;
        server.stat_rdb_consecutive_failures = 0;
    } else if (!bysignal && exitcode != 0) {
        /* Child exited on its own with an error status. */
        serverLog(LL_WARNING, "Background saving error");
        server.lastbgsave_status = C_ERR;
        server.stat_rdb_consecutive_failures++;
    } else {
        mstime_t lat;

        serverLog(LL_WARNING,
            "Background saving terminated by signal %d", bysignal);
        /* The killed child may have left a temp file behind; unlinking it
         * can be slow, so record it as a latency event. */
        latencyStartMonitor(lat);
        rdbRemoveTempFile(server.child_pid, 0);
        latencyEndMonitor(lat);
        latencyAddSampleIfNeeded("rdb-unlink-temp-file",lat);
        /* SIGUSR1 is whitelisted, so we have a way to kill a child without
         * triggering an error condition. */
        if (bysignal != SIGUSR1) {
            server.lastbgsave_status = C_ERR;
            server.stat_rdb_consecutive_failures++;
        }
    }
}

/* A background saving child (BGSAVE) terminated its work. Handle this.
 * This function covers the case of RDB -> Slaves socket transfers for
 * diskless replication.
*/ -static void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) { - if (!bysignal && exitcode == 0) { - serverLog(LL_NOTICE, - "Background RDB transfer terminated with success"); - } else if (!bysignal && exitcode != 0) { - serverLog(LL_WARNING, "Background transfer error"); - } else { - serverLog(LL_WARNING, - "Background transfer terminated by signal %d", bysignal); - } - if (server.rdb_child_exit_pipe!=-1) - close(server.rdb_child_exit_pipe); - if (server.rdb_pipe_read != -1) { - aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE); - close(server.rdb_pipe_read); - } - server.rdb_child_exit_pipe = -1; - server.rdb_pipe_read = -1; - zfree(server.rdb_pipe_conns); - server.rdb_pipe_conns = NULL; - server.rdb_pipe_numconns = 0; - server.rdb_pipe_numconns_writing = 0; - zfree(server.rdb_pipe_buff); - server.rdb_pipe_buff = NULL; - server.rdb_pipe_bufflen = 0; -} - -/* When a background RDB saving/transfer terminates, call the right handler. */ -void backgroundSaveDoneHandler(int exitcode, int bysignal) { - int type = server.rdb_child_type; - time_t save_end = time(NULL); - if (server.bgsave_aborted) - bysignal = SIGUSR1; - switch(server.rdb_child_type) { - case RDB_CHILD_TYPE_DISK: - backgroundSaveDoneHandlerDisk(exitcode,bysignal,save_end); - break; - case RDB_CHILD_TYPE_SOCKET: - backgroundSaveDoneHandlerSocket(exitcode,bysignal); - break; - default: - serverPanic("Unknown RDB child type."); - break; - } - - server.rdb_child_type = RDB_CHILD_TYPE_NONE; - server.rdb_save_time_last = save_end-server.rdb_save_time_start; - server.rdb_save_time_start = -1; - server.bgsave_aborted = 0; - /* Possibly there are slaves waiting for a BGSAVE in order to be served - * (the first stage of SYNC is a bulk transfer of dump.rdb) */ - updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? 
C_OK : C_ERR, type); -} - -/* Kill the RDB saving child using SIGUSR1 (so that the parent will know - * the child did not exit for an error, but because we wanted), and performs - * the cleanup needed. */ -void killRDBChild(void) { - kill(server.child_pid, SIGUSR1); - /* Because we are not using here waitpid (like we have in killAppendOnlyChild - * and TerminateModuleForkChild), all the cleanup operations is done by - * checkChildrenDone, that later will find that the process killed. - * This includes: - * - resetChildState - * - rdbRemoveTempFile */ - - /* However, there's a chance the child already exited (or about to exit), and will - * not receive the signal, in that case it could result in success and the done - * handler will override some server metrics (e.g. the dirty counter) which it - * shouldn't (e.g. in case of FLUSHALL), or the synchronously created RDB file. */ - server.bgsave_aborted = 1; -} - -/* Spawn an RDB child that writes the RDB to the sockets of the slaves - * that are currently in SLAVE_STATE_WAIT_BGSAVE_START state. */ -int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi) { - listNode *ln; - listIter li; - pid_t childpid; - int pipefds[2], rdb_pipe_write = 0, safe_to_exit_pipe = 0; - int rdb_channel = server.repl_rdb_channel && (req & SLAVE_REQ_RDB_CHANNEL); - int slots_req = req & SLAVE_REQ_SLOTS_SNAPSHOT; - - if (hasActiveChildProcess()) return C_ERR; - - /* Even if the previous fork child exited, don't start a new one until we - * drained the pipe. */ - if (server.rdb_pipe_conns) return C_ERR; - - if (!rdb_channel) { - /* Before to fork, create a pipe that is used to transfer the rdb bytes to - * the parent, we can't let it write directly to the sockets, since in case - * of TLS we must let the parent handle a continuous TLS state when the - * child terminates and parent takes over. 
*/ - if (anetPipe(pipefds, O_NONBLOCK, 0) == -1) return C_ERR; - server.rdb_pipe_read = pipefds[0]; /* read end */ - rdb_pipe_write = pipefds[1]; /* write end */ - - /* create another pipe that is used by the parent to signal to the child - * that it can exit. */ - if (anetPipe(pipefds, 0, 0) == -1) { - close(rdb_pipe_write); - close(server.rdb_pipe_read); - return C_ERR; - } - safe_to_exit_pipe = pipefds[0]; /* read end */ - server.rdb_child_exit_pipe = pipefds[1]; /* write end */ - } - - /* Collect the connections of the replicas we want to transfer - * the RDB to, which are in WAIT_BGSAVE_START state. */ - int numconns = 0; - connection **conns = zmalloc(sizeof(*conns) * listLength(server.slaves)); - listRewind(server.slaves,&li); - while((ln = listNext(&li))) { - client *slave = ln->value; - if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { - /* Check slave has the exact requirements */ - if (slave->slave_req != req) - continue; - replicationSetupSlaveForFullResync(slave, getPsyncInitialOffset()); - conns[numconns++] = slave->conn; - if (rdb_channel) { - /* Put the socket in blocking mode to simplify RDB transfer. */ - connSendTimeout(slave->conn, server.repl_timeout * 1000); - connBlock(slave->conn); - } - } - } - - if (!rdb_channel) { - server.rdb_pipe_conns = conns; - server.rdb_pipe_numconns = numconns; - server.rdb_pipe_numconns_writing = 0; - } - - /* Create the child process. */ - if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) { - /* Child */ - int retval, dummy; - rio rdb; - - if (rdb_channel) { - rioInitWithConnset(&rdb, conns, numconns); - } else { - rioInitWithFd(&rdb,rdb_pipe_write); - /* Close the reading part, so that if the parent crashes, the child - * will get a write error and exit. */ - close(server.rdb_pipe_read); - } - - redisSetProcTitle("redis-rdb-to-slaves"); - redisSetCpuAffinity(server.bgsave_cpulist); - - /* Disable RDB compression if requested. 
*/ - if (req & SLAVE_REQ_RDB_NO_COMPRESS) - server.rdb_compression = 0; - - if (req & SLAVE_REQ_SLOTS_SNAPSHOT) { - /* Slots snapshot is required */ - retval = slotSnapshotSaveRio(req, &rdb, NULL); - } else { - retval = rdbSaveRioWithEOFMark(req,&rdb,NULL,rsi); - } - - if (retval == C_OK && rioFlush(&rdb) == 0) - retval = C_ERR; - - if (retval == C_OK) { - sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB"); - } - - if (rdb_channel) { - rioFreeConnset(&rdb); - } else { - rioFreeFd(&rdb); - /* wake up the reader, tell it we're done. */ - close(rdb_pipe_write); - close(server.rdb_child_exit_pipe); /* close write end so that we can detect the close on the parent. */ - /* hold exit until the parent tells us it's safe. we're not expecting - * to read anything, just get the error when the pipe is closed. */ - dummy = read(safe_to_exit_pipe, pipefds, 1); - UNUSED(dummy); - } - zfree(conns); - exitFromChild((retval == C_OK) ? 0 : 1, 0); - } else { - /* Parent */ - if (childpid == -1) { - serverLog(LL_WARNING,"Can't save in background: fork: %s", - strerror(errno)); - - /* Undo the state change. The caller will perform cleanup on - * all the slaves in BGSAVE_START state, but an early call to - * replicationSetupSlaveForFullResync() turned it into BGSAVE_END */ - listRewind(server.slaves,&li); - while((ln = listNext(&li))) { - client *slave = ln->value; - if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) { - slave->replstate = SLAVE_STATE_WAIT_BGSAVE_START; - } - } - - if (!rdb_channel) { - close(rdb_pipe_write); - close(server.rdb_pipe_read); - close(server.rdb_child_exit_pipe); - zfree(server.rdb_pipe_conns); - server.rdb_pipe_conns = NULL; - server.rdb_pipe_numconns = 0; - server.rdb_pipe_numconns_writing = 0; - } - } else { - serverLog(LL_NOTICE, "Background RDB transfer started by pid %ld to %s", (long)childpid, - rdb_channel ? (slots_req ? 
"slot migration destination socket" : "replica socket") : - "parent process pipe"); - server.rdb_save_time_start = time(NULL); - server.rdb_child_type = RDB_CHILD_TYPE_SOCKET; - if (!rdb_channel) { - close(rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */ - if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) { - serverPanic("Unrecoverable error creating server.rdb_pipe_read file event."); - } - } - } - if (rdb_channel) - zfree(conns); - else - close(safe_to_exit_pipe); - - return (childpid == -1) ? C_ERR : C_OK; - } - return C_OK; /* Unreached. */ -} - -void saveCommand(client *c) { - if (server.child_type == CHILD_TYPE_RDB) { - addReplyError(c,"Background save already in progress"); - return; - } - - server.stat_rdb_saves++; - - rdbSaveInfo rsi, *rsiptr; - rsiptr = rdbPopulateSaveInfo(&rsi); - if (rdbSave(SLAVE_REQ_NONE,server.rdb_filename,rsiptr,RDBFLAGS_NONE) == C_OK) { - addReply(c,shared.ok); - } else { - addReplyErrorObject(c,shared.err); - } -} - -/* BGSAVE [SCHEDULE] */ -void bgsaveCommand(client *c) { - int schedule = 0; - - /* The SCHEDULE option changes the behavior of BGSAVE when an AOF rewrite - * is in progress. Instead of returning an error a BGSAVE gets scheduled. */ - if (c->argc > 1) { - if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"schedule")) { - schedule = 1; - } else { - addReplyErrorObject(c,shared.syntaxerr); - return; - } - } - - rdbSaveInfo rsi, *rsiptr; - rsiptr = rdbPopulateSaveInfo(&rsi); - - if (server.child_type == CHILD_TYPE_RDB) { - addReplyError(c,"Background save already in progress"); - } else if (hasActiveChildProcess() || server.in_exec) { - if (schedule || server.in_exec) { - server.rdb_bgsave_scheduled = 1; - addReplyStatus(c,"Background saving scheduled"); - } else { - addReplyError(c, - "Another child process is active (AOF?): can't BGSAVE right now. 
" - "Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenever " - "possible."); - } - } else if (rdbSaveBackground(SLAVE_REQ_NONE,server.rdb_filename,rsiptr,RDBFLAGS_NONE) == C_OK) { - addReplyStatus(c,"Background saving started"); - } else { - addReplyErrorObject(c,shared.err); - } -} - -/* Populate the rdbSaveInfo structure used to persist the replication - * information inside the RDB file. Currently the structure explicitly - * contains just the currently selected DB from the master stream, however - * if the rdbSave*() family functions receive a NULL rsi structure also - * the Replication ID/offset is not saved. The function populates 'rsi' - * that is normally stack-allocated in the caller, returns the populated - * pointer if the instance has a valid master client, otherwise NULL - * is returned, and the RDB saving will not persist any replication related - * information. */ -rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) { - rdbSaveInfo rsi_init = RDB_SAVE_INFO_INIT; - *rsi = rsi_init; - - /* If the instance is a master, we can populate the replication info - * only when repl_backlog is not NULL. If the repl_backlog is NULL, - * it means that the instance isn't in any replication chains. In this - * scenario the replication info is useless, because when a slave - * connects to us, the NULL repl_backlog will trigger a full - * synchronization, at the same time we will use a new replid and clear - * replid2. */ - if (!server.masterhost && server.repl_backlog) { - /* Note that when server.slaveseldb is -1, it means that this master - * didn't apply any write commands after a full synchronization. - * So we can let repl_stream_db be 0, this allows a restarted slave - * to reload replication ID/offset, it's safe because the next write - * command must generate a SELECT statement. */ - rsi->repl_stream_db = server.slaveseldb == -1 ? 
0 : server.slaveseldb; - return rsi; - } - - /* If the instance is a slave we need a connected master - * in order to fetch the currently selected DB. */ - if (server.master) { - rsi->repl_stream_db = server.master->db->id; - return rsi; - } - - /* If we have a cached master we can use it in order to populate the - * replication selected DB info inside the RDB file: the slave can - * increment the master_repl_offset only from data arriving from the - * master, so if we are disconnected the offset in the cached master - * is valid. */ - if (server.cached_master) { - rsi->repl_stream_db = server.cached_master->db->id; - return rsi; - } - return NULL; -} |
