diff options
Diffstat (limited to 'examples/redis-unstable/src/logreqres.c')
| -rw-r--r-- | examples/redis-unstable/src/logreqres.c | 347 |
1 files changed, 347 insertions, 0 deletions
diff --git a/examples/redis-unstable/src/logreqres.c b/examples/redis-unstable/src/logreqres.c new file mode 100644 index 0000000..a769eb1 --- /dev/null +++ b/examples/redis-unstable/src/logreqres.c @@ -0,0 +1,347 @@ +/* + * Copyright (c) 2021-Present, Redis Ltd. + * All rights reserved. + * + * Licensed under your choice of (a) the Redis Source Available License 2.0 + * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the + * GNU Affero General Public License v3 (AGPLv3). + */ + +/* This file implements the interface of logging clients' requests and + * responses into a file. + * This feature needs the LOG_REQ_RES macro to be compiled and is turned + * on by the req-res-logfile config." + * + * Some examples: + * + * PING: + * + * 4 + * ping + * 12 + * __argv_end__ + * +PONG + * + * LRANGE: + * + * 6 + * lrange + * 4 + * list + * 1 + * 0 + * 2 + * -1 + * 12 + * __argv_end__ + * *1 + * $3 + * ele + * + * The request is everything up until the __argv_end__ marker. + * The format is: + * <number of characters> + * <the argument> + * + * After __argv_end__ the response appears, and the format is + * RESP (2 or 3, depending on what the client has configured) + */ + +#include "server.h" +#include <ctype.h> + +#ifdef LOG_REQ_RES + +/* ----- Helpers ----- */ + +static int reqresShouldLog(client *c) { + if (!server.req_res_logfile) + return 0; + + /* Ignore client with streaming non-standard response */ + if (c->flags & (CLIENT_PUBSUB|CLIENT_MONITOR|CLIENT_SLAVE)) + return 0; + + /* We only work on masters (didn't implement reqresAppendResponse to work on shared slave buffers) */ + if (getClientType(c) == CLIENT_TYPE_MASTER) + return 0; + + return 1; +} + +static size_t reqresAppendBuffer(client *c, void *buf, size_t len) { + if (!c->reqres.buf) { + c->reqres.capacity = max(len, 1024); + c->reqres.buf = zmalloc(c->reqres.capacity); + } else if (c->reqres.capacity - c->reqres.used < len) { + c->reqres.capacity += len; + c->reqres.buf = zrealloc(c->reqres.buf, c->reqres.capacity); + } + + memcpy(c->reqres.buf + c->reqres.used, buf, len); + c->reqres.used += len; + return len; +} + +/* Functions for requests */ + +static size_t reqresAppendArg(client *c, char *arg, size_t arg_len) { + char argv_len_buf[LONG_STR_SIZE]; + size_t argv_len_buf_len = ll2string(argv_len_buf,sizeof(argv_len_buf),(long)arg_len); + size_t ret = reqresAppendBuffer(c, argv_len_buf, argv_len_buf_len); + ret += reqresAppendBuffer(c, "\r\n", 2); + ret += reqresAppendBuffer(c, arg, arg_len); + ret += reqresAppendBuffer(c, "\r\n", 2); + return ret; +} + +/* Helper function to decode and append encoded buffer content. + * Encoded buffers contain payloadHeader structures followed by payloads. + * For PLAIN_REPLY: just copy the payload data. + * For BULK_STR_REF: expand to "$<len>\r\n<string>\r\n" format. */ +static size_t reqresAppendEncodedBuffer(client *c, char *buf, size_t len) { + size_t ret = 0; + char *ptr = buf; + char *end = buf + len; + + while (ptr < end) { + payloadHeader *header = (payloadHeader *)ptr; + if (header->payload_type == PLAIN_REPLY) { + /* Plain reply data - copy directly */ + ret += reqresAppendBuffer(c, ptr + sizeof(payloadHeader), header->payload_len); + } else { + /* BULK_STR_REF - expand to full RESP format */ + bulkStrRef *str_ref = (bulkStrRef *)(ptr + sizeof(payloadHeader)); + + /* Append prefix: "$<len>\r\n" */ + ret += reqresAppendBuffer(c, str_ref->prefix, str_ref->prefix_cnt); + /* Append string content */ + ret += reqresAppendBuffer(c, str_ref->obj->ptr, sdslen(str_ref->obj->ptr)); + /* Append trailing CRLF */ + ret += reqresAppendBuffer(c, str_ref->crlf, 2); + } + ptr += sizeof(payloadHeader) + header->payload_len; + } + + return ret; +} + +/* ----- API ----- */ + + +/* Zero out the clientReqResInfo struct inside the client, + * and free the buffer if needed */ +void reqresReset(client *c, int free_buf) { + if (free_buf && c->reqres.buf) + zfree(c->reqres.buf); + memset(&c->reqres, 0, sizeof(c->reqres)); +} + +/* Save the offset of the reply buffer (or the reply list). + * Should be called when adding a reply (but it will only save the offset + * on the very first time it's called, because of c->reqres.offset.saved) + * The idea is: + * 1. When a client is executing a command, we save the reply offset. + * 2. During the execution, the reply offset may grow, as addReply* functions are called. + * 3. When client is done with the command (commandProcessed), reqresAppendResponse + * is called. + * 4. reqresAppendResponse will append the diff between the current offset and the one from step (1) + * 5. When client is reset before the next command, we clear c->reqres.offset.saved and start again + * + * We cannot reply on c->sentlen to keep track because it depends on the network + * (reqresAppendResponse will always write the whole buffer, unlike writeToClient) + * + * Ideally, we would just have this code inside reqresAppendRequest, which is called + * from processCommand, but we cannot save the reply offset inside processCommand + * because of the following pipe-lining scenario: + * set rd [redis_deferring_client] + * set buf "" + * append buf "SET key vale\r\n" + * append buf "BLPOP mylist 0\r\n" + * $rd write $buf + * $rd flush + * + * Let's assume we save the reply offset in processCommand + * When BLPOP is processed the offset is 5 (+OK\r\n from the SET) + * Then beforeSleep is called, the +OK is written to network, and bufpos is 0 + * When the client is finally unblocked, the cached offset is 5, but bufpos is already + * 0, so we would miss the first 5 bytes of the reply. + **/ +void reqresSaveClientReplyOffset(client *c) { + if (!reqresShouldLog(c)) + return; + + if (c->reqres.offset.saved) + return; + + c->reqres.offset.saved = 1; + + c->reqres.offset.bufpos = c->bufpos; + if (listLength(c->reply) && listNodeValue(listLast(c->reply))) { + c->reqres.offset.last_node.index = listLength(c->reply) - 1; + c->reqres.offset.last_node.used = ((clientReplyBlock *)listNodeValue(listLast(c->reply)))->used; + } else { + c->reqres.offset.last_node.index = 0; + c->reqres.offset.last_node.used = 0; + } +} + +size_t reqresAppendRequest(client *c) { + robj **argv = c->argv; + int argc = c->argc; + + serverAssert(argc); + + if (!reqresShouldLog(c)) + return 0; + + /* Ignore commands that have streaming non-standard response */ + sds cmd = argv[0]->ptr; + if (!strcasecmp(cmd,"debug") || /* because of DEBUG SEGFAULT */ + !strcasecmp(cmd,"sync") || + !strcasecmp(cmd,"psync") || + !strcasecmp(cmd,"monitor") || + !strcasecmp(cmd,"subscribe") || + !strcasecmp(cmd,"unsubscribe") || + !strcasecmp(cmd,"ssubscribe") || + !strcasecmp(cmd,"sunsubscribe") || + !strcasecmp(cmd,"psubscribe") || + !strcasecmp(cmd,"punsubscribe")) + { + return 0; + } + + c->reqres.argv_logged = 1; + + size_t ret = 0; + for (int i = 0; i < argc; i++) { + if (sdsEncodedObject(argv[i])) { + ret += reqresAppendArg(c, argv[i]->ptr, sdslen(argv[i]->ptr)); + } else if (argv[i]->encoding == OBJ_ENCODING_INT) { + char buf[LONG_STR_SIZE]; + size_t len = ll2string(buf,sizeof(buf),(long)argv[i]->ptr); + ret += reqresAppendArg(c, buf, len); + } else { + serverPanic("Wrong encoding in reqresAppendRequest()"); + } + } + return ret + reqresAppendArg(c, "__argv_end__", 12); +} + +size_t reqresAppendResponse(client *c) { + size_t ret = 0; + + if (!reqresShouldLog(c)) + return 0; + + if (!c->reqres.argv_logged) /* Example: UNSUBSCRIBE */ + return 0; + + if (!c->reqres.offset.saved) /* Example: module client blocked on keys + CLIENT KILL */ + return 0; + + /* First append the static reply buffer */ + if (c->bufpos > c->reqres.offset.bufpos) { + size_t written; + if (!c->buf_encoded) { + /* Plain buffer - copy directly */ + written = reqresAppendBuffer(c, c->buf + c->reqres.offset.bufpos, c->bufpos - c->reqres.offset.bufpos); + } else { + /* Decode and append encoded buffer */ + written = reqresAppendEncodedBuffer(c, c->buf + c->reqres.offset.bufpos, c->bufpos - c->reqres.offset.bufpos); + } + ret += written; + } + + int curr_index = 0; + size_t curr_used = 0; + if (listLength(c->reply)) { + curr_index = listLength(c->reply) - 1; + curr_used = ((clientReplyBlock *)listNodeValue(listLast(c->reply)))->used; + } + + /* Now, append reply bytes from the reply list */ + if (curr_index > c->reqres.offset.last_node.index || + curr_used > c->reqres.offset.last_node.used) + { + int i = 0; + listIter iter; + listNode *curr; + clientReplyBlock *o; + listRewind(c->reply, &iter); + while ((curr = listNext(&iter)) != NULL) { + size_t written = 0; + + /* Skip nodes we had already processed */ + if (i < c->reqres.offset.last_node.index) { + i++; + continue; + } + o = listNodeValue(curr); + if (o->used == 0) { + i++; + continue; + } + + if (!o->buf_encoded) { + if (i == c->reqres.offset.last_node.index) { + /* Write the potentially incomplete node, which had data from + * before the current command started */ + written = reqresAppendBuffer(c, o->buf + c->reqres.offset.last_node.used, + o->used - c->reqres.offset.last_node.used); + } else { + /* New node */ + written = reqresAppendBuffer(c, o->buf, o->used); + } + } else { + /* Encoded buffer - decode and append */ + if (i == c->reqres.offset.last_node.index) { + /* Write the potentially incomplete node, which had data from + * before the current command started */ + written = reqresAppendEncodedBuffer(c, o->buf + c->reqres.offset.last_node.used, + o->used - c->reqres.offset.last_node.used); + } else { + /* New node */ + written = reqresAppendEncodedBuffer(c, o->buf, o->used); + } + } + + ret += written; + i++; + } + } + serverAssert(ret); + + /* Flush both request and response to file */ + FILE *fp = fopen(server.req_res_logfile, "a"); + serverAssert(fp); + fwrite(c->reqres.buf, c->reqres.used, 1, fp); + fclose(fp); + + return ret; +} + +#else /* #ifdef LOG_REQ_RES */ + +/* Just mimic the API without doing anything */ + +void reqresReset(client *c, int free_buf) { + UNUSED(c); + UNUSED(free_buf); +} + +inline void reqresSaveClientReplyOffset(client *c) { + UNUSED(c); +} + +inline size_t reqresAppendRequest(client *c) { + UNUSED(c); + return 0; +} + +inline size_t reqresAppendResponse(client *c) { + UNUSED(c); + return 0; +} + +#endif /* #ifdef LOG_REQ_RES */ |
