diff options
| author | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:52:54 +0100 |
|---|---|---|
| committer | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:52:54 +0100 |
| commit | dcacc00e3750300617ba6e16eb346713f91a783a (patch) | |
| tree | 38e2d4fb5ed9d119711d4295c6eda4b014af73fd /examples/redis-unstable/src/rio.c | |
| parent | 58dac10aeb8f5a041c46bddbeaf4c7966a99b998 (diff) | |
| download | crep-dcacc00e3750300617ba6e16eb346713f91a783a.tar.gz | |
Remove testing data
Diffstat (limited to 'examples/redis-unstable/src/rio.c')
| -rw-r--r-- | examples/redis-unstable/src/rio.c | 640 |
1 files changed, 0 insertions, 640 deletions
diff --git a/examples/redis-unstable/src/rio.c b/examples/redis-unstable/src/rio.c deleted file mode 100644 index 57750ff..0000000 --- a/examples/redis-unstable/src/rio.c +++ /dev/null @@ -1,640 +0,0 @@ -/* - * Copyright (c) 2009-Present, Redis Ltd. - * All rights reserved. - * - * Copyright (c) 2024-present, Valkey contributors. - * All rights reserved. - * - * Licensed under your choice of (a) the Redis Source Available License 2.0 - * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the - * GNU Affero General Public License v3 (AGPLv3). - * - * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. - */ - -/* rio.c is a simple stream-oriented I/O abstraction that provides an interface - * to write code that can consume/produce data using different concrete input - * and output devices. For instance the same rdb.c code using the rio - * abstraction can be used to read and write the RDB format using in-memory - * buffers or files. - * - * A rio object provides the following methods: - * read: read from stream. - * write: write to stream. - * tell: get the current offset. - * - * It is also possible to set a 'checksum' method that is used by rio.c in order - * to compute a checksum of the data written or read, or to query the rio object - * for the current checksum. - * - * ---------------------------------------------------------------------------- - */ - - -#include "fmacros.h" -#include "fpconv_dtoa.h" -#include <string.h> -#include <stdio.h> -#include <unistd.h> -#include "rio.h" -#include "util.h" -#include "crc64.h" -#include "config.h" -#include "server.h" - -/* ------------------------- Buffer I/O implementation ----------------------- */ - -/* Returns 1 or 0 for success/failure. */ -static size_t rioBufferWrite(rio *r, const void *buf, size_t len) { - r->io.buffer.ptr = sdscatlen(r->io.buffer.ptr,(char*)buf,len); - r->io.buffer.pos += len; - return 1; -} - -/* Returns 1 or 0 for success/failure. */ -static size_t rioBufferRead(rio *r, void *buf, size_t len) { - if (sdslen(r->io.buffer.ptr)-r->io.buffer.pos < len) - return 0; /* not enough buffer to return len bytes. */ - memcpy(buf,r->io.buffer.ptr+r->io.buffer.pos,len); - r->io.buffer.pos += len; - return 1; -} - -/* Returns read/write position in buffer. */ -static off_t rioBufferTell(rio *r) { - return r->io.buffer.pos; -} - -/* Flushes any buffer to target device if applicable. Returns 1 on success - * and 0 on failures. */ -static int rioBufferFlush(rio *r) { - UNUSED(r); - return 1; /* Nothing to do, our write just appends to the buffer. */ -} - -static const rio rioBufferIO = { - rioBufferRead, - rioBufferWrite, - rioBufferTell, - rioBufferFlush, - NULL, /* update_checksum */ - 0, /* current checksum */ - 0, /* flags */ - 0, /* bytes read or written */ - 0, /* read/write chunk size */ - { { NULL, 0 } } /* union for io-specific vars */ -}; - -void rioInitWithBuffer(rio *r, sds s) { - *r = rioBufferIO; - r->io.buffer.ptr = s; - r->io.buffer.pos = 0; -} - -/* --------------------- Stdio file pointer implementation ------------------- */ - -/* Returns 1 or 0 for success/failure. */ -static size_t rioFileWrite(rio *r, const void *buf, size_t len) { - if (!r->io.file.autosync) return fwrite(buf,len,1,r->io.file.fp); - - size_t nwritten = 0; - /* Incrementally write data to the file, avoid a single write larger than - * the autosync threshold (so that the kernel's buffer cache never has too - * many dirty pages at once). */ - while (len != nwritten) { - serverAssert(r->io.file.autosync > r->io.file.buffered); - size_t nalign = (size_t)(r->io.file.autosync - r->io.file.buffered); - size_t towrite = nalign > len-nwritten ? len-nwritten : nalign; - - if (fwrite((char*)buf+nwritten,towrite,1,r->io.file.fp) == 0) return 0; - nwritten += towrite; - r->io.file.buffered += towrite; - - if (r->io.file.buffered >= r->io.file.autosync) { - fflush(r->io.file.fp); - - size_t processed = r->processed_bytes + nwritten; - serverAssert(processed % r->io.file.autosync == 0); - serverAssert(r->io.file.buffered == r->io.file.autosync); - -#if HAVE_SYNC_FILE_RANGE - /* Start writeout asynchronously. */ - if (sync_file_range(fileno(r->io.file.fp), - processed - r->io.file.autosync, r->io.file.autosync, - SYNC_FILE_RANGE_WRITE) == -1) - return 0; - - if (processed >= (size_t)r->io.file.autosync * 2) { - /* To keep the promise to 'autosync', we should make sure last - * asynchronous writeout persists into disk. This call may block - * if last writeout is not finished since disk is slow. */ - if (sync_file_range(fileno(r->io.file.fp), - processed - r->io.file.autosync*2, - r->io.file.autosync, SYNC_FILE_RANGE_WAIT_BEFORE| - SYNC_FILE_RANGE_WRITE|SYNC_FILE_RANGE_WAIT_AFTER) == -1) - return 0; - } -#else - if (redis_fsync(fileno(r->io.file.fp)) == -1) return 0; -#endif - if (r->io.file.reclaim_cache) { - /* In Linux sync_file_range just issue a writeback request to - * OS, and when posix_fadvise is called, the dirty page may - * still be in flushing, which means it would be ignored by - * posix_fadvise. - * - * So we posix_fadvise the whole file, and the writeback-ed - * pages will have other chances to be reclaimed. */ - reclaimFilePageCache(fileno(r->io.file.fp), 0, 0); - } - r->io.file.buffered = 0; - } - } - return 1; -} - -/* Returns 1 or 0 for success/failure. */ -static size_t rioFileRead(rio *r, void *buf, size_t len) { - return fread(buf,len,1,r->io.file.fp); -} - -/* Returns read/write position in file. */ -static off_t rioFileTell(rio *r) { - return ftello(r->io.file.fp); -} - -/* Flushes any buffer to target device if applicable. Returns 1 on success - * and 0 on failures. */ -static int rioFileFlush(rio *r) { - return (fflush(r->io.file.fp) == 0) ? 1 : 0; -} - -static const rio rioFileIO = { - rioFileRead, - rioFileWrite, - rioFileTell, - rioFileFlush, - NULL, /* update_checksum */ - 0, /* current checksum */ - 0, /* flags */ - 0, /* bytes read or written */ - 0, /* read/write chunk size */ - { { NULL, 0 } } /* union for io-specific vars */ -}; - -void rioInitWithFile(rio *r, FILE *fp) { - *r = rioFileIO; - r->io.file.fp = fp; - r->io.file.buffered = 0; - r->io.file.autosync = 0; - r->io.file.reclaim_cache = 0; -} - -/* ------------------- Connection implementation ------------------- - * We use this RIO implementation when reading an RDB file directly from - * the connection to the memory via rdbLoadRio(), thus this implementation - * only implements reading from a connection that is, normally, - * just a socket. */ - -static size_t rioConnWrite(rio *r, const void *buf, size_t len) { - UNUSED(r); - UNUSED(buf); - UNUSED(len); - return 0; /* Error, this target does not yet support writing. */ -} - -/* Returns 1 or 0 for success/failure. */ -static size_t rioConnRead(rio *r, void *buf, size_t len) { - size_t avail = sdslen(r->io.conn.buf)-r->io.conn.pos; - - /* If the buffer is too small for the entire request: realloc. */ - if (sdslen(r->io.conn.buf) + sdsavail(r->io.conn.buf) < len) - r->io.conn.buf = sdsMakeRoomFor(r->io.conn.buf, len - sdslen(r->io.conn.buf)); - - /* If the remaining unused buffer is not large enough: memmove so that we - * can read the rest. */ - if (len > avail && sdsavail(r->io.conn.buf) < len - avail) { - sdsrange(r->io.conn.buf, r->io.conn.pos, -1); - r->io.conn.pos = 0; - } - - /* Make sure the caller didn't request to read past the limit. - * If they didn't we'll buffer till the limit, if they did, we'll - * return an error. */ - if (r->io.conn.read_limit != 0 && r->io.conn.read_limit < r->io.conn.read_so_far + len) { - errno = EOVERFLOW; - return 0; - } - - /* If we don't already have all the data in the sds, read more */ - while (len > sdslen(r->io.conn.buf) - r->io.conn.pos) { - size_t buffered = sdslen(r->io.conn.buf) - r->io.conn.pos; - size_t needs = len - buffered; - /* Read either what's missing, or PROTO_IOBUF_LEN, the bigger of - * the two. */ - size_t toread = needs < PROTO_IOBUF_LEN ? PROTO_IOBUF_LEN: needs; - if (toread > sdsavail(r->io.conn.buf)) toread = sdsavail(r->io.conn.buf); - if (r->io.conn.read_limit != 0 && - r->io.conn.read_so_far + buffered + toread > r->io.conn.read_limit) - { - toread = r->io.conn.read_limit - r->io.conn.read_so_far - buffered; - } - int retval = connRead(r->io.conn.conn, - (char*)r->io.conn.buf + sdslen(r->io.conn.buf), - toread); - if (retval == 0) { - return 0; - } else if (retval < 0) { - if (connLastErrorRetryable(r->io.conn.conn)) continue; - if (errno == EWOULDBLOCK) errno = ETIMEDOUT; - return 0; - } - sdsIncrLen(r->io.conn.buf, retval); - } - - memcpy(buf, (char*)r->io.conn.buf + r->io.conn.pos, len); - r->io.conn.read_so_far += len; - r->io.conn.pos += len; - return len; -} - -/* Returns read/write position in file. */ -static off_t rioConnTell(rio *r) { - return r->io.conn.read_so_far; -} - -/* Flushes any buffer to target device if applicable. Returns 1 on success - * and 0 on failures. */ -static int rioConnFlush(rio *r) { - /* Our flush is implemented by the write method, that recognizes a - * buffer set to NULL with a count of zero as a flush request. */ - return rioConnWrite(r,NULL,0); -} - -static const rio rioConnIO = { - rioConnRead, - rioConnWrite, - rioConnTell, - rioConnFlush, - NULL, /* update_checksum */ - 0, /* current checksum */ - 0, /* flags */ - 0, /* bytes read or written */ - 0, /* read/write chunk size */ - { { NULL, 0 } } /* union for io-specific vars */ -}; - -/* Create an RIO that implements a buffered read from an fd - * read_limit argument stops buffering when the reaching the limit. */ -void rioInitWithConn(rio *r, connection *conn, size_t read_limit) { - *r = rioConnIO; - r->io.conn.conn = conn; - r->io.conn.pos = 0; - r->io.conn.read_limit = read_limit; - r->io.conn.read_so_far = 0; - r->io.conn.buf = sdsnewlen(NULL, PROTO_IOBUF_LEN); - sdsclear(r->io.conn.buf); -} - -/* Release the RIO stream. Optionally returns the unread buffered data - * when the SDS pointer 'remaining' is passed. */ -void rioFreeConn(rio *r, sds *remaining) { - if (remaining && (size_t)r->io.conn.pos < sdslen(r->io.conn.buf)) { - if (r->io.conn.pos > 0) sdsrange(r->io.conn.buf, r->io.conn.pos, -1); - *remaining = r->io.conn.buf; - } else { - sdsfree(r->io.conn.buf); - if (remaining) *remaining = NULL; - } - r->io.conn.buf = NULL; -} - -/* ------------------- File descriptor implementation ------------------ - * This target is used to write the RDB file to pipe, when the master just - * streams the data to the replicas without creating an RDB on-disk image - * (diskless replication option). - * It only implements writes. */ - -/* Returns 1 or 0 for success/failure. - * - * When buf is NULL and len is 0, the function performs a flush operation - * if there is some pending buffer, so this function is also used in order - * to implement rioFdFlush(). */ -static size_t rioFdWrite(rio *r, const void *buf, size_t len) { - ssize_t retval; - unsigned char *p = (unsigned char*) buf; - int doflush = (buf == NULL && len == 0); - - /* For small writes, we rather keep the data in user-space buffer, and flush - * it only when it grows. however for larger writes, we prefer to flush - * any pre-existing buffer, and write the new one directly without reallocs - * and memory copying. */ - if (len > PROTO_IOBUF_LEN) { - /* First, flush any pre-existing buffered data. */ - if (sdslen(r->io.fd.buf)) { - if (rioFdWrite(r, NULL, 0) == 0) - return 0; - } - /* Write the new data, keeping 'p' and 'len' from the input. */ - } else { - if (len) { - r->io.fd.buf = sdscatlen(r->io.fd.buf,buf,len); - if (sdslen(r->io.fd.buf) > PROTO_IOBUF_LEN) - doflush = 1; - if (!doflush) - return 1; - } - /* Flushing the buffered data. set 'p' and 'len' accordingly. */ - p = (unsigned char*) r->io.fd.buf; - len = sdslen(r->io.fd.buf); - } - - size_t nwritten = 0; - while(nwritten != len) { - retval = write(r->io.fd.fd,p+nwritten,len-nwritten); - if (retval <= 0) { - if (retval == -1 && errno == EINTR) continue; - /* With blocking io, which is the sole user of this - * rio target, EWOULDBLOCK is returned only because of - * the SO_SNDTIMEO socket option, so we translate the error - * into one more recognizable by the user. */ - if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT; - return 0; /* error. */ - } - nwritten += retval; - } - - r->io.fd.pos += len; - sdsclear(r->io.fd.buf); - return 1; -} - -/* Returns 1 or 0 for success/failure. */ -static size_t rioFdRead(rio *r, void *buf, size_t len) { - UNUSED(r); - UNUSED(buf); - UNUSED(len); - return 0; /* Error, this target does not support reading. */ -} - -/* Returns read/write position in file. */ -static off_t rioFdTell(rio *r) { - return r->io.fd.pos; -} - -/* Flushes any buffer to target device if applicable. Returns 1 on success - * and 0 on failures. */ -static int rioFdFlush(rio *r) { - /* Our flush is implemented by the write method, that recognizes a - * buffer set to NULL with a count of zero as a flush request. */ - return rioFdWrite(r,NULL,0); -} - -static const rio rioFdIO = { - rioFdRead, - rioFdWrite, - rioFdTell, - rioFdFlush, - NULL, /* update_checksum */ - 0, /* current checksum */ - 0, /* flags */ - 0, /* bytes read or written */ - 0, /* read/write chunk size */ - { { NULL, 0 } } /* union for io-specific vars */ -}; - -void rioInitWithFd(rio *r, int fd) { - *r = rioFdIO; - r->io.fd.fd = fd; - r->io.fd.pos = 0; - r->io.fd.buf = sdsempty(); -} - -/* release the rio stream. */ -void rioFreeFd(rio *r) { - sdsfree(r->io.fd.buf); -} - -/* ------------------- Connection set implementation ------------------ - * This target is used to write the RDB file to a set of replica connections as - * part of rdb channel replication. */ - -/* Returns 1 for success, 0 for failure. - * The function returns success as long as we are able to correctly write - * to at least one file descriptor. - * - * When buf is NULL or len is 0, the function performs a flush operation if - * there is some pending buffer, so this function is also used in order to - * implement rioConnsetFlush(). */ -static size_t rioConnsetWrite(rio *r, const void *buf, size_t len) { - const size_t pre_flush_size = 256 * 1024; - unsigned char *p = (unsigned char*) buf; - size_t buflen = len; - size_t failed = 0; /* number of connections that write() returned error. */ - - /* For small writes, we rather keep the data in user-space buffer, and flush - * it only when it grows. however for larger writes, we prefer to flush - * any pre-existing buffer, and write the new one directly without reallocs - * and memory copying. */ - if (len > pre_flush_size) { - rioConnsetWrite(r, NULL, 0); - } else { - if (buf && len) { - r->io.connset.buf = sdscatlen(r->io.connset.buf, buf, len); - if (sdslen(r->io.connset.buf) <= PROTO_IOBUF_LEN) - return 1; - } - - p = (unsigned char *)r->io.connset.buf; - buflen = sdslen(r->io.connset.buf); - } - - while (buflen > 0) { - /* Write in little chunks so that when there are big writes we - * parallelize while the kernel is sending data in background to the - * TCP socket. */ - size_t limit = PROTO_IOBUF_LEN * 2; - size_t count = buflen < limit ? buflen : limit; - - for (size_t i = 0; i < r->io.connset.n_dst; i++) { - size_t n_written = 0; - - if (r->io.connset.dst[i].failed != 0) { - failed++; - continue; /* Skip failed connections. */ - } - - do { - ssize_t ret; - connection *c = r->io.connset.dst[i].conn; - - ret = connWrite(c, p + n_written, count - n_written); - if (ret <= 0) { - if (errno == 0) - errno = EIO; - /* With blocking sockets, which is the sole user of this - * rio target, EWOULDBLOCK is returned only because of - * the SO_SNDTIMEO socket option, so we translate the error - * into one more recognizable by the user. */ - if (ret == -1 && errno == EWOULDBLOCK) - errno = ETIMEDOUT; - - r->io.connset.dst[i].failed = 1; - failed++; - break; - } - n_written += ret; - } while (n_written != count); - } - if (failed == r->io.connset.n_dst) - return 0; /* All the connections have failed. */ - - p += count; - buflen -= count; - r->io.connset.pos += count; - } - - sdsclear(r->io.connset.buf); - return 1; -} - -/* Returns 1 or 0 for success/failure. */ -static size_t rioConnsetRead(rio *r, void *buf, size_t len) { - UNUSED(r); - UNUSED(buf); - UNUSED(len); - return 0; /* Error, this target does not support reading. */ -} - -/* Returns the number of sent bytes. */ -static off_t rioConnsetTell(rio *r) { - return r->io.connset.pos; -} - -/* Flushes any buffer to target device if applicable. Returns 1 on success - * and 0 on failures. */ -static int rioConnsetFlush(rio *r) { - /* Our flush is implemented by the write method, that recognizes a - * buffer set to NULL with a count of zero as a flush request. */ - return rioConnsetWrite(r, NULL, 0); -} - -static const rio rioConnsetIO = { - rioConnsetRead, - rioConnsetWrite, - rioConnsetTell, - rioConnsetFlush, - NULL, /* update_checksum */ - 0, /* current checksum */ - 0, /* flags */ - 0, /* bytes read or written */ - 0, /* read/write chunk size */ - { { NULL, 0 } } /* union for io-specific vars */ -}; - -void rioInitWithConnset(rio *r, connection **conns, size_t n_conns) { - *r = rioConnsetIO; - r->io.connset.dst = zcalloc(sizeof(*r->io.connset.dst) * n_conns); - r->io.connset.n_dst = n_conns; - r->io.connset.pos = 0; - r->io.connset.buf = sdsempty(); - - for (size_t i = 0; i < n_conns; i++) - r->io.connset.dst[i].conn = conns[i]; -} - -/* release the rio stream. */ -void rioFreeConnset(rio *r) { - zfree(r->io.connset.dst); - sdsfree(r->io.connset.buf); -} - -/* ---------------------------- Generic functions ---------------------------- */ - -/* This function can be installed both in memory and file streams when checksum - * computation is needed. */ -void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) { - r->cksum = crc64(r->cksum,buf,len); -} - -/* Set the file-based rio object to auto-fsync every 'bytes' file written. - * By default this is set to zero that means no automatic file sync is - * performed. - * - * This feature is useful in a few contexts since when we rely on OS write - * buffers sometimes the OS buffers way too much, resulting in too many - * disk I/O concentrated in very little time. When we fsync in an explicit - * way instead the I/O pressure is more distributed across time. */ -void rioSetAutoSync(rio *r, off_t bytes) { - if(r->write != rioFileIO.write) return; - r->io.file.autosync = bytes; -} - -/* Set the file-based rio object to reclaim cache after every auto-sync. - * In the Linux implementation POSIX_FADV_DONTNEED skips the dirty - * pages, so if auto sync is unset this option will have no effect. - * - * This feature can reduce the cache footprint backed by the file. */ -void rioSetReclaimCache(rio *r, int enabled) { - r->io.file.reclaim_cache = enabled; -} - -/* Check the type of rio. */ -uint8_t rioCheckType(rio *r) { - if (r->read == rioFileRead) { - return RIO_TYPE_FILE; - } else if (r->read == rioBufferRead) { - return RIO_TYPE_BUFFER; - } else if (r->read == rioConnRead) { - return RIO_TYPE_CONN; - } else { - /* r->read == rioFdRead */ - return RIO_TYPE_FD; - } -} - -/* --------------------------- Higher level interface -------------------------- - * - * The following higher level functions use lower level rio.c functions to help - * generating the Redis protocol for the Append Only File. */ - -/* Write multi bulk count in the format: "*<count>\r\n". */ -size_t rioWriteBulkCount(rio *r, char prefix, long count) { - char cbuf[128]; - int clen; - - cbuf[0] = prefix; - clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,count); - cbuf[clen++] = '\r'; - cbuf[clen++] = '\n'; - if (rioWrite(r,cbuf,clen) == 0) return 0; - return clen; -} - -/* Write binary-safe string in the format: "$<count>\r\n<payload>\r\n". */ -size_t rioWriteBulkString(rio *r, const char *buf, size_t len) { - size_t nwritten; - - if ((nwritten = rioWriteBulkCount(r,'$',len)) == 0) return 0; - if (len > 0 && rioWrite(r,buf,len) == 0) return 0; - if (rioWrite(r,"\r\n",2) == 0) return 0; - return nwritten+len+2; -} - -/* Write a long long value in format: "$<count>\r\n<payload>\r\n". */ -size_t rioWriteBulkLongLong(rio *r, long long l) { - char lbuf[32]; - unsigned int llen; - - llen = ll2string(lbuf,sizeof(lbuf),l); - return rioWriteBulkString(r,lbuf,llen); -} - -/* Write a double value in the format: "$<count>\r\n<payload>\r\n" */ -size_t rioWriteBulkDouble(rio *r, double d) { - char dbuf[128]; - unsigned int dlen; - dlen = fpconv_dtoa(d, dbuf); - dbuf[dlen] = '\0'; - return rioWriteBulkString(r,dbuf,dlen); -} |
