summaryrefslogtreecommitdiff
path: root/examples/redis-unstable/src/redis-benchmark.c
diff options
context:
space:
mode:
Diffstat (limited to 'examples/redis-unstable/src/redis-benchmark.c')
-rw-r--r--examples/redis-unstable/src/redis-benchmark.c2028
1 files changed, 0 insertions, 2028 deletions
diff --git a/examples/redis-unstable/src/redis-benchmark.c b/examples/redis-unstable/src/redis-benchmark.c
deleted file mode 100644
index 8c9e062..0000000
--- a/examples/redis-unstable/src/redis-benchmark.c
+++ /dev/null
@@ -1,2028 +0,0 @@
-/* Redis benchmark utility.
- *
- * Copyright (c) 2009-Present, Redis Ltd.
- * All rights reserved.
- *
- * Licensed under your choice of (a) the Redis Source Available License 2.0
- * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
- * GNU Affero General Public License v3 (AGPLv3).
- */
-
-#include "fmacros.h"
-
-#include <stdio.h>
-#include <string.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <errno.h>
-#include <time.h>
-#include <sys/time.h>
-#include <signal.h>
-#include <assert.h>
-#include <math.h>
-#include <pthread.h>
-
-#include <sdscompat.h> /* Use hiredis' sds compat header that maps sds calls to their hi_ variants */
-#include <sds.h> /* Use hiredis sds. */
-#include "ae.h"
-#include <hiredis.h>
-#ifdef USE_OPENSSL
-#include <openssl/ssl.h>
-#include <openssl/err.h>
-#include <hiredis_ssl.h>
-#endif
-#include "adlist.h"
-#include "dict.h"
-#include "zmalloc.h"
-#include "atomicvar.h"
-#include "crc16_slottable.h"
-#include "hdr_histogram.h"
-#include "cli_common.h"
-#include "mt19937-64.h"
-
-#define UNUSED(V) ((void) V)
-#define RANDPTR_INITIAL_SIZE 8
-#define DEFAULT_LATENCY_PRECISION 3
-#define MAX_LATENCY_PRECISION 4
-#define MAX_THREADS 500
-#define CLUSTER_SLOTS 16384
-#define CONFIG_LATENCY_HISTOGRAM_MIN_VALUE 10L /* >= 10 usecs */
-#define CONFIG_LATENCY_HISTOGRAM_MAX_VALUE 3000000L /* <= 3 secs(us precision) */
-#define CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE 3000000L /* <= 3 secs(us precision) */
-#define SHOW_THROUGHPUT_INTERVAL 250 /* 250ms */
-
-#define CLIENT_GET_EVENTLOOP(c) \
- (c->thread_id >= 0 ? config.threads[c->thread_id]->el : config.el)
-
-struct benchmarkThread;
-struct clusterNode;
-struct redisConfig;
-
-static struct config {
- aeEventLoop *el;
- cliConnInfo conn_info;
- const char *hostsocket;
- int tls;
- struct cliSSLconfig sslconfig;
- int numclients;
- redisAtomic int liveclients;
- int requests;
- redisAtomic int requests_issued;
- redisAtomic int requests_finished;
- redisAtomic int previous_requests_finished;
- int last_printed_bytes;
- long long previous_tick;
- int keysize;
- int datasize;
- int randomkeys;
- int randomkeys_keyspacelen;
- int keepalive;
- int pipeline;
- long long start;
- long long totlatency;
- const char *title;
- list *clients;
- int quiet;
- int csv;
- int loop;
- int idlemode;
- sds input_dbnumstr;
- char *tests;
- int stdinarg; /* get last arg from stdin. (-x option) */
- int precision;
- int num_threads;
- struct benchmarkThread **threads;
- int cluster_mode;
- int cluster_node_count;
- struct clusterNode **cluster_nodes;
- struct redisConfig *redis_config;
- struct hdr_histogram* latency_histogram;
- struct hdr_histogram* current_sec_latency_histogram;
- redisAtomic int is_fetching_slots;
- redisAtomic int is_updating_slots;
- redisAtomic int slots_last_update;
- int enable_tracking;
- pthread_mutex_t liveclients_mutex;
- pthread_mutex_t is_updating_slots_mutex;
- int resp3; /* use RESP3 */
-} config;
-
-typedef struct _client {
- redisContext *context;
- sds obuf;
- char **randptr; /* Pointers to :rand: strings inside the command buf */
- size_t randlen; /* Number of pointers in client->randptr */
- size_t randfree; /* Number of unused pointers in client->randptr */
- char **stagptr; /* Pointers to slot hashtags (cluster mode only) */
- size_t staglen; /* Number of pointers in client->stagptr */
- size_t stagfree; /* Number of unused pointers in client->stagptr */
- size_t written; /* Bytes of 'obuf' already written */
- long long start; /* Start time of a request */
- long long latency; /* Request latency */
- int pending; /* Number of pending requests (replies to consume) */
- int prefix_pending; /* If non-zero, number of pending prefix commands. Commands
- such as auth and select are prefixed to the pipeline of
- benchmark commands and discarded after the first send. */
- int prefixlen; /* Size in bytes of the pending prefix commands */
- int thread_id;
- struct clusterNode *cluster_node;
- int slots_last_update;
-} *client;
-
-/* Threads. */
-
-typedef struct benchmarkThread {
- int index;
- pthread_t thread;
- aeEventLoop *el;
-} benchmarkThread;
-
-/* Cluster. */
-typedef struct clusterNode {
- char *ip;
- int port;
- sds name;
- int flags;
- sds replicate; /* Master ID if node is a slave */
- int *slots;
- int slots_count;
- int *updated_slots; /* Used by updateClusterSlotsConfiguration */
- int updated_slots_count; /* Used by updateClusterSlotsConfiguration */
- int replicas_count;
- sds *migrating; /* An array of sds where even strings are slots and odd
- * strings are the destination node IDs. */
- sds *importing; /* An array of sds where even strings are slots and odd
- * strings are the source node IDs. */
- int migrating_count; /* Length of the migrating array (migrating slots*2) */
- int importing_count; /* Length of the importing array (importing slots*2) */
- struct redisConfig *redis_config;
-} clusterNode;
-
-typedef struct redisConfig {
- sds save;
- sds appendonly;
-} redisConfig;
-
-/* Prototypes */
-static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask);
-static void createMissingClients(client c);
-static benchmarkThread *createBenchmarkThread(int index);
-static void freeBenchmarkThread(benchmarkThread *thread);
-static void freeBenchmarkThreads(void);
-static void *execBenchmarkThread(void *ptr);
-static clusterNode *createClusterNode(char *ip, int port);
-static redisConfig *getRedisConfig(const char *ip, int port,
- const char *hostsocket);
-static redisContext *getRedisContext(const char *ip, int port,
- const char *hostsocket);
-static void freeRedisConfig(redisConfig *cfg);
-static int fetchClusterSlotsConfiguration(client c);
-static void updateClusterSlotsConfiguration(void);
-int showThroughput(struct aeEventLoop *eventLoop, long long id,
- void *clientData);
-
-/* Dict callbacks */
-static uint64_t dictSdsHash(const void *key);
-static int dictSdsKeyCompare(dictCmpCache *cache, const void *key1, const void *key2);
-
-/* Implementation */
-static long long ustime(void) {
- struct timeval tv;
- long long ust;
-
- gettimeofday(&tv, NULL);
- ust = ((long long)tv.tv_sec)*1000000;
- ust += tv.tv_usec;
- return ust;
-}
-
-static long long mstime(void) {
- return ustime()/1000;
-}
-
-static uint64_t dictSdsHash(const void *key) {
- return dictGenHashFunction((unsigned char*)key, sdslen((char*)key));
-}
-
-static int dictSdsKeyCompare(dictCmpCache *cache, const void *key1, const void *key2)
-{
- int l1,l2;
- UNUSED(cache);
-
- l1 = sdslen((sds)key1);
- l2 = sdslen((sds)key2);
- if (l1 != l2) return 0;
- return memcmp(key1, key2, l1) == 0;
-}
-
-static redisContext *getRedisContext(const char *ip, int port,
- const char *hostsocket)
-{
- redisContext *ctx = NULL;
- redisReply *reply = NULL;
- if (hostsocket == NULL)
- ctx = redisConnect(ip, port);
- else
- ctx = redisConnectUnix(hostsocket);
- if (ctx == NULL || ctx->err) {
- fprintf(stderr,"Could not connect to Redis at ");
- char *err = (ctx != NULL ? ctx->errstr : "");
- if (hostsocket == NULL)
- fprintf(stderr,"%s:%d: %s\n",ip,port,err);
- else
- fprintf(stderr,"%s: %s\n",hostsocket,err);
- goto cleanup;
- }
- if (config.tls==1) {
- const char *err = NULL;
- if (cliSecureConnection(ctx, config.sslconfig, &err) == REDIS_ERR && err) {
- fprintf(stderr, "Could not negotiate a TLS connection: %s\n", err);
- goto cleanup;
- }
- }
- if (config.conn_info.auth == NULL)
- return ctx;
- if (config.conn_info.user == NULL)
- reply = redisCommand(ctx,"AUTH %s", config.conn_info.auth);
- else
- reply = redisCommand(ctx,"AUTH %s %s", config.conn_info.user, config.conn_info.auth);
- if (reply != NULL) {
- if (reply->type == REDIS_REPLY_ERROR) {
- if (hostsocket == NULL)
- fprintf(stderr, "Node %s:%d replied with error:\n%s\n", ip, port, reply->str);
- else
- fprintf(stderr, "Node %s replied with error:\n%s\n", hostsocket, reply->str);
- freeReplyObject(reply);
- redisFree(ctx);
- exit(1);
- }
- freeReplyObject(reply);
- return ctx;
- }
- fprintf(stderr, "ERROR: failed to fetch reply from ");
- if (hostsocket == NULL)
- fprintf(stderr, "%s:%d\n", ip, port);
- else
- fprintf(stderr, "%s\n", hostsocket);
-cleanup:
- freeReplyObject(reply);
- redisFree(ctx);
- return NULL;
-}
-
-
-
-static redisConfig *getRedisConfig(const char *ip, int port,
- const char *hostsocket)
-{
- redisConfig *cfg = zcalloc(sizeof(*cfg));
- if (!cfg) return NULL;
- redisContext *c = NULL;
- redisReply *reply = NULL, *sub_reply = NULL;
- c = getRedisContext(ip, port, hostsocket);
- if (c == NULL) {
- freeRedisConfig(cfg);
- exit(1);
- }
- redisAppendCommand(c, "CONFIG GET %s", "save");
- redisAppendCommand(c, "CONFIG GET %s", "appendonly");
- int abort_test = 0;
- int i = 0;
- void *r = NULL;
- for (; i < 2; i++) {
- int res = redisGetReply(c, &r);
- if (reply) freeReplyObject(reply);
- reply = res == REDIS_OK ? ((redisReply *) r) : NULL;
- if (res != REDIS_OK || !r) goto fail;
- if (reply->type == REDIS_REPLY_ERROR) {
- goto fail;
- }
- if (reply->type != REDIS_REPLY_ARRAY || reply->elements < 2) goto fail;
- sub_reply = reply->element[1];
- char *value = sub_reply->str;
- if (!value) value = "";
- switch (i) {
- case 0: cfg->save = sdsnew(value); break;
- case 1: cfg->appendonly = sdsnew(value); break;
- }
- }
- freeReplyObject(reply);
- redisFree(c);
- return cfg;
-fail:
- if (reply && reply->type == REDIS_REPLY_ERROR &&
- !strncmp(reply->str,"NOAUTH",6)) {
- if (hostsocket == NULL)
- fprintf(stderr, "Node %s:%d replied with error:\n%s\n", ip, port, reply->str);
- else
- fprintf(stderr, "Node %s replied with error:\n%s\n", hostsocket, reply->str);
- abort_test = 1;
- }
- freeReplyObject(reply);
- redisFree(c);
- freeRedisConfig(cfg);
- if (abort_test) exit(1);
- return NULL;
-}
-static void freeRedisConfig(redisConfig *cfg) {
- if (cfg->save) sdsfree(cfg->save);
- if (cfg->appendonly) sdsfree(cfg->appendonly);
- zfree(cfg);
-}
-
-static void freeClient(client c) {
- aeEventLoop *el = CLIENT_GET_EVENTLOOP(c);
- listNode *ln;
- aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE);
- aeDeleteFileEvent(el,c->context->fd,AE_READABLE);
- if (c->thread_id >= 0) {
- int requests_finished = 0;
- atomicGet(config.requests_finished, requests_finished);
- if (requests_finished >= config.requests) {
- aeStop(el);
- }
- }
- redisFree(c->context);
- sdsfree(c->obuf);
- zfree(c->randptr);
- zfree(c->stagptr);
- zfree(c);
- if (config.num_threads) pthread_mutex_lock(&(config.liveclients_mutex));
- config.liveclients--;
- ln = listSearchKey(config.clients,c);
- assert(ln != NULL);
- listDelNode(config.clients,ln);
- if (config.num_threads) pthread_mutex_unlock(&(config.liveclients_mutex));
-}
-
-static void freeAllClients(void) {
- listNode *ln = config.clients->head, *next;
-
- while(ln) {
- next = ln->next;
- freeClient(ln->value);
- ln = next;
- }
-}
-
-static void resetClient(client c) {
- aeEventLoop *el = CLIENT_GET_EVENTLOOP(c);
- aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE);
- aeDeleteFileEvent(el,c->context->fd,AE_READABLE);
- aeCreateFileEvent(el,c->context->fd,AE_WRITABLE,writeHandler,c);
- c->written = 0;
- c->pending = config.pipeline;
-}
-
-static void randomizeClientKey(client c) {
- size_t i;
-
- for (i = 0; i < c->randlen; i++) {
- char *p = c->randptr[i]+11;
- size_t r = 0;
- if (config.randomkeys_keyspacelen != 0)
- r = random() % config.randomkeys_keyspacelen;
- size_t j;
-
- for (j = 0; j < 12; j++) {
- *p = '0'+r%10;
- r/=10;
- p--;
- }
- }
-}
-
-static void setClusterKeyHashTag(client c) {
- assert(c->thread_id >= 0);
- clusterNode *node = c->cluster_node;
- assert(node);
- int is_updating_slots = 0;
- atomicGet(config.is_updating_slots, is_updating_slots);
- /* If updateClusterSlotsConfiguration is updating the slots array,
- * call updateClusterSlotsConfiguration is order to block the thread
- * since the mutex is locked. When the slots will be updated by the
- * thread that's actually performing the update, the execution of
- * updateClusterSlotsConfiguration won't actually do anything, since
- * the updated_slots_count array will be already NULL. */
- if (is_updating_slots) updateClusterSlotsConfiguration();
- int slot = node->slots[rand() % node->slots_count];
- const char *tag = crc16_slot_table[slot];
- int taglen = strlen(tag);
- size_t i;
- for (i = 0; i < c->staglen; i++) {
- char *p = c->stagptr[i] + 1;
- p[0] = tag[0];
- p[1] = (taglen >= 2 ? tag[1] : '}');
- p[2] = (taglen == 3 ? tag[2] : '}');
- }
-}
-
-static void clientDone(client c) {
- int requests_finished = 0;
- atomicGet(config.requests_finished, requests_finished);
- if (requests_finished >= config.requests) {
- freeClient(c);
- if (!config.num_threads && config.el) aeStop(config.el);
- return;
- }
- if (config.keepalive) {
- resetClient(c);
- } else {
- if (config.num_threads) pthread_mutex_lock(&(config.liveclients_mutex));
- config.liveclients--;
- createMissingClients(c);
- config.liveclients++;
- if (config.num_threads)
- pthread_mutex_unlock(&(config.liveclients_mutex));
- freeClient(c);
- }
-}
-
-REDIS_NO_SANITIZE_MSAN("memory")
-static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
- client c = privdata;
- void *reply = NULL;
- UNUSED(el);
- UNUSED(fd);
- UNUSED(mask);
-
- /* Calculate latency only for the first read event. This means that the
- * server already sent the reply and we need to parse it. Parsing overhead
- * is not part of the latency, so calculate it only once, here. */
- if (c->latency < 0) c->latency = ustime()-(c->start);
-
- if (redisBufferRead(c->context) != REDIS_OK) {
- fprintf(stderr,"Error: %s\n",c->context->errstr);
- exit(1);
- } else {
- while(c->pending) {
- if (redisGetReply(c->context,&reply) != REDIS_OK) {
- fprintf(stderr,"Error: %s\n",c->context->errstr);
- exit(1);
- }
- if (reply != NULL) {
- if (reply == (void*)REDIS_REPLY_ERROR) {
- fprintf(stderr,"Unexpected error reply, exiting...\n");
- exit(1);
- }
- redisReply *r = reply;
- if (r->type == REDIS_REPLY_ERROR) {
- /* Try to update slots configuration if reply error is
- * MOVED/ASK/CLUSTERDOWN and the key(s) used by the command
- * contain(s) the slot hash tag.
- * If the error is not topology-update related then we
- * immediately exit to avoid false results. */
- if (c->cluster_node && c->staglen) {
- int fetch_slots = 0, do_wait = 0;
- if (!strncmp(r->str,"MOVED",5) || !strncmp(r->str,"ASK",3))
- fetch_slots = 1;
- else if (!strncmp(r->str,"CLUSTERDOWN",11)) {
- /* Usually the cluster is able to recover itself after
- * a CLUSTERDOWN error, so try to sleep one second
- * before requesting the new configuration. */
- fetch_slots = 1;
- do_wait = 1;
- fprintf(stderr, "Error from server %s:%d: %s.\n",
- c->cluster_node->ip,
- c->cluster_node->port,
- r->str);
- }
- if (do_wait) sleep(1);
- if (fetch_slots && !fetchClusterSlotsConfiguration(c))
- exit(1);
- } else {
- if (c->cluster_node) {
- fprintf(stderr, "Error from server %s:%d: %s\n",
- c->cluster_node->ip,
- c->cluster_node->port,
- r->str);
- } else fprintf(stderr, "Error from server: %s\n", r->str);
- exit(1);
- }
- }
-
- freeReplyObject(reply);
- /* This is an OK for prefix commands such as auth and select.*/
- if (c->prefix_pending > 0) {
- c->prefix_pending--;
- c->pending--;
- /* Discard prefix commands on first response.*/
- if (c->prefixlen > 0) {
- size_t j;
- sdsrange(c->obuf, c->prefixlen, -1);
- /* We also need to fix the pointers to the strings
- * we need to randomize. */
- for (j = 0; j < c->randlen; j++)
- c->randptr[j] -= c->prefixlen;
- /* Fix the pointers to the slot hash tags */
- for (j = 0; j < c->staglen; j++)
- c->stagptr[j] -= c->prefixlen;
- c->prefixlen = 0;
- }
- continue;
- }
- int requests_finished = 0;
- atomicGetIncr(config.requests_finished, requests_finished, 1);
- if (requests_finished < config.requests){
- if (config.num_threads == 0) {
- hdr_record_value(
- config.latency_histogram, // Histogram to record to
- (long)c->latency<=CONFIG_LATENCY_HISTOGRAM_MAX_VALUE ? (long)c->latency : CONFIG_LATENCY_HISTOGRAM_MAX_VALUE); // Value to record
- hdr_record_value(
- config.current_sec_latency_histogram, // Histogram to record to
- (long)c->latency<=CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE ? (long)c->latency : CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE); // Value to record
- } else {
- hdr_record_value_atomic(
- config.latency_histogram, // Histogram to record to
- (long)c->latency<=CONFIG_LATENCY_HISTOGRAM_MAX_VALUE ? (long)c->latency : CONFIG_LATENCY_HISTOGRAM_MAX_VALUE); // Value to record
- hdr_record_value_atomic(
- config.current_sec_latency_histogram, // Histogram to record to
- (long)c->latency<=CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE ? (long)c->latency : CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE); // Value to record
- }
- }
- c->pending--;
- if (c->pending == 0) {
- clientDone(c);
- break;
- }
- } else {
- break;
- }
- }
- }
-}
-
-static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
- client c = privdata;
- UNUSED(el);
- UNUSED(fd);
- UNUSED(mask);
-
- /* Initialize request when nothing was written. */
- if (c->written == 0) {
- /* Enforce upper bound to number of requests. */
- int requests_issued = 0;
- atomicGetIncr(config.requests_issued, requests_issued, config.pipeline);
- if (requests_issued >= config.requests) {
- return;
- }
-
- /* Really initialize: randomize keys and set start time. */
- if (config.randomkeys) randomizeClientKey(c);
- if (config.cluster_mode && c->staglen > 0) setClusterKeyHashTag(c);
- atomicGet(config.slots_last_update, c->slots_last_update);
- c->start = ustime();
- c->latency = -1;
- }
- const ssize_t buflen = sdslen(c->obuf);
- const ssize_t writeLen = buflen-c->written;
- if (writeLen > 0) {
- void *ptr = c->obuf+c->written;
- while(1) {
- /* Optimistically try to write before checking if the file descriptor
- * is actually writable. At worst we get EAGAIN. */
- const ssize_t nwritten = cliWriteConn(c->context,ptr,writeLen);
- if (nwritten != writeLen) {
- if (nwritten == -1 && errno != EAGAIN) {
- if (errno != EPIPE)
- fprintf(stderr, "Error writing to the server: %s\n", strerror(errno));
- freeClient(c);
- return;
- } else if (nwritten > 0) {
- c->written += nwritten;
- return;
- }
- } else {
- aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE);
- aeCreateFileEvent(el,c->context->fd,AE_READABLE,readHandler,c);
- return;
- }
- }
- }
-}
-
-/* Create a benchmark client, configured to send the command passed as 'cmd' of
- * 'len' bytes.
- *
- * The command is copied N times in the client output buffer (that is reused
- * again and again to send the request to the server) accordingly to the configured
- * pipeline size.
- *
- * Also an initial SELECT command is prepended in order to make sure the right
- * database is selected, if needed. The initial SELECT will be discarded as soon
- * as the first reply is received.
- *
- * To create a client from scratch, the 'from' pointer is set to NULL. If instead
- * we want to create a client using another client as reference, the 'from' pointer
- * points to the client to use as reference. In such a case the following
- * information is take from the 'from' client:
- *
- * 1) The command line to use.
- * 2) The offsets of the __rand_int__ elements inside the command line, used
- * for arguments randomization.
- *
- * Even when cloning another client, prefix commands are applied if needed.*/
-static client createClient(char *cmd, size_t len, client from, int thread_id) {
- int j;
- int is_cluster_client = (config.cluster_mode && thread_id >= 0);
- client c = zmalloc(sizeof(struct _client));
-
- const char *ip = NULL;
- int port = 0;
- c->cluster_node = NULL;
- if (config.hostsocket == NULL || is_cluster_client) {
- if (!is_cluster_client) {
- ip = config.conn_info.hostip;
- port = config.conn_info.hostport;
- } else {
- int node_idx = 0;
- if (config.num_threads < config.cluster_node_count)
- node_idx = config.liveclients % config.cluster_node_count;
- else
- node_idx = thread_id % config.cluster_node_count;
- clusterNode *node = config.cluster_nodes[node_idx];
- assert(node != NULL);
- ip = (const char *) node->ip;
- port = node->port;
- c->cluster_node = node;
- }
- c->context = redisConnectNonBlock(ip,port);
- } else {
- c->context = redisConnectUnixNonBlock(config.hostsocket);
- }
- if (c->context->err) {
- fprintf(stderr,"Could not connect to Redis at ");
- if (config.hostsocket == NULL || is_cluster_client)
- fprintf(stderr,"%s:%d: %s\n",ip,port,c->context->errstr);
- else
- fprintf(stderr,"%s: %s\n",config.hostsocket,c->context->errstr);
- exit(1);
- }
- if (config.tls==1) {
- const char *err = NULL;
- if (cliSecureConnection(c->context, config.sslconfig, &err) == REDIS_ERR && err) {
- fprintf(stderr, "Could not negotiate a TLS connection: %s\n", err);
- exit(1);
- }
- }
- c->thread_id = thread_id;
- /* Suppress hiredis cleanup of unused buffers for max speed. */
- c->context->reader->maxbuf = 0;
-
- /* Build the request buffer:
- * Queue N requests accordingly to the pipeline size, or simply clone
- * the example client buffer. */
- c->obuf = sdsempty();
- /* Prefix the request buffer with AUTH and/or SELECT commands, if applicable.
- * These commands are discarded after the first response, so if the client is
- * reused the commands will not be used again. */
- c->prefix_pending = 0;
- if (config.conn_info.auth) {
- char *buf = NULL;
- int len;
- if (config.conn_info.user == NULL)
- len = redisFormatCommand(&buf, "AUTH %s", config.conn_info.auth);
- else
- len = redisFormatCommand(&buf, "AUTH %s %s",
- config.conn_info.user, config.conn_info.auth);
- c->obuf = sdscatlen(c->obuf, buf, len);
- free(buf);
- c->prefix_pending++;
- }
-
- if (config.enable_tracking) {
- char *buf = NULL;
- int len = redisFormatCommand(&buf, "CLIENT TRACKING on");
- c->obuf = sdscatlen(c->obuf, buf, len);
- free(buf);
- c->prefix_pending++;
- }
-
- /* If a DB number different than zero is selected, prefix our request
- * buffer with the SELECT command, that will be discarded the first
- * time the replies are received, so if the client is reused the
- * SELECT command will not be used again. */
- if (config.conn_info.input_dbnum != 0 && !is_cluster_client) {
- c->obuf = sdscatprintf(c->obuf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
- (int)sdslen(config.input_dbnumstr),config.input_dbnumstr);
- c->prefix_pending++;
- }
-
- if (config.resp3) {
- char *buf = NULL;
- int len = redisFormatCommand(&buf, "HELLO 3");
- c->obuf = sdscatlen(c->obuf, buf, len);
- free(buf);
- c->prefix_pending++;
- }
-
- c->prefixlen = sdslen(c->obuf);
- /* Append the request itself. */
- if (from) {
- c->obuf = sdscatlen(c->obuf,
- from->obuf+from->prefixlen,
- sdslen(from->obuf)-from->prefixlen);
- } else {
- for (j = 0; j < config.pipeline; j++)
- c->obuf = sdscatlen(c->obuf,cmd,len);
- }
-
- c->written = 0;
- c->pending = config.pipeline+c->prefix_pending;
- c->randptr = NULL;
- c->randlen = 0;
- c->stagptr = NULL;
- c->staglen = 0;
-
- /* Find substrings in the output buffer that need to be randomized. */
- if (config.randomkeys) {
- if (from) {
- c->randlen = from->randlen;
- c->randfree = 0;
- c->randptr = zmalloc(sizeof(char*)*c->randlen);
- /* copy the offsets. */
- for (j = 0; j < (int)c->randlen; j++) {
- c->randptr[j] = c->obuf + (from->randptr[j]-from->obuf);
- /* Adjust for the different select prefix length. */
- c->randptr[j] += c->prefixlen - from->prefixlen;
- }
- } else {
- char *p = c->obuf;
-
- c->randlen = 0;
- c->randfree = RANDPTR_INITIAL_SIZE;
- c->randptr = zmalloc(sizeof(char*)*c->randfree);
- while ((p = strstr(p,"__rand_int__")) != NULL) {
- if (c->randfree == 0) {
- c->randptr = zrealloc(c->randptr,sizeof(char*)*c->randlen*2);
- c->randfree += c->randlen;
- }
- c->randptr[c->randlen++] = p;
- c->randfree--;
- p += 12; /* 12 is strlen("__rand_int__). */
- }
- }
- }
- /* If cluster mode is enabled, set slot hashtags pointers. */
- if (config.cluster_mode) {
- if (from) {
- c->staglen = from->staglen;
- c->stagfree = 0;
- c->stagptr = zmalloc(sizeof(char*)*c->staglen);
- /* copy the offsets. */
- for (j = 0; j < (int)c->staglen; j++) {
- c->stagptr[j] = c->obuf + (from->stagptr[j]-from->obuf);
- /* Adjust for the different select prefix length. */
- c->stagptr[j] += c->prefixlen - from->prefixlen;
- }
- } else {
- char *p = c->obuf;
-
- c->staglen = 0;
- c->stagfree = RANDPTR_INITIAL_SIZE;
- c->stagptr = zmalloc(sizeof(char*)*c->stagfree);
- while ((p = strstr(p,"{tag}")) != NULL) {
- if (c->stagfree == 0) {
- c->stagptr = zrealloc(c->stagptr,
- sizeof(char*) * c->staglen*2);
- c->stagfree += c->staglen;
- }
- c->stagptr[c->staglen++] = p;
- c->stagfree--;
- p += 5; /* 5 is strlen("{tag}"). */
- }
- }
- }
- aeEventLoop *el = NULL;
- if (thread_id < 0) el = config.el;
- else {
- benchmarkThread *thread = config.threads[thread_id];
- el = thread->el;
- }
- if (config.idlemode == 0)
- aeCreateFileEvent(el,c->context->fd,AE_WRITABLE,writeHandler,c);
- else
- /* In idle mode, clients still need to register readHandler for catching errors */
- aeCreateFileEvent(el,c->context->fd,AE_READABLE,readHandler,c);
-
- listAddNodeTail(config.clients,c);
- atomicIncr(config.liveclients, 1);
- atomicGet(config.slots_last_update, c->slots_last_update);
- return c;
-}
-
-static void createMissingClients(client c) {
- int n = 0;
- while(config.liveclients < config.numclients) {
- int thread_id = -1;
- if (config.num_threads)
- thread_id = config.liveclients % config.num_threads;
- createClient(NULL,0,c,thread_id);
-
- /* Listen backlog is quite limited on most systems */
- if (++n > 64) {
- usleep(50000);
- n = 0;
- }
- }
-}
-
-static void showLatencyReport(void) {
-
- const float reqpersec = (float)config.requests_finished/((float)config.totlatency/1000.0f);
- const float p0 = ((float) hdr_min(config.latency_histogram))/1000.0f;
- const float p50 = hdr_value_at_percentile(config.latency_histogram, 50.0 )/1000.0f;
- const float p95 = hdr_value_at_percentile(config.latency_histogram, 95.0 )/1000.0f;
- const float p99 = hdr_value_at_percentile(config.latency_histogram, 99.0 )/1000.0f;
- const float p100 = ((float) hdr_max(config.latency_histogram))/1000.0f;
- const float avg = hdr_mean(config.latency_histogram)/1000.0f;
-
- if (!config.quiet && !config.csv) {
- printf("%*s\r", config.last_printed_bytes, " "); // ensure there is a clean line
- printf("====== %s ======\n", config.title);
- printf(" %d requests completed in %.2f seconds\n", config.requests_finished,
- (float)config.totlatency/1000);
- printf(" %d parallel clients\n", config.numclients);
- printf(" %d bytes payload\n", config.datasize);
- printf(" keep alive: %d\n", config.keepalive);
- if (config.cluster_mode) {
- printf(" cluster mode: yes (%d masters)\n",
- config.cluster_node_count);
- int m ;
- for (m = 0; m < config.cluster_node_count; m++) {
- clusterNode *node = config.cluster_nodes[m];
- redisConfig *cfg = node->redis_config;
- if (cfg == NULL) continue;
- printf(" node [%d] configuration:\n",m );
- printf(" save: %s\n",
- sdslen(cfg->save) ? cfg->save : "NONE");
- printf(" appendonly: %s\n", cfg->appendonly);
- }
- } else {
- if (config.redis_config) {
- printf(" host configuration \"save\": %s\n",
- config.redis_config->save);
- printf(" host configuration \"appendonly\": %s\n",
- config.redis_config->appendonly);
- }
- }
- printf(" multi-thread: %s\n", (config.num_threads ? "yes" : "no"));
- if (config.num_threads)
- printf(" threads: %d\n", config.num_threads);
-
- printf("\n");
- printf("Latency by percentile distribution:\n");
- struct hdr_iter iter;
- long long previous_cumulative_count = -1;
- const long long total_count = config.latency_histogram->total_count;
- hdr_iter_percentile_init(&iter, config.latency_histogram, 1);
- struct hdr_iter_percentiles *percentiles = &iter.specifics.percentiles;
- while (hdr_iter_next(&iter))
- {
- const double value = iter.highest_equivalent_value / 1000.0f;
- const double percentile = percentiles->percentile;
- const long long cumulative_count = iter.cumulative_count;
- if( previous_cumulative_count != cumulative_count || cumulative_count == total_count ){
- printf("%3.3f%% <= %.3f milliseconds (cumulative count %lld)\n", percentile, value, cumulative_count);
- }
- previous_cumulative_count = cumulative_count;
- }
- printf("\n");
- printf("Cumulative distribution of latencies:\n");
- previous_cumulative_count = -1;
- hdr_iter_linear_init(&iter, config.latency_histogram, 100);
- while (hdr_iter_next(&iter))
- {
- const double value = iter.highest_equivalent_value / 1000.0f;
- const long long cumulative_count = iter.cumulative_count;
- const double percentile = ((double)cumulative_count/(double)total_count)*100.0;
- if( previous_cumulative_count != cumulative_count || cumulative_count == total_count ){
- printf("%3.3f%% <= %.3f milliseconds (cumulative count %lld)\n", percentile, value, cumulative_count);
- }
- /* After the 2 milliseconds latency to have percentages split
- * by decimals will just add a lot of noise to the output. */
- if(iter.highest_equivalent_value > 2000){
- hdr_iter_linear_set_value_units_per_bucket(&iter,1000);
- }
- previous_cumulative_count = cumulative_count;
- }
- printf("\n");
- printf("Summary:\n");
- printf(" throughput summary: %.2f requests per second\n", reqpersec);
- printf(" latency summary (msec):\n");
- printf(" %9s %9s %9s %9s %9s %9s\n", "avg", "min", "p50", "p95", "p99", "max");
- printf(" %9.3f %9.3f %9.3f %9.3f %9.3f %9.3f\n", avg, p0, p50, p95, p99, p100);
- } else if (config.csv) {
- printf("\"%s\",\"%.2f\",\"%.3f\",\"%.3f\",\"%.3f\",\"%.3f\",\"%.3f\",\"%.3f\"\n", config.title, reqpersec, avg, p0, p50, p95, p99, p100);
- } else {
- printf("%*s\r", config.last_printed_bytes, " "); // ensure there is a clean line
- printf("%s: %.2f requests per second, p50=%.3f msec\n", config.title, reqpersec, p50);
- }
-}
-
-static void initBenchmarkThreads(void) {
- int i;
- if (config.threads) freeBenchmarkThreads();
- config.threads = zmalloc(config.num_threads * sizeof(benchmarkThread*));
- for (i = 0; i < config.num_threads; i++) {
- benchmarkThread *thread = createBenchmarkThread(i);
- config.threads[i] = thread;
- }
-}
-
-static void startBenchmarkThreads(void) {
- int i;
- for (i = 0; i < config.num_threads; i++) {
- benchmarkThread *t = config.threads[i];
- if (pthread_create(&(t->thread), NULL, execBenchmarkThread, t)){
- fprintf(stderr, "FATAL: Failed to start thread %d.\n", i);
- exit(1);
- }
- }
- for (i = 0; i < config.num_threads; i++)
- pthread_join(config.threads[i]->thread, NULL);
-}
-
-static void benchmark(const char *title, char *cmd, int len) {
- client c;
-
- config.title = title;
- config.requests_issued = 0;
- config.requests_finished = 0;
- config.previous_requests_finished = 0;
- config.last_printed_bytes = 0;
- hdr_init(
- CONFIG_LATENCY_HISTOGRAM_MIN_VALUE, // Minimum value
- CONFIG_LATENCY_HISTOGRAM_MAX_VALUE, // Maximum value
- config.precision, // Number of significant figures
- &config.latency_histogram); // Pointer to initialise
- hdr_init(
- CONFIG_LATENCY_HISTOGRAM_MIN_VALUE, // Minimum value
- CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE, // Maximum value
- config.precision, // Number of significant figures
- &config.current_sec_latency_histogram); // Pointer to initialise
-
- if (config.num_threads) initBenchmarkThreads();
-
- int thread_id = config.num_threads > 0 ? 0 : -1;
- c = createClient(cmd,len,NULL,thread_id);
- createMissingClients(c);
-
- config.start = mstime();
- if (!config.num_threads) aeMain(config.el);
- else startBenchmarkThreads();
- config.totlatency = mstime()-config.start;
-
- showLatencyReport();
- freeAllClients();
- if (config.threads) freeBenchmarkThreads();
- if (config.current_sec_latency_histogram) hdr_close(config.current_sec_latency_histogram);
- if (config.latency_histogram) hdr_close(config.latency_histogram);
-
-}
-
-/* Thread functions. */
-
-static benchmarkThread *createBenchmarkThread(int index) {
- benchmarkThread *thread = zmalloc(sizeof(*thread));
- if (thread == NULL) return NULL;
- thread->index = index;
- thread->el = aeCreateEventLoop(1024*10);
- aeCreateTimeEvent(thread->el,1,showThroughput,(void *)thread,NULL);
- return thread;
-}
-
-static void freeBenchmarkThread(benchmarkThread *thread) {
- if (thread->el) aeDeleteEventLoop(thread->el);
- zfree(thread);
-}
-
-static void freeBenchmarkThreads(void) {
- int i = 0;
- for (; i < config.num_threads; i++) {
- benchmarkThread *thread = config.threads[i];
- if (thread) freeBenchmarkThread(thread);
- }
- zfree(config.threads);
- config.threads = NULL;
-}
-
-static void *execBenchmarkThread(void *ptr) {
- benchmarkThread *thread = (benchmarkThread *) ptr;
- aeMain(thread->el);
- return NULL;
-}
-
-/* Cluster helper functions. */
-
-static clusterNode *createClusterNode(char *ip, int port) {
- clusterNode *node = zmalloc(sizeof(*node));
- if (!node) return NULL;
- node->ip = ip;
- node->port = port;
- node->name = NULL;
- node->flags = 0;
- node->replicate = NULL;
- node->replicas_count = 0;
- node->slots = zmalloc(CLUSTER_SLOTS * sizeof(int));
- node->slots_count = 0;
- node->updated_slots = NULL;
- node->updated_slots_count = 0;
- node->migrating = NULL;
- node->importing = NULL;
- node->migrating_count = 0;
- node->importing_count = 0;
- node->redis_config = NULL;
- return node;
-}
-
-static void freeClusterNode(clusterNode *node) {
- int i;
- if (node->name) sdsfree(node->name);
- if (node->replicate) sdsfree(node->replicate);
- if (node->migrating != NULL) {
- for (i = 0; i < node->migrating_count; i++) sdsfree(node->migrating[i]);
- zfree(node->migrating);
- }
- if (node->importing != NULL) {
- for (i = 0; i < node->importing_count; i++) sdsfree(node->importing[i]);
- zfree(node->importing);
- }
- /* If the node is not the reference node, that uses the address from
- * config.conn_info.hostip and config.conn_info.hostport, then the node ip has been
- * allocated by fetchClusterConfiguration, so it must be freed. */
- if (node->ip && strcmp(node->ip, config.conn_info.hostip) != 0) sdsfree(node->ip);
- if (node->redis_config != NULL) freeRedisConfig(node->redis_config);
- zfree(node->slots);
- zfree(node);
-}
-
-static void freeClusterNodes(void) {
- int i = 0;
- for (; i < config.cluster_node_count; i++) {
- clusterNode *n = config.cluster_nodes[i];
- if (n) freeClusterNode(n);
- }
- zfree(config.cluster_nodes);
- config.cluster_nodes = NULL;
-}
-
-static clusterNode **addClusterNode(clusterNode *node) {
- int count = config.cluster_node_count + 1;
- config.cluster_nodes = zrealloc(config.cluster_nodes,
- count * sizeof(*node));
- if (!config.cluster_nodes) return NULL;
- config.cluster_nodes[config.cluster_node_count++] = node;
- return config.cluster_nodes;
-}
-
-/* TODO: This should be refactored to use CLUSTER SLOTS, the migrating/importing
- * information is anyway not used.
- */
-static int fetchClusterConfiguration(void) {
- int success = 1;
- redisContext *ctx = NULL;
- redisReply *reply = NULL;
- ctx = getRedisContext(config.conn_info.hostip, config.conn_info.hostport, config.hostsocket);
- if (ctx == NULL) {
- exit(1);
- }
- clusterNode *firstNode = createClusterNode((char *) config.conn_info.hostip,
- config.conn_info.hostport);
- if (!firstNode) {success = 0; goto cleanup;}
- reply = redisCommand(ctx, "CLUSTER NODES");
- success = (reply != NULL);
- if (!success) goto cleanup;
- success = (reply->type != REDIS_REPLY_ERROR);
- if (!success) {
- if (config.hostsocket == NULL) {
- fprintf(stderr, "Cluster node %s:%d replied with error:\n%s\n",
- config.conn_info.hostip, config.conn_info.hostport, reply->str);
- } else {
- fprintf(stderr, "Cluster node %s replied with error:\n%s\n",
- config.hostsocket, reply->str);
- }
- goto cleanup;
- }
- char *lines = reply->str, *p, *line;
- while ((p = strstr(lines, "\n")) != NULL) {
- *p = '\0';
- line = lines;
- lines = p + 1;
- char *name = NULL, *addr = NULL, *flags = NULL, *master_id = NULL;
- int i = 0;
- while ((p = strchr(line, ' ')) != NULL) {
- *p = '\0';
- char *token = line;
- line = p + 1;
- switch(i++){
- case 0: name = token; break;
- case 1: addr = token; break;
- case 2: flags = token; break;
- case 3: master_id = token; break;
- }
- if (i == 8) break; // Slots
- }
- if (!flags) {
- fprintf(stderr, "Invalid CLUSTER NODES reply: missing flags.\n");
- success = 0;
- goto cleanup;
- }
- int myself = (strstr(flags, "myself") != NULL);
- int is_replica = (strstr(flags, "slave") != NULL ||
- (master_id != NULL && master_id[0] != '-'));
- if (is_replica) continue;
- if (addr == NULL) {
- fprintf(stderr, "Invalid CLUSTER NODES reply: missing addr.\n");
- success = 0;
- goto cleanup;
- }
- clusterNode *node = NULL;
- char *ip = NULL;
- int port = 0;
- char *paddr = strrchr(addr, ':');
- if (paddr != NULL) {
- *paddr = '\0';
- ip = addr;
- addr = paddr + 1;
- /* If internal bus is specified, then just drop it. */
- if ((paddr = strchr(addr, '@')) != NULL) *paddr = '\0';
- port = atoi(addr);
- }
- if (myself) {
- node = firstNode;
- if (ip != NULL && strcmp(node->ip, ip) != 0) {
- node->ip = sdsnew(ip);
- node->port = port;
- }
- } else {
- node = createClusterNode(sdsnew(ip), port);
- }
- if (node == NULL) {
- success = 0;
- goto cleanup;
- }
- if (name != NULL) node->name = sdsnew(name);
- if (i == 8) {
- int remaining = strlen(line);
- while (remaining > 0) {
- p = strchr(line, ' ');
- if (p == NULL) p = line + remaining;
- remaining -= (p - line);
-
- char *slotsdef = line;
- *p = '\0';
- if (remaining) {
- line = p + 1;
- remaining--;
- } else line = p;
- char *dash = NULL;
- if (slotsdef[0] == '[') {
- slotsdef++;
- if ((p = strstr(slotsdef, "->-"))) { // Migrating
- *p = '\0';
- p += 3;
- char *closing_bracket = strchr(p, ']');
- if (closing_bracket) *closing_bracket = '\0';
- sds slot = sdsnew(slotsdef);
- sds dst = sdsnew(p);
- node->migrating_count += 2;
- node->migrating =
- zrealloc(node->migrating,
- (node->migrating_count * sizeof(sds)));
- node->migrating[node->migrating_count - 2] =
- slot;
- node->migrating[node->migrating_count - 1] =
- dst;
- } else if ((p = strstr(slotsdef, "-<-"))) {//Importing
- *p = '\0';
- p += 3;
- char *closing_bracket = strchr(p, ']');
- if (closing_bracket) *closing_bracket = '\0';
- sds slot = sdsnew(slotsdef);
- sds src = sdsnew(p);
- node->importing_count += 2;
- node->importing = zrealloc(node->importing,
- (node->importing_count * sizeof(sds)));
- node->importing[node->importing_count - 2] =
- slot;
- node->importing[node->importing_count - 1] =
- src;
- }
- } else if ((dash = strchr(slotsdef, '-')) != NULL) {
- p = dash;
- int start, stop;
- *p = '\0';
- start = atoi(slotsdef);
- stop = atoi(p + 1);
- while (start <= stop) {
- int slot = start++;
- node->slots[node->slots_count++] = slot;
- }
- } else if (p > slotsdef) {
- int slot = atoi(slotsdef);
- node->slots[node->slots_count++] = slot;
- }
- }
- }
- if (node->slots_count == 0) {
- fprintf(stderr,
- "WARNING: Master node %s:%d has no slots, skipping...\n",
- node->ip, node->port);
- continue;
- }
- if (!addClusterNode(node)) {
- success = 0;
- goto cleanup;
- }
- }
-cleanup:
- if (ctx) redisFree(ctx);
- if (!success) {
- if (config.cluster_nodes) freeClusterNodes();
- }
- if (reply) freeReplyObject(reply);
- return success;
-}
-
-/* Request the current cluster slots configuration by calling CLUSTER SLOTS
- * and atomically update the slots after a successful reply. */
-static int fetchClusterSlotsConfiguration(client c) {
- UNUSED(c);
- int success = 1, is_fetching_slots = 0, last_update = 0;
- size_t i;
- atomicGet(config.slots_last_update, last_update);
- if (c->slots_last_update < last_update) {
- c->slots_last_update = last_update;
- return -1;
- }
- redisReply *reply = NULL;
- atomicGetIncr(config.is_fetching_slots, is_fetching_slots, 1);
- if (is_fetching_slots) return -1; //TODO: use other codes || errno ?
- atomicSet(config.is_fetching_slots, 1);
- fprintf(stderr,
- "WARNING: Cluster slots configuration changed, fetching new one...\n");
- const char *errmsg = "Failed to update cluster slots configuration";
- static dictType dtype = {
- dictSdsHash, /* hash function */
- NULL, /* key dup */
- NULL, /* val dup */
- dictSdsKeyCompare, /* key compare */
- NULL, /* key destructor */
- NULL, /* val destructor */
- NULL /* allow to expand */
- };
- /* printf("[%d] fetchClusterSlotsConfiguration\n", c->thread_id); */
- dict *masters = dictCreate(&dtype);
- redisContext *ctx = NULL;
- for (i = 0; i < (size_t) config.cluster_node_count; i++) {
- clusterNode *node = config.cluster_nodes[i];
- assert(node->ip != NULL);
- assert(node->name != NULL);
- assert(node->port);
- /* Use first node as entry point to connect to. */
- if (ctx == NULL) {
- ctx = getRedisContext(node->ip, node->port, NULL);
- if (!ctx) {
- success = 0;
- goto cleanup;
- }
- }
- if (node->updated_slots != NULL)
- zfree(node->updated_slots);
- node->updated_slots = NULL;
- node->updated_slots_count = 0;
- dictReplace(masters, node->name, node) ;
- }
- reply = redisCommand(ctx, "CLUSTER SLOTS");
- if (reply == NULL || reply->type == REDIS_REPLY_ERROR) {
- success = 0;
- if (reply)
- fprintf(stderr,"%s\nCLUSTER SLOTS ERROR: %s\n",errmsg,reply->str);
- goto cleanup;
- }
- assert(reply->type == REDIS_REPLY_ARRAY);
- for (i = 0; i < reply->elements; i++) {
- redisReply *r = reply->element[i];
- assert(r->type == REDIS_REPLY_ARRAY);
- assert(r->elements >= 3);
- int from, to, slot;
- from = r->element[0]->integer;
- to = r->element[1]->integer;
- redisReply *nr = r->element[2];
- assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 3);
- assert(nr->element[2]->str != NULL);
- sds name = sdsnew(nr->element[2]->str);
- dictEntry *entry = dictFind(masters, name);
- if (entry == NULL) {
- success = 0;
- fprintf(stderr, "%s: could not find node with ID %s in current "
- "configuration.\n", errmsg, name);
- if (name) sdsfree(name);
- goto cleanup;
- }
- sdsfree(name);
- clusterNode *node = dictGetVal(entry);
- if (node->updated_slots == NULL)
- node->updated_slots = zcalloc(CLUSTER_SLOTS * sizeof(int));
- for (slot = from; slot <= to; slot++)
- node->updated_slots[node->updated_slots_count++] = slot;
- }
- updateClusterSlotsConfiguration();
-cleanup:
- freeReplyObject(reply);
- redisFree(ctx);
- dictRelease(masters);
- atomicSet(config.is_fetching_slots, 0);
- return success;
-}
-
-/* Atomically update the new slots configuration. */
-static void updateClusterSlotsConfiguration(void) {
- pthread_mutex_lock(&config.is_updating_slots_mutex);
- atomicSet(config.is_updating_slots, 1);
- int i;
- for (i = 0; i < config.cluster_node_count; i++) {
- clusterNode *node = config.cluster_nodes[i];
- if (node->updated_slots != NULL) {
- int *oldslots = node->slots;
- node->slots = node->updated_slots;
- node->slots_count = node->updated_slots_count;
- node->updated_slots = NULL;
- node->updated_slots_count = 0;
- zfree(oldslots);
- }
- }
- atomicSet(config.is_updating_slots, 0);
- atomicIncr(config.slots_last_update, 1);
- pthread_mutex_unlock(&config.is_updating_slots_mutex);
-}
-
-/* Generate random data for redis benchmark. See #7196. */
-static void genBenchmarkRandomData(char *data, int count) {
- static uint32_t state = 1234;
- int i = 0;
-
- while (count--) {
- state = (state*1103515245+12345);
- data[i++] = '0'+((state>>16)&63);
- }
-}
-
-/* Returns number of consumed options. */
-int parseOptions(int argc, char **argv) {
- int i;
- int lastarg;
- int exit_status = 1;
- char *tls_usage;
-
- for (i = 1; i < argc; i++) {
- lastarg = (i == (argc-1));
-
- if (!strcmp(argv[i],"-c")) {
- if (lastarg) goto invalid;
- config.numclients = atoi(argv[++i]);
- } else if (!strcmp(argv[i],"-v") || !strcmp(argv[i], "--version")) {
- sds version = cliVersion();
- printf("redis-benchmark %s\n", version);
- sdsfree(version);
- exit(0);
- } else if (!strcmp(argv[i],"-n")) {
- if (lastarg) goto invalid;
- config.requests = atoi(argv[++i]);
- } else if (!strcmp(argv[i],"-k")) {
- if (lastarg) goto invalid;
- config.keepalive = atoi(argv[++i]);
- } else if (!strcmp(argv[i],"-h")) {
- if (lastarg) goto invalid;
- sdsfree(config.conn_info.hostip);
- config.conn_info.hostip = sdsnew(argv[++i]);
- } else if (!strcmp(argv[i],"-p")) {
- if (lastarg) goto invalid;
- config.conn_info.hostport = atoi(argv[++i]);
- if (config.conn_info.hostport < 0 || config.conn_info.hostport > 65535) {
- fprintf(stderr, "Invalid server port.\n");
- exit(1);
- }
- } else if (!strcmp(argv[i],"-s")) {
- if (lastarg) goto invalid;
- config.hostsocket = strdup(argv[++i]);
- } else if (!strcmp(argv[i],"-x")) {
- config.stdinarg = 1;
- } else if (!strcmp(argv[i],"-a") ) {
- if (lastarg) goto invalid;
- config.conn_info.auth = sdsnew(argv[++i]);
- } else if (!strcmp(argv[i],"--user")) {
- if (lastarg) goto invalid;
- config.conn_info.user = sdsnew(argv[++i]);
- } else if (!strcmp(argv[i],"-u") && !lastarg) {
- parseRedisUri(argv[++i],"redis-benchmark",&config.conn_info,&config.tls);
- if (config.conn_info.hostport < 0 || config.conn_info.hostport > 65535) {
- fprintf(stderr, "Invalid server port.\n");
- exit(1);
- }
- config.input_dbnumstr = sdsfromlonglong(config.conn_info.input_dbnum);
- } else if (!strcmp(argv[i],"-3")) {
- config.resp3 = 1;
- } else if (!strcmp(argv[i],"-d")) {
- if (lastarg) goto invalid;
- config.datasize = atoi(argv[++i]);
- if (config.datasize < 1) config.datasize=1;
- if (config.datasize > 1024*1024*1024) config.datasize = 1024*1024*1024;
- } else if (!strcmp(argv[i],"-P")) {
- if (lastarg) goto invalid;
- config.pipeline = atoi(argv[++i]);
- if (config.pipeline <= 0) config.pipeline=1;
- } else if (!strcmp(argv[i],"-r")) {
- if (lastarg) goto invalid;
- const char *next = argv[++i], *p = next;
- if (*p == '-') {
- p++;
- if (*p < '0' || *p > '9') goto invalid;
- }
- config.randomkeys = 1;
- config.randomkeys_keyspacelen = atoi(next);
- if (config.randomkeys_keyspacelen < 0)
- config.randomkeys_keyspacelen = 0;
- } else if (!strcmp(argv[i],"-q")) {
- config.quiet = 1;
- } else if (!strcmp(argv[i],"--csv")) {
- config.csv = 1;
- } else if (!strcmp(argv[i],"-l")) {
- config.loop = 1;
- } else if (!strcmp(argv[i],"-I")) {
- config.idlemode = 1;
- } else if (!strcmp(argv[i],"-e")) {
- fprintf(stderr,
- "WARNING: -e option has no effect. "
- "We now immediately exit on error to avoid false results.\n");
- } else if (!strcmp(argv[i],"--seed")) {
- if (lastarg) goto invalid;
- int rand_seed = atoi(argv[++i]);
- srandom(rand_seed);
- init_genrand64(rand_seed);
- } else if (!strcmp(argv[i],"-t")) {
- if (lastarg) goto invalid;
- /* We get the list of tests to run as a string in the form
- * get,set,lrange,...,test_N. Then we add a comma before and
- * after the string in order to make sure that searching
- * for ",testname," will always get a match if the test is
- * enabled. */
- config.tests = sdsnew(",");
- config.tests = sdscat(config.tests,(char*)argv[++i]);
- config.tests = sdscat(config.tests,",");
- sdstolower(config.tests);
- } else if (!strcmp(argv[i],"--dbnum")) {
- if (lastarg) goto invalid;
- config.conn_info.input_dbnum = atoi(argv[++i]);
- config.input_dbnumstr = sdsfromlonglong(config.conn_info.input_dbnum);
- } else if (!strcmp(argv[i],"--precision")) {
- if (lastarg) goto invalid;
- config.precision = atoi(argv[++i]);
- if (config.precision < 0) config.precision = DEFAULT_LATENCY_PRECISION;
- if (config.precision > MAX_LATENCY_PRECISION) config.precision = MAX_LATENCY_PRECISION;
- } else if (!strcmp(argv[i],"--threads")) {
- if (lastarg) goto invalid;
- config.num_threads = atoi(argv[++i]);
- if (config.num_threads > MAX_THREADS) {
- fprintf(stderr,
- "WARNING: Too many threads, limiting threads to %d.\n",
- MAX_THREADS);
- config.num_threads = MAX_THREADS;
- } else if (config.num_threads < 0) config.num_threads = 0;
- } else if (!strcmp(argv[i],"--cluster")) {
- config.cluster_mode = 1;
- } else if (!strcmp(argv[i],"--enable-tracking")) {
- config.enable_tracking = 1;
- } else if (!strcmp(argv[i],"--help")) {
- exit_status = 0;
- goto usage;
- #ifdef USE_OPENSSL
- } else if (!strcmp(argv[i],"--tls")) {
- config.tls = 1;
- } else if (!strcmp(argv[i],"--sni")) {
- if (lastarg) goto invalid;
- config.sslconfig.sni = strdup(argv[++i]);
- } else if (!strcmp(argv[i],"--cacertdir")) {
- if (lastarg) goto invalid;
- config.sslconfig.cacertdir = strdup(argv[++i]);
- } else if (!strcmp(argv[i],"--cacert")) {
- if (lastarg) goto invalid;
- config.sslconfig.cacert = strdup(argv[++i]);
- } else if (!strcmp(argv[i],"--insecure")) {
- config.sslconfig.skip_cert_verify = 1;
- } else if (!strcmp(argv[i],"--cert")) {
- if (lastarg) goto invalid;
- config.sslconfig.cert = strdup(argv[++i]);
- } else if (!strcmp(argv[i],"--key")) {
- if (lastarg) goto invalid;
- config.sslconfig.key = strdup(argv[++i]);
- } else if (!strcmp(argv[i],"--tls-ciphers")) {
- if (lastarg) goto invalid;
- config.sslconfig.ciphers = strdup(argv[++i]);
- #ifdef TLS1_3_VERSION
- } else if (!strcmp(argv[i],"--tls-ciphersuites")) {
- if (lastarg) goto invalid;
- config.sslconfig.ciphersuites = strdup(argv[++i]);
- #endif
- #endif
- } else {
- /* Assume the user meant to provide an option when the arg starts
- * with a dash. We're done otherwise and should use the remainder
- * as the command and arguments for running the benchmark. */
- if (argv[i][0] == '-') goto invalid;
- return i;
- }
- }
-
- return i;
-
-invalid:
- printf("Invalid option \"%s\" or option argument missing\n\n",argv[i]);
-
-usage:
- tls_usage =
-#ifdef USE_OPENSSL
-" --tls Establish a secure TLS connection.\n"
-" --sni <host> Server name indication for TLS.\n"
-" --cacert <file> CA Certificate file to verify with.\n"
-" --cacertdir <dir> Directory where trusted CA certificates are stored.\n"
-" If neither cacert nor cacertdir are specified, the default\n"
-" system-wide trusted root certs configuration will apply.\n"
-" --insecure Allow insecure TLS connection by skipping cert validation.\n"
-" --cert <file> Client certificate to authenticate with.\n"
-" --key <file> Private key file to authenticate with.\n"
-" --tls-ciphers <list> Sets the list of preferred ciphers (TLSv1.2 and below)\n"
-" in order of preference from highest to lowest separated by colon (\":\").\n"
-" See the ciphers(1ssl) manpage for more information about the syntax of this string.\n"
-#ifdef TLS1_3_VERSION
-" --tls-ciphersuites <list> Sets the list of preferred ciphersuites (TLSv1.3)\n"
-" in order of preference from highest to lowest separated by colon (\":\").\n"
-" See the ciphers(1ssl) manpage for more information about the syntax of this string,\n"
-" and specifically for TLSv1.3 ciphersuites.\n"
-#endif
-#endif
-"";
-
- printf(
-"%s%s%s", /* Split to avoid strings longer than 4095 (-Woverlength-strings). */
-"Usage: redis-benchmark [OPTIONS] [COMMAND ARGS...]\n\n"
-"Options:\n"
-" -h <hostname> Server hostname (default 127.0.0.1)\n"
-" -p <port> Server port (default 6379)\n"
-" -s <socket> Server socket (overrides host and port)\n"
-" -a <password> Password for Redis Auth\n"
-" --user <username> Used to send ACL style 'AUTH username pass'. Needs -a.\n"
-" -u <uri> Server URI on format redis://user:password@host:port/dbnum\n"
-" User, password and dbnum are optional. For authentication\n"
-" without a username, use username 'default'. For TLS, use\n"
-" the scheme 'rediss'.\n"
-" -c <clients> Number of parallel connections (default 50).\n"
-" Note: If --cluster is used then number of clients has to be\n"
-" the same or higher than the number of nodes.\n"
-" -n <requests> Total number of requests (default 100000)\n"
-" -d <size> Data size of SET/GET value in bytes (default 3)\n"
-" --dbnum <db> SELECT the specified db number (default 0)\n"
-" -3 Start session in RESP3 protocol mode.\n"
-" --threads <num> Enable multi-thread mode.\n"
-" --cluster Enable cluster mode.\n"
-" If the command is supplied on the command line in cluster\n"
-" mode, the key must contain \"{tag}\". Otherwise, the\n"
-" command will not be sent to the right cluster node.\n"
-" --enable-tracking Send CLIENT TRACKING on before starting benchmark.\n"
-" -k <boolean> 1=keep alive 0=reconnect (default 1)\n"
-" -r <keyspacelen> Use random keys for SET/GET/INCR, random values for SADD,\n"
-" random members and scores for ZADD.\n"
-" Using this option the benchmark will expand the string\n"
-" __rand_int__ inside an argument with a 12 digits number in\n"
-" the specified range from 0 to keyspacelen-1. The\n"
-" substitution changes every time a command is executed.\n"
-" Default tests use this to hit random keys in the specified\n"
-" range.\n"
-" Note: If -r is omitted, all commands in a benchmark will\n"
-" use the same key.\n"
-" -P <numreq> Pipeline <numreq> requests. Default 1 (no pipeline).\n"
-" -q Quiet. Just show query/sec values\n"
-" --precision Number of decimal places to display in latency output (default 0)\n"
-" --csv Output in CSV format\n"
-" -l Loop. Run the tests forever\n"
-" -t <tests> Only run the comma separated list of tests. The test\n"
-" names are the same as the ones produced as output.\n"
-" The -t option is ignored if a specific command is supplied\n"
-" on the command line.\n"
-" -I Idle mode. Just open N idle connections and wait.\n"
-" -x Read last argument from STDIN.\n"
-" --seed <num> Set the seed for random number generator. Default seed is based on time.\n",
-tls_usage,
-" --help Output this help and exit.\n"
-" --version Output version and exit.\n\n"
-"Examples:\n\n"
-" Run the benchmark with the default configuration against 127.0.0.1:6379:\n"
-" $ redis-benchmark\n\n"
-" Use 20 parallel clients, for a total of 100k requests, against 192.168.1.1:\n"
-" $ redis-benchmark -h 192.168.1.1 -p 6379 -n 100000 -c 20\n\n"
-" Fill 127.0.0.1:6379 with about 1 million keys only using the SET test:\n"
-" $ redis-benchmark -t set -n 1000000 -r 100000000\n\n"
-" Benchmark 127.0.0.1:6379 for a few commands producing CSV output:\n"
-" $ redis-benchmark -t ping,set,get -n 100000 --csv\n\n"
-" Benchmark a specific command line:\n"
-" $ redis-benchmark -r 10000 -n 10000 eval 'return redis.call(\"ping\")' 0\n\n"
-" Fill a list with 10000 random elements:\n"
-" $ redis-benchmark -r 10000 -n 10000 lpush mylist __rand_int__\n\n"
-" On user specified command lines __rand_int__ is replaced with a random integer\n"
-" with a range of values selected by the -r option.\n"
- );
- exit(exit_status);
-}
-
-int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData) {
- UNUSED(eventLoop);
- UNUSED(id);
- benchmarkThread *thread = (benchmarkThread *)clientData;
- int liveclients = 0;
- int requests_finished = 0;
- int previous_requests_finished = 0;
- long long current_tick = mstime();
- atomicGet(config.liveclients, liveclients);
- atomicGet(config.requests_finished, requests_finished);
- atomicGet(config.previous_requests_finished, previous_requests_finished);
-
- if (liveclients == 0 && requests_finished != config.requests) {
- fprintf(stderr,"All clients disconnected... aborting.\n");
- exit(1);
- }
- if (config.num_threads && requests_finished >= config.requests) {
- aeStop(eventLoop);
- return AE_NOMORE;
- }
- if (config.csv) return SHOW_THROUGHPUT_INTERVAL;
- /* only first thread output throughput */
- if (thread != NULL && thread->index != 0) {
- return SHOW_THROUGHPUT_INTERVAL;
- }
- if (config.idlemode == 1) {
- printf("clients: %d\r", config.liveclients);
- fflush(stdout);
- return SHOW_THROUGHPUT_INTERVAL;
- }
- const float dt = (float)(current_tick-config.start)/1000.0;
- const float rps = (float)requests_finished/dt;
- const float instantaneous_dt = (float)(current_tick-config.previous_tick)/1000.0;
- const float instantaneous_rps = (float)(requests_finished-previous_requests_finished)/instantaneous_dt;
- config.previous_tick = current_tick;
- atomicSet(config.previous_requests_finished,requests_finished);
- printf("%*s\r", config.last_printed_bytes, " "); /* ensure there is a clean line */
- int printed_bytes = printf("%s: rps=%.1f (overall: %.1f) avg_msec=%.3f (overall: %.3f)\r", config.title, instantaneous_rps, rps, hdr_mean(config.current_sec_latency_histogram)/1000.0f, hdr_mean(config.latency_histogram)/1000.0f);
- config.last_printed_bytes = printed_bytes;
- hdr_reset(config.current_sec_latency_histogram);
- fflush(stdout);
- return SHOW_THROUGHPUT_INTERVAL;
-}
-
-/* Return true if the named test was selected using the -t command line
- * switch, or if all the tests are selected (no -t passed by user). */
-int test_is_selected(const char *name) {
- char buf[256];
- int l = strlen(name);
-
- if (config.tests == NULL) return 1;
- buf[0] = ',';
- memcpy(buf+1,name,l);
- buf[l+1] = ',';
- buf[l+2] = '\0';
- return strstr(config.tests,buf) != NULL;
-}
-
-int main(int argc, char **argv) {
- int i;
- char *data, *cmd, *tag;
- int len;
-
- client c;
-
- srandom(time(NULL) ^ getpid());
- init_genrand64(ustime() ^ getpid());
- signal(SIGHUP, SIG_IGN);
- signal(SIGPIPE, SIG_IGN);
-
- memset(&config.sslconfig, 0, sizeof(config.sslconfig));
- config.numclients = 50;
- config.requests = 100000;
- config.liveclients = 0;
- config.el = aeCreateEventLoop(1024*10);
- aeCreateTimeEvent(config.el,1,showThroughput,NULL,NULL);
- config.keepalive = 1;
- config.datasize = 3;
- config.pipeline = 1;
- config.randomkeys = 0;
- config.randomkeys_keyspacelen = 0;
- config.quiet = 0;
- config.csv = 0;
- config.loop = 0;
- config.idlemode = 0;
- config.clients = listCreate();
- config.conn_info.hostip = sdsnew("127.0.0.1");
- config.conn_info.hostport = 6379;
- config.hostsocket = NULL;
- config.tests = NULL;
- config.conn_info.input_dbnum = 0;
- config.stdinarg = 0;
- config.conn_info.auth = NULL;
- config.precision = DEFAULT_LATENCY_PRECISION;
- config.num_threads = 0;
- config.threads = NULL;
- config.cluster_mode = 0;
- config.cluster_node_count = 0;
- config.cluster_nodes = NULL;
- config.redis_config = NULL;
- config.is_fetching_slots = 0;
- config.is_updating_slots = 0;
- config.slots_last_update = 0;
- config.enable_tracking = 0;
- config.resp3 = 0;
-
- i = parseOptions(argc,argv);
- argc -= i;
- argv += i;
-
- tag = "";
-
-#ifdef USE_OPENSSL
- if (config.tls) {
- cliSecureInit();
- }
-#endif
-
- if (config.cluster_mode) {
- // We only include the slot placeholder {tag} if cluster mode is enabled
- tag = ":{tag}";
-
- /* Fetch cluster configuration. */
- if (!fetchClusterConfiguration() || !config.cluster_nodes) {
- if (!config.hostsocket) {
- fprintf(stderr, "Failed to fetch cluster configuration from "
- "%s:%d\n", config.conn_info.hostip, config.conn_info.hostport);
- } else {
- fprintf(stderr, "Failed to fetch cluster configuration from "
- "%s\n", config.hostsocket);
- }
- exit(1);
- }
- if (config.cluster_node_count <= 1) {
- fprintf(stderr, "Invalid cluster: %d node(s).\n",
- config.cluster_node_count);
- exit(1);
- }
- printf("Cluster has %d master nodes:\n\n", config.cluster_node_count);
- int i = 0;
- for (; i < config.cluster_node_count; i++) {
- clusterNode *node = config.cluster_nodes[i];
- if (!node) {
- fprintf(stderr, "Invalid cluster node #%d\n", i);
- exit(1);
- }
- printf("Master %d: ", i);
- if (node->name) printf("%s ", node->name);
- printf("%s:%d\n", node->ip, node->port);
- node->redis_config = getRedisConfig(node->ip, node->port, NULL);
- if (node->redis_config == NULL) {
- fprintf(stderr, "WARNING: Could not fetch node CONFIG %s:%d\n",
- node->ip, node->port);
- }
- }
- printf("\n");
- /* Automatically set thread number to node count if not specified
- * by the user. */
- if (config.num_threads == 0)
- config.num_threads = config.cluster_node_count;
- } else {
- config.redis_config =
- getRedisConfig(config.conn_info.hostip, config.conn_info.hostport, config.hostsocket);
- if (config.redis_config == NULL) {
- fprintf(stderr, "WARNING: Could not fetch server CONFIG\n");
- }
- }
- if (config.num_threads > 0) {
- pthread_mutex_init(&(config.liveclients_mutex), NULL);
- pthread_mutex_init(&(config.is_updating_slots_mutex), NULL);
- }
-
- if (config.keepalive == 0) {
- fprintf(stderr,
- "WARNING: Keepalive disabled. You probably need "
- "'echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse' for Linux and "
- "'sudo sysctl -w net.inet.tcp.msl=1000' for Mac OS X in order "
- "to use a lot of clients/requests\n");
- }
- if (argc > 0 && config.tests != NULL) {
- fprintf(stderr, "WARNING: Option -t is ignored.\n");
- }
-
- if (config.idlemode) {
- printf("Creating %d idle connections and waiting forever (Ctrl+C when done)\n", config.numclients);
- int thread_id = -1, use_threads = (config.num_threads > 0);
- if (use_threads) {
- thread_id = 0;
- initBenchmarkThreads();
- }
- c = createClient("",0,NULL,thread_id); /* will never receive a reply */
- createMissingClients(c);
- if (use_threads) startBenchmarkThreads();
- else aeMain(config.el);
- /* and will wait for every */
- }
- if(config.csv){
- printf("\"test\",\"rps\",\"avg_latency_ms\",\"min_latency_ms\",\"p50_latency_ms\",\"p95_latency_ms\",\"p99_latency_ms\",\"max_latency_ms\"\n");
- }
- /* Run benchmark with command in the remainder of the arguments. */
- if (argc) {
- sds title = sdsnew(argv[0]);
- for (i = 1; i < argc; i++) {
- title = sdscatlen(title, " ", 1);
- title = sdscatlen(title, (char*)argv[i], strlen(argv[i]));
- }
- sds *sds_args = getSdsArrayFromArgv(argc, argv, 0);
- if (!sds_args) {
- fprintf(stderr, "Invalid quoted string\n");
- return 1;
- }
- if (config.stdinarg) {
- sds_args = sds_realloc(sds_args,(argc + 1) * sizeof(sds));
- sds_args[argc] = readArgFromStdin();
- argc++;
- }
- /* Setup argument length */
- size_t *argvlen = zmalloc(argc*sizeof(size_t));
- for (i = 0; i < argc; i++)
- argvlen[i] = sdslen(sds_args[i]);
- do {
- len = redisFormatCommandArgv(&cmd,argc,(const char**)sds_args,argvlen);
- // adjust the datasize to the parsed command
- config.datasize = len;
- benchmark(title,cmd,len);
- free(cmd);
- } while(config.loop);
- sdsfreesplitres(sds_args, argc);
-
- sdsfree(title);
- if (config.redis_config != NULL) freeRedisConfig(config.redis_config);
- zfree(argvlen);
- return 0;
- }
-
- /* Run default benchmark suite. */
- data = zmalloc(config.datasize+1);
- do {
- genBenchmarkRandomData(data, config.datasize);
- data[config.datasize] = '\0';
-
- if (test_is_selected("ping_inline") || test_is_selected("ping"))
- benchmark("PING_INLINE","PING\r\n",6);
-
- if (test_is_selected("ping_mbulk") || test_is_selected("ping")) {
- len = redisFormatCommand(&cmd,"PING");
- benchmark("PING_MBULK",cmd,len);
- free(cmd);
- }
-
- if (test_is_selected("set")) {
- len = redisFormatCommand(&cmd,"SET key%s:__rand_int__ %s",tag,data);
- benchmark("SET",cmd,len);
- free(cmd);
- }
-
- if (test_is_selected("get")) {
- len = redisFormatCommand(&cmd,"GET key%s:__rand_int__",tag);
- benchmark("GET",cmd,len);
- free(cmd);
- }
-
- if (test_is_selected("incr")) {
- len = redisFormatCommand(&cmd,"INCR counter%s:__rand_int__",tag);
- benchmark("INCR",cmd,len);
- free(cmd);
- }
-
- if (test_is_selected("lpush")) {
- len = redisFormatCommand(&cmd,"LPUSH mylist%s %s",tag,data);
- benchmark("LPUSH",cmd,len);
- free(cmd);
- }
-
- if (test_is_selected("rpush")) {
- len = redisFormatCommand(&cmd,"RPUSH mylist%s %s",tag,data);
- benchmark("RPUSH",cmd,len);
- free(cmd);
- }
-
- if (test_is_selected("lpop")) {
- len = redisFormatCommand(&cmd,"LPOP mylist%s",tag);
- benchmark("LPOP",cmd,len);
- free(cmd);
- }
-
- if (test_is_selected("rpop")) {
- len = redisFormatCommand(&cmd,"RPOP mylist%s",tag);
- benchmark("RPOP",cmd,len);
- free(cmd);
- }
-
- if (test_is_selected("sadd")) {
- len = redisFormatCommand(&cmd,
- "SADD myset%s element:__rand_int__",tag);
- benchmark("SADD",cmd,len);
- free(cmd);
- }
-
- if (test_is_selected("hset")) {
- len = redisFormatCommand(&cmd,
- "HSET myhash%s element:__rand_int__ %s",tag,data);
- benchmark("HSET",cmd,len);
- free(cmd);
- }
-
- if (test_is_selected("spop")) {
- len = redisFormatCommand(&cmd,"SPOP myset%s",tag);
- benchmark("SPOP",cmd,len);
- free(cmd);
- }
-
- if (test_is_selected("zadd")) {
- char *score = "0";
- if (config.randomkeys) score = "__rand_int__";
- len = redisFormatCommand(&cmd,
- "ZADD myzset%s %s element:__rand_int__",tag,score);
- benchmark("ZADD",cmd,len);
- free(cmd);
- }
-
- if (test_is_selected("zpopmin")) {
- len = redisFormatCommand(&cmd,"ZPOPMIN myzset%s",tag);
- benchmark("ZPOPMIN",cmd,len);
- free(cmd);
- }
-
- if (test_is_selected("lrange") ||
- test_is_selected("lrange_100") ||
- test_is_selected("lrange_300") ||
- test_is_selected("lrange_500") ||
- test_is_selected("lrange_600"))
- {
- len = redisFormatCommand(&cmd,"LPUSH mylist%s %s",tag,data);
- benchmark("LPUSH (needed to benchmark LRANGE)",cmd,len);
- free(cmd);
- }
-
- if (test_is_selected("lrange") || test_is_selected("lrange_100")) {
- len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 99",tag);
- benchmark("LRANGE_100 (first 100 elements)",cmd,len);
- free(cmd);
- }
-
- if (test_is_selected("lrange") || test_is_selected("lrange_300")) {
- len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 299",tag);
- benchmark("LRANGE_300 (first 300 elements)",cmd,len);
- free(cmd);
- }
-
- if (test_is_selected("lrange") || test_is_selected("lrange_500")) {
- len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 499",tag);
- benchmark("LRANGE_500 (first 500 elements)",cmd,len);
- free(cmd);
- }
-
- if (test_is_selected("lrange") || test_is_selected("lrange_600")) {
- len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 599",tag);
- benchmark("LRANGE_600 (first 600 elements)",cmd,len);
- free(cmd);
- }
-
- if (test_is_selected("mset")) {
- const char *cmd_argv[21];
- cmd_argv[0] = "MSET";
- sds key_placeholder = sdscatprintf(sdsnew(""),"key%s:__rand_int__",tag);
- for (i = 1; i < 21; i += 2) {
- cmd_argv[i] = key_placeholder;
- cmd_argv[i+1] = data;
- }
- len = redisFormatCommandArgv(&cmd,21,cmd_argv,NULL);
- benchmark("MSET (10 keys)",cmd,len);
- free(cmd);
- sdsfree(key_placeholder);
- }
-
- if (test_is_selected("xadd")) {
- len = redisFormatCommand(&cmd,"XADD mystream%s * myfield %s", tag, data);
- benchmark("XADD",cmd,len);
- free(cmd);
- }
-
- if (!config.csv) printf("\n");
- } while(config.loop);
-
- zfree(data);
- freeCliConnInfo(config.conn_info);
- if (config.redis_config != NULL) freeRedisConfig(config.redis_config);
-
- return 0;
-}