/*
 * Copyright (c) 2009-Present, Redis Ltd.
 * All rights reserved.
 *
 * Copyright (c) 2024-present, Valkey contributors.
 * All rights reserved.
 *
 * Licensed under your choice of (a) the Redis Source Available License 2.0
 * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
 * GNU Affero General Public License v3 (AGPLv3).
 *
 * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
 */

/* rio.c is a simple stream-oriented I/O abstraction that provides an interface
 * to write code that can consume/produce data using different concrete input
 * and output devices. For instance the same rdb.c code using the rio
 * abstraction can be used to read and write the RDB format using in-memory
 * buffers or files.
 *
 * A rio object provides the following methods:
 *  read: read from stream.
 *  write: write to stream.
 *  tell: get the current offset.
 *  flush: push any user-space buffered data to the target device.
 *
 * It is also possible to set a 'checksum' method that is used by rio.c in order
 * to compute a checksum of the data written or read, or to query the rio object
 * for the current checksum.
 *
 * ----------------------------------------------------------------------------
 */

#include "fmacros.h"
#include "fpconv_dtoa.h"
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include "rio.h"
#include "util.h"
#include "crc64.h"
#include "config.h"
#include "server.h"

/* ------------------------- Buffer I/O implementation ----------------------- */

/* Append 'len' bytes from 'buf' to the in-memory sds buffer and advance the
 * position. Returns 1 or 0 for success/failure (this target always returns 1). */
static size_t rioBufferWrite(rio *r, const void *buf, size_t len) {
    r->io.buffer.ptr = sdscatlen(r->io.buffer.ptr,(char*)buf,len);
    r->io.buffer.pos += len;
    return 1;
}

/* Copy the next 'len' unread bytes of the buffer into 'buf'.
 * Returns 1 or 0 for success/failure. On failure (fewer than 'len' bytes
 * left) nothing is consumed. */
static size_t rioBufferRead(rio *r, void *buf, size_t len) {
    if (sdslen(r->io.buffer.ptr)-r->io.buffer.pos < len)
        return 0; /* not enough buffer to return len bytes. */
    memcpy(buf,r->io.buffer.ptr+r->io.buffer.pos,len);
    r->io.buffer.pos += len;
    return 1;
}

/* Returns read/write position in buffer. */
static off_t rioBufferTell(rio *r) {
    return r->io.buffer.pos;
}

/* Flushes any buffer to target device if applicable. Returns 1 on success
 * and 0 on failures. */
static int rioBufferFlush(rio *r) {
    UNUSED(r);
    return 1; /* Nothing to do, our write just appends to the buffer. */
}

static const rio rioBufferIO = {
    rioBufferRead,
    rioBufferWrite,
    rioBufferTell,
    rioBufferFlush,
    NULL,           /* update_checksum */
    0,              /* current checksum */
    0,              /* flags */
    0,              /* bytes read or written */
    0,              /* read/write chunk size */
    { { NULL, 0 } } /* union for io-specific vars */
};

/* Initialize 'r' as a rio backed by the sds string 's'. Reads and writes
 * start at offset 0; writes append to 's' (which may be reallocated, so the
 * current string is always r->io.buffer.ptr). */
void rioInitWithBuffer(rio *r, sds s) {
    *r = rioBufferIO;
    r->io.buffer.ptr = s;
    r->io.buffer.pos = 0;
}

/* --------------------- Stdio file pointer implementation ------------------- */

/* Write 'len' bytes to the stdio file. Returns 1 or 0 for success/failure.
 *
 * When autosync is enabled the data is written in pieces that never cross an
 * autosync boundary; every time 'autosync' bytes have been buffered the stream
 * is flushed and synced to disk (and optionally the page cache reclaimed). */
static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
    if (!r->io.file.autosync) return fwrite(buf,len,1,r->io.file.fp);

    size_t nwritten = 0;
    /* Incrementally write data to the file, avoid a single write larger than
     * the autosync threshold (so that the kernel's buffer cache never has too
     * many dirty pages at once). */
    while (len != nwritten) {
        serverAssert(r->io.file.autosync > r->io.file.buffered);
        size_t nalign = (size_t)(r->io.file.autosync - r->io.file.buffered);
        size_t towrite = nalign > len-nwritten ? len-nwritten : nalign;

        if (fwrite((char*)buf+nwritten,towrite,1,r->io.file.fp) == 0) return 0;
        nwritten += towrite;
        r->io.file.buffered += towrite;

        if (r->io.file.buffered >= r->io.file.autosync) {
            /* The sync calls below only see data that left the stdio buffer,
             * so a failed flush must be reported rather than ignored. */
            if (fflush(r->io.file.fp) == EOF) return 0;

            size_t processed = r->processed_bytes + nwritten;
            serverAssert(processed % r->io.file.autosync == 0);
            serverAssert(r->io.file.buffered == r->io.file.autosync);

#if HAVE_SYNC_FILE_RANGE
            /* Start writeout asynchronously. */
            if (sync_file_range(fileno(r->io.file.fp),
                    processed - r->io.file.autosync, r->io.file.autosync,
                    SYNC_FILE_RANGE_WRITE) == -1)
                return 0;

            if (processed >= (size_t)r->io.file.autosync * 2) {
                /* To keep the promise to 'autosync', we should make sure last
                 * asynchronous writeout persists into disk. This call may block
                 * if last writeout is not finished since disk is slow. */
                if (sync_file_range(fileno(r->io.file.fp),
                        processed - r->io.file.autosync*2,
                        r->io.file.autosync, SYNC_FILE_RANGE_WAIT_BEFORE|
                        SYNC_FILE_RANGE_WRITE|SYNC_FILE_RANGE_WAIT_AFTER) == -1)
                    return 0;
            }
#else
            if (redis_fsync(fileno(r->io.file.fp)) == -1) return 0;
#endif
            if (r->io.file.reclaim_cache) {
                /* In Linux sync_file_range just issue a writeback request to
                 * OS, and when posix_fadvise is called, the dirty page may
                 * still be in flushing, which means it would be ignored by
                 * posix_fadvise.
                 *
                 * So we posix_fadvise the whole file, and the writeback-ed
                 * pages will have other chances to be reclaimed. */
                reclaimFilePageCache(fileno(r->io.file.fp), 0, 0);
            }
            r->io.file.buffered = 0;
        }
    }
    return 1;
}

/* Returns 1 or 0 for success/failure. */
static size_t rioFileRead(rio *r, void *buf, size_t len) {
    return fread(buf,len,1,r->io.file.fp);
}

/* Returns read/write position in file. */
static off_t rioFileTell(rio *r) {
    return ftello(r->io.file.fp);
}

/* Flushes any buffer to target device if applicable. Returns 1 on success
 * and 0 on failures. */
static int rioFileFlush(rio *r) {
    return (fflush(r->io.file.fp) == 0) ? 1 : 0;
}

static const rio rioFileIO = {
    rioFileRead,
    rioFileWrite,
    rioFileTell,
    rioFileFlush,
    NULL,           /* update_checksum */
    0,              /* current checksum */
    0,              /* flags */
    0,              /* bytes read or written */
    0,              /* read/write chunk size */
    { { NULL, 0 } } /* union for io-specific vars */
};

/* Initialize 'r' as a rio over the already-open stdio stream 'fp', with
 * autosync and page-cache reclaiming disabled. The caller keeps ownership of
 * 'fp'. */
void rioInitWithFile(rio *r, FILE *fp) {
    *r = rioFileIO;
    r->io.file.fp = fp;
    r->io.file.buffered = 0;
    r->io.file.autosync = 0;
    r->io.file.reclaim_cache = 0;
}

/* ------------------- Connection implementation -------------------
 * We use this RIO implementation when reading an RDB file directly from
 * the connection to the memory via rdbLoadRio(), thus this implementation
 * only implements reading from a connection that is, normally,
 * just a socket. */

/* Writing is not supported by this target: always returns 0 (failure). */
static size_t rioConnWrite(rio *r, const void *buf, size_t len) {
    UNUSED(r);
    UNUSED(buf);
    UNUSED(len);
    return 0; /* Error, this target does not yet support writing. */
}

/* Read exactly 'len' bytes into 'buf', buffering extra data read from the
 * connection for later calls.
 *
 * Returns 'len' (non-zero) on success and 0 on failure. On failure errno is
 * set to EOVERFLOW when the request would cross the configured read_limit,
 * and EWOULDBLOCK from the connection is reported as ETIMEDOUT. A connRead()
 * returning 0 also yields 0. */
static size_t rioConnRead(rio *r, void *buf, size_t len) {
    size_t avail = sdslen(r->io.conn.buf)-r->io.conn.pos;

    /* If the buffer is too small for the entire request: realloc. */
    if (sdslen(r->io.conn.buf) + sdsavail(r->io.conn.buf) < len)
        r->io.conn.buf = sdsMakeRoomFor(r->io.conn.buf, len - sdslen(r->io.conn.buf));

    /* If the remaining unused buffer is not large enough: memmove so that we
     * can read the rest. */
    if (len > avail && sdsavail(r->io.conn.buf) < len - avail) {
        sdsrange(r->io.conn.buf, r->io.conn.pos, -1);
        r->io.conn.pos = 0;
    }

    /* Make sure the caller didn't request to read past the limit.
     * If they didn't we'll buffer till the limit, if they did, we'll
     * return an error. */
    if (r->io.conn.read_limit != 0 && r->io.conn.read_limit < r->io.conn.read_so_far + len) {
        errno = EOVERFLOW;
        return 0;
    }

    /* If we don't already have all the data in the sds, read more */
    while (len > sdslen(r->io.conn.buf) - r->io.conn.pos) {
        size_t buffered = sdslen(r->io.conn.buf) - r->io.conn.pos;
        size_t needs = len - buffered;
        /* Read either what's missing, or PROTO_IOBUF_LEN, the bigger of
         * the two. */
        size_t toread = needs < PROTO_IOBUF_LEN ? PROTO_IOBUF_LEN: needs;
        if (toread > sdsavail(r->io.conn.buf)) toread = sdsavail(r->io.conn.buf);
        if (r->io.conn.read_limit != 0 &&
            r->io.conn.read_so_far + buffered + toread > r->io.conn.read_limit)
        {
            /* Never buffer past the limit, so data beyond it stays unread
             * on the connection. */
            toread = r->io.conn.read_limit - r->io.conn.read_so_far - buffered;
        }
        int retval = connRead(r->io.conn.conn,
                          (char*)r->io.conn.buf + sdslen(r->io.conn.buf),
                          toread);
        if (retval == 0) {
            return 0;
        } else if (retval < 0) {
            if (connLastErrorRetryable(r->io.conn.conn)) continue;
            if (errno == EWOULDBLOCK) errno = ETIMEDOUT;
            return 0;
        }
        sdsIncrLen(r->io.conn.buf, retval);
    }

    memcpy(buf, (char*)r->io.conn.buf + r->io.conn.pos, len);
    r->io.conn.read_so_far += len;
    r->io.conn.pos += len;
    return len;
}

/* Returns the number of bytes consumed by readers so far. */
static off_t rioConnTell(rio *r) {
    return r->io.conn.read_so_far;
}

/* Flushes any buffer to target device if applicable. Returns 1 on success
 * and 0 on failures. */
static int rioConnFlush(rio *r) {
    /* This target is read-only: flushing is delegated to rioConnWrite(),
     * which always fails, so this always returns 0. */
    return rioConnWrite(r,NULL,0);
}

static const rio rioConnIO = {
    rioConnRead,
    rioConnWrite,
    rioConnTell,
    rioConnFlush,
    NULL,           /* update_checksum */
    0,              /* current checksum */
    0,              /* flags */
    0,              /* bytes read or written */
    0,              /* read/write chunk size */
    { { NULL, 0 } } /* union for io-specific vars */
};

/* Create an RIO that implements a buffered read from a connection.
 * The read_limit argument (0 means no limit) stops buffering when reaching
 * the limit, and reads past it fail with EOVERFLOW. */
void rioInitWithConn(rio *r, connection *conn, size_t read_limit) {
    *r = rioConnIO;
    r->io.conn.conn = conn;
    r->io.conn.pos = 0;
    r->io.conn.read_limit = read_limit;
    r->io.conn.read_so_far = 0;
    /* Preallocate PROTO_IOBUF_LEN bytes, then mark the buffer empty. */
    r->io.conn.buf = sdsnewlen(NULL, PROTO_IOBUF_LEN);
    sdsclear(r->io.conn.buf);
}

/* Release the RIO stream. Optionally returns the unread buffered data
 * when the SDS pointer 'remaining' is passed. In that case ownership of the
 * returned sds passes to the caller; *remaining is set to NULL when there is
 * no unread data. */
void rioFreeConn(rio *r, sds *remaining) {
    if (remaining && (size_t)r->io.conn.pos < sdslen(r->io.conn.buf)) {
        if (r->io.conn.pos > 0) sdsrange(r->io.conn.buf, r->io.conn.pos, -1);
        *remaining = r->io.conn.buf;
    } else {
        sdsfree(r->io.conn.buf);
        if (remaining) *remaining = NULL;
    }
    r->io.conn.buf = NULL;
}

/* ------------------- File descriptor implementation ------------------
 * This target is used to write the RDB file to pipe, when the master just
 * streams the data to the replicas without creating an RDB on-disk image
 * (diskless replication option).
 * It only implements writes. */

/* Returns 1 or 0 for success/failure.
 *
 * When buf is NULL and len is 0, the function performs a flush operation
 * if there is some pending buffer, so this function is also used in order
 * to implement rioFdFlush(). */
static size_t rioFdWrite(rio *r, const void *buf, size_t len) {
    ssize_t retval;
    unsigned char *p = (unsigned char*) buf;
    int doflush = (buf == NULL && len == 0);

    /* For small writes, we rather keep the data in user-space buffer, and flush
     * it only when it grows. however for larger writes, we prefer to flush
     * any pre-existing buffer, and write the new one directly without reallocs
     * and memory copying. */
    if (len > PROTO_IOBUF_LEN) {
        /* First, flush any pre-existing buffered data. */
        if (sdslen(r->io.fd.buf)) {
            if (rioFdWrite(r, NULL, 0) == 0)
                return 0;
        }
        /* Write the new data, keeping 'p' and 'len' from the input. */
    } else {
        if (len) {
            r->io.fd.buf = sdscatlen(r->io.fd.buf,buf,len);
            if (sdslen(r->io.fd.buf) > PROTO_IOBUF_LEN)
                doflush = 1;
            if (!doflush)
                return 1;
        }
        /* Flushing the buffered data. set 'p' and 'len' accordingly. */
        p = (unsigned char*) r->io.fd.buf;
        len = sdslen(r->io.fd.buf);
    }

    size_t nwritten = 0;
    while(nwritten != len) {
        retval = write(r->io.fd.fd,p+nwritten,len-nwritten);
        if (retval <= 0) {
            if (retval == -1 && errno == EINTR) continue;
            /* With blocking io, which is the sole user of this
             * rio target, EWOULDBLOCK is returned only because of
             * the SO_SNDTIMEO socket option, so we translate the error
             * into one more recognizable by the user. */
            if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT;
            return 0; /* error. */
        }
        nwritten += retval;
    }

    r->io.fd.pos += len;
    sdsclear(r->io.fd.buf);
    return 1;
}

/* Returns 1 or 0 for success/failure. */
static size_t rioFdRead(rio *r, void *buf, size_t len) {
    UNUSED(r);
    UNUSED(buf);
    UNUSED(len);
    return 0; /* Error, this target does not support reading. */
}

/* Returns the number of bytes written to the fd so far. */
static off_t rioFdTell(rio *r) {
    return r->io.fd.pos;
}

/* Flushes any buffer to target device if applicable. Returns 1 on success
 * and 0 on failures. */
static int rioFdFlush(rio *r) {
    /* Our flush is implemented by the write method, that recognizes a
     * buffer set to NULL with a count of zero as a flush request. */
    return rioFdWrite(r,NULL,0);
}

static const rio rioFdIO = {
    rioFdRead,
    rioFdWrite,
    rioFdTell,
    rioFdFlush,
    NULL,           /* update_checksum */
    0,              /* current checksum */
    0,              /* flags */
    0,              /* bytes read or written */
    0,              /* read/write chunk size */
    { { NULL, 0 } } /* union for io-specific vars */
};

/* Initialize 'r' as a write-only rio over the file descriptor 'fd'.
 * Release it with rioFreeFd(); the fd itself is not closed by this file. */
void rioInitWithFd(rio *r, int fd) {
    *r = rioFdIO;
    r->io.fd.fd = fd;
    r->io.fd.pos = 0;
    r->io.fd.buf = sdsempty();
}

/* release the rio stream. */
void rioFreeFd(rio *r) {
    sdsfree(r->io.fd.buf);
}

/* ------------------- Connection set implementation ------------------
 * This target is used to write the RDB file to a set of replica connections as
 * part of rdb channel replication. */

/* Returns 1 for success, 0 for failure.
 * The function returns success as long as we are able to correctly write
 * to at least one file descriptor.
 *
 * When buf is NULL or len is 0, the function performs a flush operation if
 * there is some pending buffer, so this function is also used in order to
 * implement rioConnsetFlush(). */
static size_t rioConnsetWrite(rio *r, const void *buf, size_t len) {
    const size_t pre_flush_size = 256 * 1024;
    unsigned char *p = (unsigned char*) buf;
    size_t buflen = len;

    /* For small writes, we rather keep the data in user-space buffer, and flush
     * it only when it grows. however for larger writes, we prefer to flush
     * any pre-existing buffer, and write the new one directly without reallocs
     * and memory copying. */
    if (len > pre_flush_size) {
        /* The flush result is not checked here: connections that fail are
         * marked in dst[i].failed, and the loop below returns 0 once no
         * connection is left. */
        rioConnsetWrite(r, NULL, 0);
    } else {
        if (buf && len) {
            r->io.connset.buf = sdscatlen(r->io.connset.buf, buf, len);
            if (sdslen(r->io.connset.buf) <= PROTO_IOBUF_LEN)
                return 1;
        }

        p = (unsigned char *)r->io.connset.buf;
        buflen = sdslen(r->io.connset.buf);
    }

    while (buflen > 0) {
        /* Write in little chunks so that when there are big writes we
         * parallelize while the kernel is sending data in background to the
         * TCP socket. */
        size_t limit = PROTO_IOBUF_LEN * 2;
        size_t count = buflen < limit ? buflen : limit;
        /* Number of connections unusable for this chunk. It must be reset for
         * every chunk because previously failed connections are counted again
         * when they are skipped below; accumulating across chunks would make
         * the "all failed" test below wrong in both directions. */
        size_t failed = 0;

        for (size_t i = 0; i < r->io.connset.n_dst; i++) {
            size_t n_written = 0;

            if (r->io.connset.dst[i].failed != 0) {
                failed++;
                continue; /* Skip failed connections. */
            }

            do {
                ssize_t ret;
                connection *c = r->io.connset.dst[i].conn;

                ret = connWrite(c, p + n_written, count - n_written);
                if (ret <= 0) {
                    if (errno == 0)
                        errno = EIO;
                    /* With blocking sockets, which is the sole user of this
                     * rio target, EWOULDBLOCK is returned only because of
                     * the SO_SNDTIMEO socket option, so we translate the error
                     * into one more recognizable by the user. */
                    if (ret == -1 && errno == EWOULDBLOCK)
                        errno = ETIMEDOUT;

                    r->io.connset.dst[i].failed = 1;
                    failed++;
                    break;
                }
                n_written += ret;
            } while (n_written != count);
        }
        if (failed == r->io.connset.n_dst)
            return 0; /* All the connections have failed. */

        p += count;
        buflen -= count;
        r->io.connset.pos += count;
    }

    sdsclear(r->io.connset.buf);
    return 1;
}

/* Returns 1 or 0 for success/failure. */
static size_t rioConnsetRead(rio *r, void *buf, size_t len) {
    UNUSED(r);
    UNUSED(buf);
    UNUSED(len);
    return 0; /* Error, this target does not support reading. */
}

/* Returns the number of sent bytes. */
static off_t rioConnsetTell(rio *r) {
    return r->io.connset.pos;
}

/* Flushes any buffer to target device if applicable. Returns 1 on success
 * and 0 on failures. */
static int rioConnsetFlush(rio *r) {
    /* Our flush is implemented by the write method, that recognizes a
     * buffer set to NULL with a count of zero as a flush request. */
    return rioConnsetWrite(r, NULL, 0);
}

static const rio rioConnsetIO = {
    rioConnsetRead,
    rioConnsetWrite,
    rioConnsetTell,
    rioConnsetFlush,
    NULL,           /* update_checksum */
    0,              /* current checksum */
    0,              /* flags */
    0,              /* bytes read or written */
    0,              /* read/write chunk size */
    { { NULL, 0 } } /* union for io-specific vars */
};

/* Initialize 'r' as a write-only rio that sends the same data to each of the
 * 'n_conns' connections in 'conns'. The connections are not owned; release
 * the rio's own allocations with rioFreeConnset(). */
void rioInitWithConnset(rio *r, connection **conns, size_t n_conns) {
    *r = rioConnsetIO;
    r->io.connset.dst = zcalloc(sizeof(*r->io.connset.dst) * n_conns);
    r->io.connset.n_dst = n_conns;
    r->io.connset.pos = 0;
    r->io.connset.buf = sdsempty();

    for (size_t i = 0; i < n_conns; i++)
        r->io.connset.dst[i].conn = conns[i];
}

/* release the rio stream. */
void rioFreeConnset(rio *r) {
    zfree(r->io.connset.dst);
    sdsfree(r->io.connset.buf);
}

/* ---------------------------- Generic functions ---------------------------- */

/* This function can be installed both in memory and file streams when checksum
 * computation is needed. It folds 'buf' into the running CRC64 in r->cksum. */
void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
    r->cksum = crc64(r->cksum,buf,len);
}

/* Set the file-based rio object to auto-fsync every 'bytes' file written.
 * By default this is set to zero that means no automatic file sync is
 * performed. Calls on non-file rio objects are ignored.
 *
 * This feature is useful in a few contexts since when we rely on OS write
 * buffers sometimes the OS buffers way too much, resulting in too many
 * disk I/O concentrated in very little time. When we fsync in an explicit
 * way instead the I/O pressure is more distributed across time. */
void rioSetAutoSync(rio *r, off_t bytes) {
    if(r->write != rioFileIO.write) return;
    r->io.file.autosync = bytes;
}

/* Set the file-based rio object to reclaim cache after every auto-sync.
 * In the Linux implementation POSIX_FADV_DONTNEED skips the dirty
 * pages, so if auto sync is unset this option will have no effect.
 * Calls on non-file rio objects are ignored, since the io union would
 * otherwise be overwritten with a file-only field.
 *
 * This feature can reduce the cache footprint backed by the file. */
void rioSetReclaimCache(rio *r, int enabled) {
    if (r->write != rioFileIO.write) return;
    r->io.file.reclaim_cache = enabled;
}

/* Check the type of rio. Any target not matched explicitly (the fd and the
 * connection-set write-only targets) is reported as RIO_TYPE_FD. */
uint8_t rioCheckType(rio *r) {
    if (r->read == rioFileRead) {
        return RIO_TYPE_FILE;
    } else if (r->read == rioBufferRead) {
        return RIO_TYPE_BUFFER;
    } else if (r->read == rioConnRead) {
        return RIO_TYPE_CONN;
    } else {
        /* r->read == rioFdRead or r->read == rioConnsetRead */
        return RIO_TYPE_FD;
    }
}

/* --------------------------- Higher level interface --------------------------
 *
 * The following higher level functions use lower level rio.c functions to help
 * generating the Redis protocol for the Append Only File. */

/* Write multi bulk count in the format: "<prefix><count>\r\n" (e.g. "*3\r\n").
 * Returns the number of bytes written, or 0 on error. */
size_t rioWriteBulkCount(rio *r, char prefix, long count) {
    char cbuf[128];
    int clen;

    cbuf[0] = prefix;
    clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,count);
    cbuf[clen++] = '\r';
    cbuf[clen++] = '\n';
    if (rioWrite(r,cbuf,clen) == 0) return 0;
    return clen;
}

/* Write binary-safe string in the format: "$<count>\r\n<payload>\r\n".
 * Returns the number of bytes written, or 0 on error. */
size_t rioWriteBulkString(rio *r, const char *buf, size_t len) {
    size_t nwritten;

    if ((nwritten = rioWriteBulkCount(r,'$',len)) == 0) return 0;
    if (len > 0 && rioWrite(r,buf,len) == 0) return 0;
    if (rioWrite(r,"\r\n",2) == 0) return 0;
    return nwritten+len+2;
}

/* Write a long long value in format: "$<count>\r\n<payload>\r\n".
 * Returns the number of bytes written, or 0 on error. */
size_t rioWriteBulkLongLong(rio *r, long long l) {
    char lbuf[32];
    unsigned int llen;

    llen = ll2string(lbuf,sizeof(lbuf),l);
    return rioWriteBulkString(r,lbuf,llen);
}

/* Write a double value in the format: "$<count>\r\n<payload>\r\n".
 * Returns the number of bytes written, or 0 on error. */
size_t rioWriteBulkDouble(rio *r, double d) {
    char dbuf[128];
    unsigned int dlen;
    dlen = fpconv_dtoa(d, dbuf);
    dbuf[dlen] = '\0';
    return rioWriteBulkString(r,dbuf,dlen);
}