summaryrefslogtreecommitdiff
path: root/examples/redis-unstable/src/rio.c
diff options
context:
space:
mode:
Diffstat (limited to 'examples/redis-unstable/src/rio.c')
-rw-r--r--examples/redis-unstable/src/rio.c640
1 files changed, 0 insertions, 640 deletions
diff --git a/examples/redis-unstable/src/rio.c b/examples/redis-unstable/src/rio.c
deleted file mode 100644
index 57750ff..0000000
--- a/examples/redis-unstable/src/rio.c
+++ /dev/null
@@ -1,640 +0,0 @@
-/*
- * Copyright (c) 2009-Present, Redis Ltd.
- * All rights reserved.
- *
- * Copyright (c) 2024-present, Valkey contributors.
- * All rights reserved.
- *
- * Licensed under your choice of (a) the Redis Source Available License 2.0
- * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
- * GNU Affero General Public License v3 (AGPLv3).
- *
- * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
- */
-
-/* rio.c is a simple stream-oriented I/O abstraction that provides an interface
- * to write code that can consume/produce data using different concrete input
- * and output devices. For instance the same rdb.c code using the rio
- * abstraction can be used to read and write the RDB format using in-memory
- * buffers or files.
- *
- * A rio object provides the following methods:
- * read: read from stream.
- * write: write to stream.
- * tell: get the current offset.
- *
- * It is also possible to set a 'checksum' method that is used by rio.c in order
- * to compute a checksum of the data written or read, or to query the rio object
- * for the current checksum.
- *
- * ----------------------------------------------------------------------------
- */
-
-
-#include "fmacros.h"
-#include "fpconv_dtoa.h"
-#include <string.h>
-#include <stdio.h>
-#include <unistd.h>
-#include "rio.h"
-#include "util.h"
-#include "crc64.h"
-#include "config.h"
-#include "server.h"
-
-/* ------------------------- Buffer I/O implementation ----------------------- */
-
-/* Returns 1 or 0 for success/failure. */
-static size_t rioBufferWrite(rio *r, const void *buf, size_t len) {
- r->io.buffer.ptr = sdscatlen(r->io.buffer.ptr,(char*)buf,len);
- r->io.buffer.pos += len;
- return 1;
-}
-
-/* Returns 1 or 0 for success/failure. */
-static size_t rioBufferRead(rio *r, void *buf, size_t len) {
- if (sdslen(r->io.buffer.ptr)-r->io.buffer.pos < len)
- return 0; /* not enough buffer to return len bytes. */
- memcpy(buf,r->io.buffer.ptr+r->io.buffer.pos,len);
- r->io.buffer.pos += len;
- return 1;
-}
-
-/* Returns read/write position in buffer. */
-static off_t rioBufferTell(rio *r) {
- return r->io.buffer.pos;
-}
-
-/* Flushes any buffer to target device if applicable. Returns 1 on success
- * and 0 on failures. */
-static int rioBufferFlush(rio *r) {
- UNUSED(r);
- return 1; /* Nothing to do, our write just appends to the buffer. */
-}
-
-static const rio rioBufferIO = {
- rioBufferRead,
- rioBufferWrite,
- rioBufferTell,
- rioBufferFlush,
- NULL, /* update_checksum */
- 0, /* current checksum */
- 0, /* flags */
- 0, /* bytes read or written */
- 0, /* read/write chunk size */
- { { NULL, 0 } } /* union for io-specific vars */
-};
-
-void rioInitWithBuffer(rio *r, sds s) {
- *r = rioBufferIO;
- r->io.buffer.ptr = s;
- r->io.buffer.pos = 0;
-}
-
-/* --------------------- Stdio file pointer implementation ------------------- */
-
-/* Returns 1 or 0 for success/failure. */
-static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
- if (!r->io.file.autosync) return fwrite(buf,len,1,r->io.file.fp);
-
- size_t nwritten = 0;
- /* Incrementally write data to the file, avoid a single write larger than
- * the autosync threshold (so that the kernel's buffer cache never has too
- * many dirty pages at once). */
- while (len != nwritten) {
- serverAssert(r->io.file.autosync > r->io.file.buffered);
- size_t nalign = (size_t)(r->io.file.autosync - r->io.file.buffered);
- size_t towrite = nalign > len-nwritten ? len-nwritten : nalign;
-
- if (fwrite((char*)buf+nwritten,towrite,1,r->io.file.fp) == 0) return 0;
- nwritten += towrite;
- r->io.file.buffered += towrite;
-
- if (r->io.file.buffered >= r->io.file.autosync) {
- fflush(r->io.file.fp);
-
- size_t processed = r->processed_bytes + nwritten;
- serverAssert(processed % r->io.file.autosync == 0);
- serverAssert(r->io.file.buffered == r->io.file.autosync);
-
-#if HAVE_SYNC_FILE_RANGE
- /* Start writeout asynchronously. */
- if (sync_file_range(fileno(r->io.file.fp),
- processed - r->io.file.autosync, r->io.file.autosync,
- SYNC_FILE_RANGE_WRITE) == -1)
- return 0;
-
- if (processed >= (size_t)r->io.file.autosync * 2) {
- /* To keep the promise to 'autosync', we should make sure last
- * asynchronous writeout persists into disk. This call may block
- * if last writeout is not finished since disk is slow. */
- if (sync_file_range(fileno(r->io.file.fp),
- processed - r->io.file.autosync*2,
- r->io.file.autosync, SYNC_FILE_RANGE_WAIT_BEFORE|
- SYNC_FILE_RANGE_WRITE|SYNC_FILE_RANGE_WAIT_AFTER) == -1)
- return 0;
- }
-#else
- if (redis_fsync(fileno(r->io.file.fp)) == -1) return 0;
-#endif
- if (r->io.file.reclaim_cache) {
- /* In Linux sync_file_range just issue a writeback request to
- * OS, and when posix_fadvise is called, the dirty page may
- * still be in flushing, which means it would be ignored by
- * posix_fadvise.
- *
- * So we posix_fadvise the whole file, and the writeback-ed
- * pages will have other chances to be reclaimed. */
- reclaimFilePageCache(fileno(r->io.file.fp), 0, 0);
- }
- r->io.file.buffered = 0;
- }
- }
- return 1;
-}
-
-/* Returns 1 or 0 for success/failure. */
-static size_t rioFileRead(rio *r, void *buf, size_t len) {
- return fread(buf,len,1,r->io.file.fp);
-}
-
-/* Returns read/write position in file. */
-static off_t rioFileTell(rio *r) {
- return ftello(r->io.file.fp);
-}
-
-/* Flushes any buffer to target device if applicable. Returns 1 on success
- * and 0 on failures. */
-static int rioFileFlush(rio *r) {
- return (fflush(r->io.file.fp) == 0) ? 1 : 0;
-}
-
-static const rio rioFileIO = {
- rioFileRead,
- rioFileWrite,
- rioFileTell,
- rioFileFlush,
- NULL, /* update_checksum */
- 0, /* current checksum */
- 0, /* flags */
- 0, /* bytes read or written */
- 0, /* read/write chunk size */
- { { NULL, 0 } } /* union for io-specific vars */
-};
-
-void rioInitWithFile(rio *r, FILE *fp) {
- *r = rioFileIO;
- r->io.file.fp = fp;
- r->io.file.buffered = 0;
- r->io.file.autosync = 0;
- r->io.file.reclaim_cache = 0;
-}
-
-/* ------------------- Connection implementation -------------------
- * We use this RIO implementation when reading an RDB file directly from
- * the connection to the memory via rdbLoadRio(), thus this implementation
- * only implements reading from a connection that is, normally,
- * just a socket. */
-
-static size_t rioConnWrite(rio *r, const void *buf, size_t len) {
- UNUSED(r);
- UNUSED(buf);
- UNUSED(len);
- return 0; /* Error, this target does not yet support writing. */
-}
-
-/* Returns 1 or 0 for success/failure. */
-static size_t rioConnRead(rio *r, void *buf, size_t len) {
- size_t avail = sdslen(r->io.conn.buf)-r->io.conn.pos;
-
- /* If the buffer is too small for the entire request: realloc. */
- if (sdslen(r->io.conn.buf) + sdsavail(r->io.conn.buf) < len)
- r->io.conn.buf = sdsMakeRoomFor(r->io.conn.buf, len - sdslen(r->io.conn.buf));
-
- /* If the remaining unused buffer is not large enough: memmove so that we
- * can read the rest. */
- if (len > avail && sdsavail(r->io.conn.buf) < len - avail) {
- sdsrange(r->io.conn.buf, r->io.conn.pos, -1);
- r->io.conn.pos = 0;
- }
-
- /* Make sure the caller didn't request to read past the limit.
- * If they didn't we'll buffer till the limit, if they did, we'll
- * return an error. */
- if (r->io.conn.read_limit != 0 && r->io.conn.read_limit < r->io.conn.read_so_far + len) {
- errno = EOVERFLOW;
- return 0;
- }
-
- /* If we don't already have all the data in the sds, read more */
- while (len > sdslen(r->io.conn.buf) - r->io.conn.pos) {
- size_t buffered = sdslen(r->io.conn.buf) - r->io.conn.pos;
- size_t needs = len - buffered;
- /* Read either what's missing, or PROTO_IOBUF_LEN, the bigger of
- * the two. */
- size_t toread = needs < PROTO_IOBUF_LEN ? PROTO_IOBUF_LEN: needs;
- if (toread > sdsavail(r->io.conn.buf)) toread = sdsavail(r->io.conn.buf);
- if (r->io.conn.read_limit != 0 &&
- r->io.conn.read_so_far + buffered + toread > r->io.conn.read_limit)
- {
- toread = r->io.conn.read_limit - r->io.conn.read_so_far - buffered;
- }
- int retval = connRead(r->io.conn.conn,
- (char*)r->io.conn.buf + sdslen(r->io.conn.buf),
- toread);
- if (retval == 0) {
- return 0;
- } else if (retval < 0) {
- if (connLastErrorRetryable(r->io.conn.conn)) continue;
- if (errno == EWOULDBLOCK) errno = ETIMEDOUT;
- return 0;
- }
- sdsIncrLen(r->io.conn.buf, retval);
- }
-
- memcpy(buf, (char*)r->io.conn.buf + r->io.conn.pos, len);
- r->io.conn.read_so_far += len;
- r->io.conn.pos += len;
- return len;
-}
-
-/* Returns read/write position in file. */
-static off_t rioConnTell(rio *r) {
- return r->io.conn.read_so_far;
-}
-
-/* Flushes any buffer to target device if applicable. Returns 1 on success
- * and 0 on failures. */
-static int rioConnFlush(rio *r) {
- /* Our flush is implemented by the write method, that recognizes a
- * buffer set to NULL with a count of zero as a flush request. */
- return rioConnWrite(r,NULL,0);
-}
-
-static const rio rioConnIO = {
- rioConnRead,
- rioConnWrite,
- rioConnTell,
- rioConnFlush,
- NULL, /* update_checksum */
- 0, /* current checksum */
- 0, /* flags */
- 0, /* bytes read or written */
- 0, /* read/write chunk size */
- { { NULL, 0 } } /* union for io-specific vars */
-};
-
-/* Create an RIO that implements a buffered read from an fd
- * read_limit argument stops buffering when the reaching the limit. */
-void rioInitWithConn(rio *r, connection *conn, size_t read_limit) {
- *r = rioConnIO;
- r->io.conn.conn = conn;
- r->io.conn.pos = 0;
- r->io.conn.read_limit = read_limit;
- r->io.conn.read_so_far = 0;
- r->io.conn.buf = sdsnewlen(NULL, PROTO_IOBUF_LEN);
- sdsclear(r->io.conn.buf);
-}
-
-/* Release the RIO stream. Optionally returns the unread buffered data
- * when the SDS pointer 'remaining' is passed. */
-void rioFreeConn(rio *r, sds *remaining) {
- if (remaining && (size_t)r->io.conn.pos < sdslen(r->io.conn.buf)) {
- if (r->io.conn.pos > 0) sdsrange(r->io.conn.buf, r->io.conn.pos, -1);
- *remaining = r->io.conn.buf;
- } else {
- sdsfree(r->io.conn.buf);
- if (remaining) *remaining = NULL;
- }
- r->io.conn.buf = NULL;
-}
-
-/* ------------------- File descriptor implementation ------------------
- * This target is used to write the RDB file to pipe, when the master just
- * streams the data to the replicas without creating an RDB on-disk image
- * (diskless replication option).
- * It only implements writes. */
-
-/* Returns 1 or 0 for success/failure.
- *
- * When buf is NULL and len is 0, the function performs a flush operation
- * if there is some pending buffer, so this function is also used in order
- * to implement rioFdFlush(). */
-static size_t rioFdWrite(rio *r, const void *buf, size_t len) {
- ssize_t retval;
- unsigned char *p = (unsigned char*) buf;
- int doflush = (buf == NULL && len == 0);
-
- /* For small writes, we rather keep the data in user-space buffer, and flush
- * it only when it grows. however for larger writes, we prefer to flush
- * any pre-existing buffer, and write the new one directly without reallocs
- * and memory copying. */
- if (len > PROTO_IOBUF_LEN) {
- /* First, flush any pre-existing buffered data. */
- if (sdslen(r->io.fd.buf)) {
- if (rioFdWrite(r, NULL, 0) == 0)
- return 0;
- }
- /* Write the new data, keeping 'p' and 'len' from the input. */
- } else {
- if (len) {
- r->io.fd.buf = sdscatlen(r->io.fd.buf,buf,len);
- if (sdslen(r->io.fd.buf) > PROTO_IOBUF_LEN)
- doflush = 1;
- if (!doflush)
- return 1;
- }
- /* Flushing the buffered data. set 'p' and 'len' accordingly. */
- p = (unsigned char*) r->io.fd.buf;
- len = sdslen(r->io.fd.buf);
- }
-
- size_t nwritten = 0;
- while(nwritten != len) {
- retval = write(r->io.fd.fd,p+nwritten,len-nwritten);
- if (retval <= 0) {
- if (retval == -1 && errno == EINTR) continue;
- /* With blocking io, which is the sole user of this
- * rio target, EWOULDBLOCK is returned only because of
- * the SO_SNDTIMEO socket option, so we translate the error
- * into one more recognizable by the user. */
- if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT;
- return 0; /* error. */
- }
- nwritten += retval;
- }
-
- r->io.fd.pos += len;
- sdsclear(r->io.fd.buf);
- return 1;
-}
-
-/* Returns 1 or 0 for success/failure. */
-static size_t rioFdRead(rio *r, void *buf, size_t len) {
- UNUSED(r);
- UNUSED(buf);
- UNUSED(len);
- return 0; /* Error, this target does not support reading. */
-}
-
-/* Returns read/write position in file. */
-static off_t rioFdTell(rio *r) {
- return r->io.fd.pos;
-}
-
-/* Flushes any buffer to target device if applicable. Returns 1 on success
- * and 0 on failures. */
-static int rioFdFlush(rio *r) {
- /* Our flush is implemented by the write method, that recognizes a
- * buffer set to NULL with a count of zero as a flush request. */
- return rioFdWrite(r,NULL,0);
-}
-
-static const rio rioFdIO = {
- rioFdRead,
- rioFdWrite,
- rioFdTell,
- rioFdFlush,
- NULL, /* update_checksum */
- 0, /* current checksum */
- 0, /* flags */
- 0, /* bytes read or written */
- 0, /* read/write chunk size */
- { { NULL, 0 } } /* union for io-specific vars */
-};
-
-void rioInitWithFd(rio *r, int fd) {
- *r = rioFdIO;
- r->io.fd.fd = fd;
- r->io.fd.pos = 0;
- r->io.fd.buf = sdsempty();
-}
-
-/* release the rio stream. */
-void rioFreeFd(rio *r) {
- sdsfree(r->io.fd.buf);
-}
-
-/* ------------------- Connection set implementation ------------------
- * This target is used to write the RDB file to a set of replica connections as
- * part of rdb channel replication. */
-
-/* Returns 1 for success, 0 for failure.
- * The function returns success as long as we are able to correctly write
- * to at least one file descriptor.
- *
- * When buf is NULL or len is 0, the function performs a flush operation if
- * there is some pending buffer, so this function is also used in order to
- * implement rioConnsetFlush(). */
-static size_t rioConnsetWrite(rio *r, const void *buf, size_t len) {
- const size_t pre_flush_size = 256 * 1024;
- unsigned char *p = (unsigned char*) buf;
- size_t buflen = len;
- size_t failed = 0; /* number of connections that write() returned error. */
-
- /* For small writes, we rather keep the data in user-space buffer, and flush
- * it only when it grows. however for larger writes, we prefer to flush
- * any pre-existing buffer, and write the new one directly without reallocs
- * and memory copying. */
- if (len > pre_flush_size) {
- rioConnsetWrite(r, NULL, 0);
- } else {
- if (buf && len) {
- r->io.connset.buf = sdscatlen(r->io.connset.buf, buf, len);
- if (sdslen(r->io.connset.buf) <= PROTO_IOBUF_LEN)
- return 1;
- }
-
- p = (unsigned char *)r->io.connset.buf;
- buflen = sdslen(r->io.connset.buf);
- }
-
- while (buflen > 0) {
- /* Write in little chunks so that when there are big writes we
- * parallelize while the kernel is sending data in background to the
- * TCP socket. */
- size_t limit = PROTO_IOBUF_LEN * 2;
- size_t count = buflen < limit ? buflen : limit;
-
- for (size_t i = 0; i < r->io.connset.n_dst; i++) {
- size_t n_written = 0;
-
- if (r->io.connset.dst[i].failed != 0) {
- failed++;
- continue; /* Skip failed connections. */
- }
-
- do {
- ssize_t ret;
- connection *c = r->io.connset.dst[i].conn;
-
- ret = connWrite(c, p + n_written, count - n_written);
- if (ret <= 0) {
- if (errno == 0)
- errno = EIO;
- /* With blocking sockets, which is the sole user of this
- * rio target, EWOULDBLOCK is returned only because of
- * the SO_SNDTIMEO socket option, so we translate the error
- * into one more recognizable by the user. */
- if (ret == -1 && errno == EWOULDBLOCK)
- errno = ETIMEDOUT;
-
- r->io.connset.dst[i].failed = 1;
- failed++;
- break;
- }
- n_written += ret;
- } while (n_written != count);
- }
- if (failed == r->io.connset.n_dst)
- return 0; /* All the connections have failed. */
-
- p += count;
- buflen -= count;
- r->io.connset.pos += count;
- }
-
- sdsclear(r->io.connset.buf);
- return 1;
-}
-
-/* Returns 1 or 0 for success/failure. */
-static size_t rioConnsetRead(rio *r, void *buf, size_t len) {
- UNUSED(r);
- UNUSED(buf);
- UNUSED(len);
- return 0; /* Error, this target does not support reading. */
-}
-
-/* Returns the number of sent bytes. */
-static off_t rioConnsetTell(rio *r) {
- return r->io.connset.pos;
-}
-
-/* Flushes any buffer to target device if applicable. Returns 1 on success
- * and 0 on failures. */
-static int rioConnsetFlush(rio *r) {
- /* Our flush is implemented by the write method, that recognizes a
- * buffer set to NULL with a count of zero as a flush request. */
- return rioConnsetWrite(r, NULL, 0);
-}
-
-static const rio rioConnsetIO = {
- rioConnsetRead,
- rioConnsetWrite,
- rioConnsetTell,
- rioConnsetFlush,
- NULL, /* update_checksum */
- 0, /* current checksum */
- 0, /* flags */
- 0, /* bytes read or written */
- 0, /* read/write chunk size */
- { { NULL, 0 } } /* union for io-specific vars */
-};
-
-void rioInitWithConnset(rio *r, connection **conns, size_t n_conns) {
- *r = rioConnsetIO;
- r->io.connset.dst = zcalloc(sizeof(*r->io.connset.dst) * n_conns);
- r->io.connset.n_dst = n_conns;
- r->io.connset.pos = 0;
- r->io.connset.buf = sdsempty();
-
- for (size_t i = 0; i < n_conns; i++)
- r->io.connset.dst[i].conn = conns[i];
-}
-
-/* release the rio stream. */
-void rioFreeConnset(rio *r) {
- zfree(r->io.connset.dst);
- sdsfree(r->io.connset.buf);
-}
-
-/* ---------------------------- Generic functions ---------------------------- */
-
-/* This function can be installed both in memory and file streams when checksum
- * computation is needed. */
-void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
- r->cksum = crc64(r->cksum,buf,len);
-}
-
-/* Set the file-based rio object to auto-fsync every 'bytes' file written.
- * By default this is set to zero that means no automatic file sync is
- * performed.
- *
- * This feature is useful in a few contexts since when we rely on OS write
- * buffers sometimes the OS buffers way too much, resulting in too many
- * disk I/O concentrated in very little time. When we fsync in an explicit
- * way instead the I/O pressure is more distributed across time. */
-void rioSetAutoSync(rio *r, off_t bytes) {
- if(r->write != rioFileIO.write) return;
- r->io.file.autosync = bytes;
-}
-
-/* Set the file-based rio object to reclaim cache after every auto-sync.
- * In the Linux implementation POSIX_FADV_DONTNEED skips the dirty
- * pages, so if auto sync is unset this option will have no effect.
- *
- * This feature can reduce the cache footprint backed by the file. */
-void rioSetReclaimCache(rio *r, int enabled) {
- r->io.file.reclaim_cache = enabled;
-}
-
-/* Check the type of rio. */
-uint8_t rioCheckType(rio *r) {
- if (r->read == rioFileRead) {
- return RIO_TYPE_FILE;
- } else if (r->read == rioBufferRead) {
- return RIO_TYPE_BUFFER;
- } else if (r->read == rioConnRead) {
- return RIO_TYPE_CONN;
- } else {
- /* r->read == rioFdRead */
- return RIO_TYPE_FD;
- }
-}
-
-/* --------------------------- Higher level interface --------------------------
- *
- * The following higher level functions use lower level rio.c functions to help
- * generating the Redis protocol for the Append Only File. */
-
-/* Write multi bulk count in the format: "*<count>\r\n". */
-size_t rioWriteBulkCount(rio *r, char prefix, long count) {
- char cbuf[128];
- int clen;
-
- cbuf[0] = prefix;
- clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,count);
- cbuf[clen++] = '\r';
- cbuf[clen++] = '\n';
- if (rioWrite(r,cbuf,clen) == 0) return 0;
- return clen;
-}
-
-/* Write binary-safe string in the format: "$<count>\r\n<payload>\r\n". */
-size_t rioWriteBulkString(rio *r, const char *buf, size_t len) {
- size_t nwritten;
-
- if ((nwritten = rioWriteBulkCount(r,'$',len)) == 0) return 0;
- if (len > 0 && rioWrite(r,buf,len) == 0) return 0;
- if (rioWrite(r,"\r\n",2) == 0) return 0;
- return nwritten+len+2;
-}
-
-/* Write a long long value in format: "$<count>\r\n<payload>\r\n". */
-size_t rioWriteBulkLongLong(rio *r, long long l) {
- char lbuf[32];
- unsigned int llen;
-
- llen = ll2string(lbuf,sizeof(lbuf),l);
- return rioWriteBulkString(r,lbuf,llen);
-}
-
-/* Write a double value in the format: "$<count>\r\n<payload>\r\n" */
-size_t rioWriteBulkDouble(rio *r, double d) {
- char dbuf[128];
- unsigned int dlen;
- dlen = fpconv_dtoa(d, dbuf);
- dbuf[dlen] = '\0';
- return rioWriteBulkString(r,dbuf,dlen);
-}