summaryrefslogtreecommitdiff
path: root/examples/redis-unstable/src/logreqres.c
diff options
context:
space:
mode:
authorMitja Felicijan <mitja.felicijan@gmail.com>2026-01-21 22:40:55 +0100
committerMitja Felicijan <mitja.felicijan@gmail.com>2026-01-21 22:40:55 +0100
commit5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda (patch)
tree1acdfa5220cd13b7be43a2a01368e80d306473ca /examples/redis-unstable/src/logreqres.c
parentc7ab12bba64d9c20ccd79b132dac475f7bc3923e (diff)
downloadcrep-5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda.tar.gz
Add Redis source code for testing
Diffstat (limited to 'examples/redis-unstable/src/logreqres.c')
-rw-r--r--examples/redis-unstable/src/logreqres.c347
1 files changed, 347 insertions, 0 deletions
diff --git a/examples/redis-unstable/src/logreqres.c b/examples/redis-unstable/src/logreqres.c
new file mode 100644
index 0000000..a769eb1
--- /dev/null
+++ b/examples/redis-unstable/src/logreqres.c
@@ -0,0 +1,347 @@
+/*
+ * Copyright (c) 2021-Present, Redis Ltd.
+ * All rights reserved.
+ *
+ * Licensed under your choice of (a) the Redis Source Available License 2.0
+ * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
+ * GNU Affero General Public License v3 (AGPLv3).
+ */
+
+/* This file implements the interface of logging clients' requests and
+ * responses into a file.
+ * This feature needs the LOG_REQ_RES macro to be compiled and is turned
+ * on by the req-res-logfile config."
+ *
+ * Some examples:
+ *
+ * PING:
+ *
+ * 4
+ * ping
+ * 12
+ * __argv_end__
+ * +PONG
+ *
+ * LRANGE:
+ *
+ * 6
+ * lrange
+ * 4
+ * list
+ * 1
+ * 0
+ * 2
+ * -1
+ * 12
+ * __argv_end__
+ * *1
+ * $3
+ * ele
+ *
+ * The request is everything up until the __argv_end__ marker.
+ * The format is:
+ * <number of characters>
+ * <the argument>
+ *
+ * After __argv_end__ the response appears, and the format is
+ * RESP (2 or 3, depending on what the client has configured)
+ */
+
+#include "server.h"
+#include <ctype.h>
+
+#ifdef LOG_REQ_RES
+
+/* ----- Helpers ----- */
+
+static int reqresShouldLog(client *c) {
+ if (!server.req_res_logfile)
+ return 0;
+
+ /* Ignore client with streaming non-standard response */
+ if (c->flags & (CLIENT_PUBSUB|CLIENT_MONITOR|CLIENT_SLAVE))
+ return 0;
+
+ /* We only work on masters (didn't implement reqresAppendResponse to work on shared slave buffers) */
+ if (getClientType(c) == CLIENT_TYPE_MASTER)
+ return 0;
+
+ return 1;
+}
+
+static size_t reqresAppendBuffer(client *c, void *buf, size_t len) {
+ if (!c->reqres.buf) {
+ c->reqres.capacity = max(len, 1024);
+ c->reqres.buf = zmalloc(c->reqres.capacity);
+ } else if (c->reqres.capacity - c->reqres.used < len) {
+ c->reqres.capacity += len;
+ c->reqres.buf = zrealloc(c->reqres.buf, c->reqres.capacity);
+ }
+
+ memcpy(c->reqres.buf + c->reqres.used, buf, len);
+ c->reqres.used += len;
+ return len;
+}
+
+/* Functions for requests */
+
+static size_t reqresAppendArg(client *c, char *arg, size_t arg_len) {
+ char argv_len_buf[LONG_STR_SIZE];
+ size_t argv_len_buf_len = ll2string(argv_len_buf,sizeof(argv_len_buf),(long)arg_len);
+ size_t ret = reqresAppendBuffer(c, argv_len_buf, argv_len_buf_len);
+ ret += reqresAppendBuffer(c, "\r\n", 2);
+ ret += reqresAppendBuffer(c, arg, arg_len);
+ ret += reqresAppendBuffer(c, "\r\n", 2);
+ return ret;
+}
+
+/* Helper function to decode and append encoded buffer content.
+ * Encoded buffers contain payloadHeader structures followed by payloads.
+ * For PLAIN_REPLY: just copy the payload data.
+ * For BULK_STR_REF: expand to "$<len>\r\n<string>\r\n" format. */
+static size_t reqresAppendEncodedBuffer(client *c, char *buf, size_t len) {
+ size_t ret = 0;
+ char *ptr = buf;
+ char *end = buf + len;
+
+ while (ptr < end) {
+ payloadHeader *header = (payloadHeader *)ptr;
+ if (header->payload_type == PLAIN_REPLY) {
+ /* Plain reply data - copy directly */
+ ret += reqresAppendBuffer(c, ptr + sizeof(payloadHeader), header->payload_len);
+ } else {
+ /* BULK_STR_REF - expand to full RESP format */
+ bulkStrRef *str_ref = (bulkStrRef *)(ptr + sizeof(payloadHeader));
+
+ /* Append prefix: "$<len>\r\n" */
+ ret += reqresAppendBuffer(c, str_ref->prefix, str_ref->prefix_cnt);
+ /* Append string content */
+ ret += reqresAppendBuffer(c, str_ref->obj->ptr, sdslen(str_ref->obj->ptr));
+ /* Append trailing CRLF */
+ ret += reqresAppendBuffer(c, str_ref->crlf, 2);
+ }
+ ptr += sizeof(payloadHeader) + header->payload_len;
+ }
+
+ return ret;
+}
+
+/* ----- API ----- */
+
+
+/* Zero out the clientReqResInfo struct inside the client,
+ * and free the buffer if needed */
+void reqresReset(client *c, int free_buf) {
+ if (free_buf && c->reqres.buf)
+ zfree(c->reqres.buf);
+ memset(&c->reqres, 0, sizeof(c->reqres));
+}
+
+/* Save the offset of the reply buffer (or the reply list).
+ * Should be called when adding a reply (but it will only save the offset
+ * on the very first time it's called, because of c->reqres.offset.saved)
+ * The idea is:
+ * 1. When a client is executing a command, we save the reply offset.
+ * 2. During the execution, the reply offset may grow, as addReply* functions are called.
+ * 3. When client is done with the command (commandProcessed), reqresAppendResponse
+ * is called.
+ * 4. reqresAppendResponse will append the diff between the current offset and the one from step (1)
+ * 5. When client is reset before the next command, we clear c->reqres.offset.saved and start again
+ *
+ * We cannot reply on c->sentlen to keep track because it depends on the network
+ * (reqresAppendResponse will always write the whole buffer, unlike writeToClient)
+ *
+ * Ideally, we would just have this code inside reqresAppendRequest, which is called
+ * from processCommand, but we cannot save the reply offset inside processCommand
+ * because of the following pipe-lining scenario:
+ * set rd [redis_deferring_client]
+ * set buf ""
+ * append buf "SET key vale\r\n"
+ * append buf "BLPOP mylist 0\r\n"
+ * $rd write $buf
+ * $rd flush
+ *
+ * Let's assume we save the reply offset in processCommand
+ * When BLPOP is processed the offset is 5 (+OK\r\n from the SET)
+ * Then beforeSleep is called, the +OK is written to network, and bufpos is 0
+ * When the client is finally unblocked, the cached offset is 5, but bufpos is already
+ * 0, so we would miss the first 5 bytes of the reply.
+ **/
+void reqresSaveClientReplyOffset(client *c) {
+ if (!reqresShouldLog(c))
+ return;
+
+ if (c->reqres.offset.saved)
+ return;
+
+ c->reqres.offset.saved = 1;
+
+ c->reqres.offset.bufpos = c->bufpos;
+ if (listLength(c->reply) && listNodeValue(listLast(c->reply))) {
+ c->reqres.offset.last_node.index = listLength(c->reply) - 1;
+ c->reqres.offset.last_node.used = ((clientReplyBlock *)listNodeValue(listLast(c->reply)))->used;
+ } else {
+ c->reqres.offset.last_node.index = 0;
+ c->reqres.offset.last_node.used = 0;
+ }
+}
+
+size_t reqresAppendRequest(client *c) {
+ robj **argv = c->argv;
+ int argc = c->argc;
+
+ serverAssert(argc);
+
+ if (!reqresShouldLog(c))
+ return 0;
+
+ /* Ignore commands that have streaming non-standard response */
+ sds cmd = argv[0]->ptr;
+ if (!strcasecmp(cmd,"debug") || /* because of DEBUG SEGFAULT */
+ !strcasecmp(cmd,"sync") ||
+ !strcasecmp(cmd,"psync") ||
+ !strcasecmp(cmd,"monitor") ||
+ !strcasecmp(cmd,"subscribe") ||
+ !strcasecmp(cmd,"unsubscribe") ||
+ !strcasecmp(cmd,"ssubscribe") ||
+ !strcasecmp(cmd,"sunsubscribe") ||
+ !strcasecmp(cmd,"psubscribe") ||
+ !strcasecmp(cmd,"punsubscribe"))
+ {
+ return 0;
+ }
+
+ c->reqres.argv_logged = 1;
+
+ size_t ret = 0;
+ for (int i = 0; i < argc; i++) {
+ if (sdsEncodedObject(argv[i])) {
+ ret += reqresAppendArg(c, argv[i]->ptr, sdslen(argv[i]->ptr));
+ } else if (argv[i]->encoding == OBJ_ENCODING_INT) {
+ char buf[LONG_STR_SIZE];
+ size_t len = ll2string(buf,sizeof(buf),(long)argv[i]->ptr);
+ ret += reqresAppendArg(c, buf, len);
+ } else {
+ serverPanic("Wrong encoding in reqresAppendRequest()");
+ }
+ }
+ return ret + reqresAppendArg(c, "__argv_end__", 12);
+}
+
+size_t reqresAppendResponse(client *c) {
+ size_t ret = 0;
+
+ if (!reqresShouldLog(c))
+ return 0;
+
+ if (!c->reqres.argv_logged) /* Example: UNSUBSCRIBE */
+ return 0;
+
+ if (!c->reqres.offset.saved) /* Example: module client blocked on keys + CLIENT KILL */
+ return 0;
+
+ /* First append the static reply buffer */
+ if (c->bufpos > c->reqres.offset.bufpos) {
+ size_t written;
+ if (!c->buf_encoded) {
+ /* Plain buffer - copy directly */
+ written = reqresAppendBuffer(c, c->buf + c->reqres.offset.bufpos, c->bufpos - c->reqres.offset.bufpos);
+ } else {
+ /* Decode and append encoded buffer */
+ written = reqresAppendEncodedBuffer(c, c->buf + c->reqres.offset.bufpos, c->bufpos - c->reqres.offset.bufpos);
+ }
+ ret += written;
+ }
+
+ int curr_index = 0;
+ size_t curr_used = 0;
+ if (listLength(c->reply)) {
+ curr_index = listLength(c->reply) - 1;
+ curr_used = ((clientReplyBlock *)listNodeValue(listLast(c->reply)))->used;
+ }
+
+ /* Now, append reply bytes from the reply list */
+ if (curr_index > c->reqres.offset.last_node.index ||
+ curr_used > c->reqres.offset.last_node.used)
+ {
+ int i = 0;
+ listIter iter;
+ listNode *curr;
+ clientReplyBlock *o;
+ listRewind(c->reply, &iter);
+ while ((curr = listNext(&iter)) != NULL) {
+ size_t written = 0;
+
+ /* Skip nodes we had already processed */
+ if (i < c->reqres.offset.last_node.index) {
+ i++;
+ continue;
+ }
+ o = listNodeValue(curr);
+ if (o->used == 0) {
+ i++;
+ continue;
+ }
+
+ if (!o->buf_encoded) {
+ if (i == c->reqres.offset.last_node.index) {
+ /* Write the potentially incomplete node, which had data from
+ * before the current command started */
+ written = reqresAppendBuffer(c, o->buf + c->reqres.offset.last_node.used,
+ o->used - c->reqres.offset.last_node.used);
+ } else {
+ /* New node */
+ written = reqresAppendBuffer(c, o->buf, o->used);
+ }
+ } else {
+ /* Encoded buffer - decode and append */
+ if (i == c->reqres.offset.last_node.index) {
+ /* Write the potentially incomplete node, which had data from
+ * before the current command started */
+ written = reqresAppendEncodedBuffer(c, o->buf + c->reqres.offset.last_node.used,
+ o->used - c->reqres.offset.last_node.used);
+ } else {
+ /* New node */
+ written = reqresAppendEncodedBuffer(c, o->buf, o->used);
+ }
+ }
+
+ ret += written;
+ i++;
+ }
+ }
+ serverAssert(ret);
+
+ /* Flush both request and response to file */
+ FILE *fp = fopen(server.req_res_logfile, "a");
+ serverAssert(fp);
+ fwrite(c->reqres.buf, c->reqres.used, 1, fp);
+ fclose(fp);
+
+ return ret;
+}
+
+#else /* #ifdef LOG_REQ_RES */
+
+/* Just mimic the API without doing anything */
+
+void reqresReset(client *c, int free_buf) {
+ UNUSED(c);
+ UNUSED(free_buf);
+}
+
+inline void reqresSaveClientReplyOffset(client *c) {
+ UNUSED(c);
+}
+
+inline size_t reqresAppendRequest(client *c) {
+ UNUSED(c);
+ return 0;
+}
+
+inline size_t reqresAppendResponse(client *c) {
+ UNUSED(c);
+ return 0;
+}
+
+#endif /* #ifdef LOG_REQ_RES */