summaryrefslogtreecommitdiff
path: root/examples/redis-unstable/src/pubsub.c
diff options
context:
space:
mode:
Diffstat (limited to 'examples/redis-unstable/src/pubsub.c')
-rw-r--r--examples/redis-unstable/src/pubsub.c768
1 files changed, 768 insertions, 0 deletions
diff --git a/examples/redis-unstable/src/pubsub.c b/examples/redis-unstable/src/pubsub.c
new file mode 100644
index 0000000..7199be1
--- /dev/null
+++ b/examples/redis-unstable/src/pubsub.c
@@ -0,0 +1,768 @@
+/*
+ * Copyright (c) 2009-Present, Redis Ltd.
+ * All rights reserved.
+ *
+ * Copyright (c) 2024-present, Valkey contributors.
+ * All rights reserved.
+ *
+ * Licensed under your choice of (a) the Redis Source Available License 2.0
+ * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
+ * GNU Affero General Public License v3 (AGPLv3).
+ *
+ * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
+ */
+
+#include "server.h"
+#include "cluster.h"
+#include "cluster_slot_stats.h"
+
+/* Structure to hold the pubsub related metadata. Currently used
+ * for pubsub and pubsubshard feature. */
+typedef struct pubsubtype {
+ int shard;
+ dict *(*clientPubSubChannels)(client*);
+ int (*subscriptionCount)(client*);
+ kvstore **serverPubSubChannels;
+ robj **subscribeMsg;
+ robj **unsubscribeMsg;
+ robj **messageBulk;
+}pubsubtype;
+
+/*
+ * Get client's global Pub/Sub channels subscription count.
+ */
+int clientSubscriptionsCount(client *c);
+
+/*
+ * Get client's shard level Pub/Sub channels subscription count.
+ */
+int clientShardSubscriptionsCount(client *c);
+
+/*
+ * Get client's global Pub/Sub channels dict.
+ */
+dict* getClientPubSubChannels(client *c);
+
+/*
+ * Get client's shard level Pub/Sub channels dict.
+ */
+dict* getClientPubSubShardChannels(client *c);
+
+/*
+ * Get list of channels client is subscribed to.
+ * If a pattern is provided, the subset of channels is returned
+ * matching the pattern.
+ */
+void channelList(client *c, sds pat, kvstore *pubsub_channels);
+
+/*
+ * Pub/Sub type for global channels.
+ */
+pubsubtype pubSubType = {
+ .shard = 0,
+ .clientPubSubChannels = getClientPubSubChannels,
+ .subscriptionCount = clientSubscriptionsCount,
+ .serverPubSubChannels = &server.pubsub_channels,
+ .subscribeMsg = &shared.subscribebulk,
+ .unsubscribeMsg = &shared.unsubscribebulk,
+ .messageBulk = &shared.messagebulk,
+};
+
+/*
+ * Pub/Sub type for shard level channels bounded to a slot.
+ */
+pubsubtype pubSubShardType = {
+ .shard = 1,
+ .clientPubSubChannels = getClientPubSubShardChannels,
+ .subscriptionCount = clientShardSubscriptionsCount,
+ .serverPubSubChannels = &server.pubsubshard_channels,
+ .subscribeMsg = &shared.ssubscribebulk,
+ .unsubscribeMsg = &shared.sunsubscribebulk,
+ .messageBulk = &shared.smessagebulk,
+};
+
+/*-----------------------------------------------------------------------------
+ * Pubsub client replies API
+ *----------------------------------------------------------------------------*/
+
+/* Send a pubsub message of type "message" to the client.
+ * Normally 'msg' is a Redis object containing the string to send as
+ * message. However if the caller sets 'msg' as NULL, it will be able
+ * to send a special message (for instance an Array type) by using the
+ * addReply*() API family. */
+void addReplyPubsubMessage(client *c, robj *channel, robj *msg, robj *message_bulk) {
+ uint64_t old_flags = c->flags;
+ c->flags |= CLIENT_PUSHING;
+ if (c->resp == 2)
+ addReply(c,shared.mbulkhdr[3]);
+ else
+ addReplyPushLen(c,3);
+ addReply(c,message_bulk);
+ addReplyBulk(c,channel);
+ if (msg) addReplyBulk(c,msg);
+ if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
+}
+
+/* Send a pubsub message of type "pmessage" to the client. The difference
+ * with the "message" type delivered by addReplyPubsubMessage() is that
+ * this message format also includes the pattern that matched the message. */
+void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) {
+ uint64_t old_flags = c->flags;
+ c->flags |= CLIENT_PUSHING;
+ if (c->resp == 2)
+ addReply(c,shared.mbulkhdr[4]);
+ else
+ addReplyPushLen(c,4);
+ addReply(c,shared.pmessagebulk);
+ addReplyBulk(c,pat);
+ addReplyBulk(c,channel);
+ addReplyBulk(c,msg);
+ if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
+}
+
+/* Send the pubsub subscription notification to the client. */
+void addReplyPubsubSubscribed(client *c, robj *channel, pubsubtype type) {
+ uint64_t old_flags = c->flags;
+ c->flags |= CLIENT_PUSHING;
+ if (c->resp == 2)
+ addReply(c,shared.mbulkhdr[3]);
+ else
+ addReplyPushLen(c,3);
+ addReply(c,*type.subscribeMsg);
+ addReplyBulk(c,channel);
+ addReplyLongLong(c,type.subscriptionCount(c));
+ if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
+}
+
+/* Send the pubsub unsubscription notification to the client.
+ * Channel can be NULL: this is useful when the client sends a mass
+ * unsubscribe command but there are no channels to unsubscribe from: we
+ * still send a notification. */
+void addReplyPubsubUnsubscribed(client *c, robj *channel, pubsubtype type) {
+ uint64_t old_flags = c->flags;
+ c->flags |= CLIENT_PUSHING;
+ if (c->resp == 2)
+ addReply(c,shared.mbulkhdr[3]);
+ else
+ addReplyPushLen(c,3);
+ addReply(c, *type.unsubscribeMsg);
+ if (channel)
+ addReplyBulk(c,channel);
+ else
+ addReplyNull(c);
+ addReplyLongLong(c,type.subscriptionCount(c));
+ if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
+}
+
+/* Send the pubsub pattern subscription notification to the client. */
+void addReplyPubsubPatSubscribed(client *c, robj *pattern) {
+ uint64_t old_flags = c->flags;
+ c->flags |= CLIENT_PUSHING;
+ if (c->resp == 2)
+ addReply(c,shared.mbulkhdr[3]);
+ else
+ addReplyPushLen(c,3);
+ addReply(c,shared.psubscribebulk);
+ addReplyBulk(c,pattern);
+ addReplyLongLong(c,clientSubscriptionsCount(c));
+ if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
+}
+
+/* Send the pubsub pattern unsubscription notification to the client.
+ * Pattern can be NULL: this is useful when the client sends a mass
+ * punsubscribe command but there are no pattern to unsubscribe from: we
+ * still send a notification. */
+void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) {
+ uint64_t old_flags = c->flags;
+ c->flags |= CLIENT_PUSHING;
+ if (c->resp == 2)
+ addReply(c,shared.mbulkhdr[3]);
+ else
+ addReplyPushLen(c,3);
+ addReply(c,shared.punsubscribebulk);
+ if (pattern)
+ addReplyBulk(c,pattern);
+ else
+ addReplyNull(c);
+ addReplyLongLong(c,clientSubscriptionsCount(c));
+ if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
+}
+
+/*-----------------------------------------------------------------------------
+ * Pubsub low level API
+ *----------------------------------------------------------------------------*/
+
+/* Return the number of pubsub channels + patterns is handled. */
+int serverPubsubSubscriptionCount(void) {
+ return kvstoreSize(server.pubsub_channels) + dictSize(server.pubsub_patterns);
+}
+
+/* Return the number of pubsub shard level channels is handled. */
+int serverPubsubShardSubscriptionCount(void) {
+ return kvstoreSize(server.pubsubshard_channels);
+}
+
+/* Return the number of channels + patterns a client is subscribed to. */
+int clientSubscriptionsCount(client *c) {
+ return dictSize(c->pubsub_channels) + dictSize(c->pubsub_patterns);
+}
+
+/* Return the number of shard level channels a client is subscribed to. */
+int clientShardSubscriptionsCount(client *c) {
+ return dictSize(c->pubsubshard_channels);
+}
+
+dict* getClientPubSubChannels(client *c) {
+ return c->pubsub_channels;
+}
+
+dict* getClientPubSubShardChannels(client *c) {
+ return c->pubsubshard_channels;
+}
+
+/* Return the number of pubsub + pubsub shard level channels
+ * a client is subscribed to. */
+int clientTotalPubSubSubscriptionCount(client *c) {
+ return clientSubscriptionsCount(c) + clientShardSubscriptionsCount(c);
+}
+
+void markClientAsPubSub(client *c) {
+ if (!(c->flags & CLIENT_PUBSUB)) {
+ c->flags |= CLIENT_PUBSUB;
+ server.pubsub_clients++;
+ }
+}
+
+void unmarkClientAsPubSub(client *c) {
+ if (c->flags & CLIENT_PUBSUB) {
+ c->flags &= ~CLIENT_PUBSUB;
+ server.pubsub_clients--;
+ }
+}
+
+/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
+ * 0 if the client was already subscribed to that channel. */
+int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) {
+ dictEntry *de, *existing;
+ dict *clients = NULL;
+ int retval = 0;
+ unsigned int slot = 0;
+
+ /* Add the channel to the client -> channels hash table */
+ dictEntryLink bucket;
+ dictEntryLink link = dictFindLink(type.clientPubSubChannels(c),channel,&bucket);
+ if (link == NULL) { /* Not yet subscribed to this channel */
+ retval = 1;
+ /* Add the client to the channel -> list of clients hash table */
+ if (server.cluster_enabled && type.shard) {
+ slot = getKeySlot(channel->ptr);
+ }
+
+ de = kvstoreDictAddRaw(*type.serverPubSubChannels, slot, channel, &existing);
+
+ if (existing) {
+ clients = dictGetVal(existing);
+ channel = dictGetKey(existing);
+ } else {
+ clients = dictCreate(&clientDictType);
+ kvstoreDictSetVal(*type.serverPubSubChannels, slot, de, clients);
+ incrRefCount(channel);
+ }
+
+ serverAssert(dictAdd(clients, c, NULL) != DICT_ERR);
+ dictSetKeyAtLink(type.clientPubSubChannels(c), channel, &bucket, 1);
+ incrRefCount(channel);
+ }
+ /* Notify the client */
+ addReplyPubsubSubscribed(c,channel,type);
+ return retval;
+}
+
+/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
+ * 0 if the client was not subscribed to the specified channel. */
+int pubsubUnsubscribeChannel(client *c, robj *channel, int notify, pubsubtype type) {
+ dictEntry *de;
+ dict *clients;
+ int retval = 0;
+ int slot = 0;
+
+ /* Remove the channel from the client -> channels hash table */
+ incrRefCount(channel); /* channel may be just a pointer to the same object
+ we have in the hash tables. Protect it... */
+ if (dictDelete(type.clientPubSubChannels(c),channel) == DICT_OK) {
+ retval = 1;
+ /* Remove the client from the channel -> clients list hash table */
+ if (server.cluster_enabled && type.shard) {
+ slot = getKeySlot(channel->ptr);
+ }
+ de = kvstoreDictFind(*type.serverPubSubChannels, slot, channel);
+ serverAssertWithInfo(c,NULL,de != NULL);
+ clients = dictGetVal(de);
+ serverAssertWithInfo(c, NULL, dictDelete(clients, c) == DICT_OK);
+ if (dictSize(clients) == 0) {
+ /* Free the dict and associated hash entry at all if this was
+ * the latest client, so that it will be possible to abuse
+ * Redis PUBSUB creating millions of channels. */
+ kvstoreDictDelete(*type.serverPubSubChannels, slot, channel);
+ }
+ }
+ /* Notify the client */
+ if (notify) {
+ addReplyPubsubUnsubscribed(c,channel,type);
+ }
+ decrRefCount(channel); /* it is finally safe to release it */
+ return retval;
+}
+
+/* Unsubscribe all shard channels in a slot. */
+void pubsubShardUnsubscribeAllChannelsInSlot(unsigned int slot) {
+ if (!kvstoreDictSize(server.pubsubshard_channels, slot))
+ return;
+
+ dictEntry *de;
+ kvstoreDictIterator kvs_di;
+ kvstoreInitDictSafeIterator(&kvs_di, server.pubsubshard_channels, slot);
+ while ((de = kvstoreDictIteratorNext(&kvs_di)) != NULL) {
+ robj *channel = dictGetKey(de);
+ dict *clients = dictGetVal(de);
+ /* For each client subscribed to the channel, unsubscribe it. */
+ dictIterator iter;
+ dictEntry *entry;
+
+ dictInitIterator(&iter, clients);
+ while ((entry = dictNext(&iter)) != NULL) {
+ client *c = dictGetKey(entry);
+ int retval = dictDelete(c->pubsubshard_channels, channel);
+ serverAssertWithInfo(c,channel,retval == DICT_OK);
+ addReplyPubsubUnsubscribed(c, channel, pubSubShardType);
+ /* If the client has no other pubsub subscription,
+ * move out of pubsub mode. */
+ if (clientTotalPubSubSubscriptionCount(c) == 0) {
+ unmarkClientAsPubSub(c);
+ }
+ }
+ dictResetIterator(&iter);
+ kvstoreDictDelete(server.pubsubshard_channels, slot, channel);
+ }
+ kvstoreResetDictIterator(&kvs_di);
+}
+
+/* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */
+int pubsubSubscribePattern(client *c, robj *pattern) {
+ dictEntry *de;
+ dict *clients;
+ int retval = 0;
+
+ if (dictAdd(c->pubsub_patterns, pattern, NULL) == DICT_OK) {
+ retval = 1;
+ incrRefCount(pattern);
+ /* Add the client to the pattern -> list of clients hash table */
+ de = dictFind(server.pubsub_patterns,pattern);
+ if (de == NULL) {
+ clients = dictCreate(&clientDictType);
+ dictAdd(server.pubsub_patterns,pattern,clients);
+ incrRefCount(pattern);
+ } else {
+ clients = dictGetVal(de);
+ }
+ serverAssert(dictAdd(clients, c, NULL) != DICT_ERR);
+ }
+ /* Notify the client */
+ addReplyPubsubPatSubscribed(c,pattern);
+ return retval;
+}
+
+/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
+ * 0 if the client was not subscribed to the specified channel. */
+int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
+ dictEntry *de;
+ dict *clients;
+ int retval = 0;
+
+ incrRefCount(pattern); /* Protect the object. May be the same we remove */
+ if (dictDelete(c->pubsub_patterns, pattern) == DICT_OK) {
+ retval = 1;
+ /* Remove the client from the pattern -> clients list hash table */
+ de = dictFind(server.pubsub_patterns,pattern);
+ serverAssertWithInfo(c,NULL,de != NULL);
+ clients = dictGetVal(de);
+ serverAssertWithInfo(c, NULL, dictDelete(clients, c) == DICT_OK);
+ if (dictSize(clients) == 0) {
+ /* Free the dict and associated hash entry at all if this was
+ * the latest client. */
+ dictDelete(server.pubsub_patterns,pattern);
+ }
+ }
+ /* Notify the client */
+ if (notify) addReplyPubsubPatUnsubscribed(c,pattern);
+ decrRefCount(pattern);
+ return retval;
+}
+
+/* Unsubscribe from all the channels. Return the number of channels the
+ * client was subscribed to. */
+int pubsubUnsubscribeAllChannelsInternal(client *c, int notify, pubsubtype type) {
+ int count = 0;
+ if (dictSize(type.clientPubSubChannels(c)) > 0) {
+ dictIterator di;
+ dictEntry *de;
+
+ dictInitSafeIterator(&di, type.clientPubSubChannels(c));
+ while((de = dictNext(&di)) != NULL) {
+ robj *channel = dictGetKey(de);
+
+ count += pubsubUnsubscribeChannel(c,channel,notify,type);
+ }
+ dictResetIterator(&di);
+ }
+ /* We were subscribed to nothing? Still reply to the client. */
+ if (notify && count == 0) {
+ addReplyPubsubUnsubscribed(c,NULL,type);
+ }
+ return count;
+}
+
+/*
+ * Unsubscribe a client from all global channels.
+ */
+int pubsubUnsubscribeAllChannels(client *c, int notify) {
+ int count = pubsubUnsubscribeAllChannelsInternal(c,notify,pubSubType);
+ return count;
+}
+
+/*
+ * Unsubscribe a client from all shard subscribed channels.
+ */
+int pubsubUnsubscribeShardAllChannels(client *c, int notify) {
+ int count = pubsubUnsubscribeAllChannelsInternal(c, notify, pubSubShardType);
+ return count;
+}
+
+/* Unsubscribe from all the patterns. Return the number of patterns the
+ * client was subscribed from. */
+int pubsubUnsubscribeAllPatterns(client *c, int notify) {
+ int count = 0;
+
+ if (dictSize(c->pubsub_patterns) > 0) {
+ dictIterator di;
+ dictEntry *de;
+
+ dictInitSafeIterator(&di, c->pubsub_patterns);
+ while ((de = dictNext(&di)) != NULL) {
+ robj *pattern = dictGetKey(de);
+ count += pubsubUnsubscribePattern(c, pattern, notify);
+ }
+ dictResetIterator(&di);
+ }
+
+ /* We were subscribed to nothing? Still reply to the client. */
+ if (notify && count == 0) addReplyPubsubPatUnsubscribed(c,NULL);
+ return count;
+}
+
+/*
+ * Publish a message to all the subscribers.
+ */
+int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) {
+ int receivers = 0;
+ dictEntry *de;
+ dictIterator di;
+ unsigned int slot = 0;
+
+ /* Send to clients listening for that channel */
+ if (server.cluster_enabled && type.shard) {
+ slot = keyHashSlot(channel->ptr, sdslen(channel->ptr));
+ }
+ de = kvstoreDictFind(*type.serverPubSubChannels, slot, channel);
+ if (de) {
+ dict *clients = dictGetVal(de);
+ dictEntry *entry;
+ dictIterator iter;
+
+ dictInitIterator(&iter, clients);
+ while ((entry = dictNext(&iter)) != NULL) {
+ client *c = dictGetKey(entry);
+ addReplyPubsubMessage(c,channel,message,*type.messageBulk);
+ if (clusterSlotStatsEnabled(CLUSTER_SLOT_STATS_NET))
+ clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(c, slot);
+ updateClientMemUsageAndBucket(c);
+ receivers++;
+ }
+ dictResetIterator(&iter);
+ }
+
+ if (type.shard) {
+ /* Shard pubsub ignores patterns. */
+ return receivers;
+ }
+
+ /* Send to clients listening to matching channels */
+ if (dictSize(server.pubsub_patterns) > 0) {
+ channel = getDecodedObject(channel);
+ dictInitIterator(&di, server.pubsub_patterns);
+ while((de = dictNext(&di)) != NULL) {
+ robj *pattern = dictGetKey(de);
+ dict *clients = dictGetVal(de);
+ if (!stringmatchlen((char*)pattern->ptr,
+ sdslen(pattern->ptr),
+ (char*)channel->ptr,
+ sdslen(channel->ptr),0)) continue;
+
+ dictEntry *entry;
+ dictIterator iter;
+
+ dictInitIterator(&iter, clients);
+ while ((entry = dictNext(&iter)) != NULL) {
+ client *c = dictGetKey(entry);
+ addReplyPubsubPatMessage(c,pattern,channel,message);
+ updateClientMemUsageAndBucket(c);
+ receivers++;
+ }
+ dictResetIterator(&iter);
+ }
+ decrRefCount(channel);
+ dictResetIterator(&di);
+ }
+ return receivers;
+}
+
+/* Publish a message to all the subscribers. */
+int pubsubPublishMessage(robj *channel, robj *message, int sharded) {
+ return pubsubPublishMessageInternal(channel, message, sharded? pubSubShardType : pubSubType);
+}
+
+/*-----------------------------------------------------------------------------
+ * Pubsub commands implementation
+ *----------------------------------------------------------------------------*/
+
+/* SUBSCRIBE channel [channel ...] */
+void subscribeCommand(client *c) {
+ int j;
+ if ((c->flags & CLIENT_DENY_BLOCKING) && !(c->flags & CLIENT_MULTI)) {
+ /**
+ * A client that has CLIENT_DENY_BLOCKING flag on
+ * expect a reply per command and so can not execute subscribe.
+ *
+ * Notice that we have a special treatment for multi because of
+ * backward compatibility
+ */
+ addReplyError(c, "SUBSCRIBE isn't allowed for a DENY BLOCKING client");
+ return;
+ }
+ for (j = 1; j < c->argc; j++)
+ pubsubSubscribeChannel(c,c->argv[j],pubSubType);
+ markClientAsPubSub(c);
+}
+
+/* UNSUBSCRIBE [channel ...] */
+void unsubscribeCommand(client *c) {
+ if (c->argc == 1) {
+ pubsubUnsubscribeAllChannels(c,1);
+ } else {
+ int j;
+
+ for (j = 1; j < c->argc; j++)
+ pubsubUnsubscribeChannel(c,c->argv[j],1,pubSubType);
+ }
+ if (clientTotalPubSubSubscriptionCount(c) == 0) {
+ unmarkClientAsPubSub(c);
+ }
+}
+
+/* PSUBSCRIBE pattern [pattern ...] */
+void psubscribeCommand(client *c) {
+ int j;
+ if ((c->flags & CLIENT_DENY_BLOCKING) && !(c->flags & CLIENT_MULTI)) {
+ /**
+ * A client that has CLIENT_DENY_BLOCKING flag on
+ * expect a reply per command and so can not execute subscribe.
+ *
+ * Notice that we have a special treatment for multi because of
+ * backward compatibility
+ */
+ addReplyError(c, "PSUBSCRIBE isn't allowed for a DENY BLOCKING client");
+ return;
+ }
+
+ for (j = 1; j < c->argc; j++)
+ pubsubSubscribePattern(c,c->argv[j]);
+ markClientAsPubSub(c);
+}
+
+/* PUNSUBSCRIBE [pattern [pattern ...]] */
+void punsubscribeCommand(client *c) {
+ if (c->argc == 1) {
+ pubsubUnsubscribeAllPatterns(c,1);
+ } else {
+ int j;
+
+ for (j = 1; j < c->argc; j++)
+ pubsubUnsubscribePattern(c,c->argv[j],1);
+ }
+ if (clientTotalPubSubSubscriptionCount(c) == 0) {
+ unmarkClientAsPubSub(c);
+ }
+}
+
+/* This function wraps pubsubPublishMessage and also propagates the message to cluster.
+ * Used by the commands PUBLISH/SPUBLISH and their respective module APIs.*/
+int pubsubPublishMessageAndPropagateToCluster(robj *channel, robj *message, int sharded) {
+ int receivers = pubsubPublishMessage(channel, message, sharded);
+ if (server.cluster_enabled)
+ clusterPropagatePublish(channel, message, sharded);
+ return receivers;
+}
+
+/* PUBLISH <channel> <message> */
+void publishCommand(client *c) {
+ if (server.sentinel_mode) {
+ sentinelPublishCommand(c);
+ return;
+ }
+
+ int receivers = pubsubPublishMessageAndPropagateToCluster(c->argv[1],c->argv[2],0);
+ if (!server.cluster_enabled)
+ forceCommandPropagation(c,PROPAGATE_REPL);
+ addReplyLongLong(c,receivers);
+}
+
+/* PUBSUB command for Pub/Sub introspection. */
+void pubsubCommand(client *c) {
+ if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
+ const char *help[] = {
+"CHANNELS [<pattern>]",
+" Return the currently active channels matching a <pattern> (default: '*').",
+"NUMPAT",
+" Return number of subscriptions to patterns.",
+"NUMSUB [<channel> ...]",
+" Return the number of subscribers for the specified channels, excluding",
+" pattern subscriptions(default: no channels).",
+"SHARDCHANNELS [<pattern>]",
+" Return the currently active shard level channels matching a <pattern> (default: '*').",
+"SHARDNUMSUB [<shardchannel> ...]",
+" Return the number of subscribers for the specified shard level channel(s)",
+NULL
+ };
+ addReplyHelp(c, help);
+ } else if (!strcasecmp(c->argv[1]->ptr,"channels") &&
+ (c->argc == 2 || c->argc == 3))
+ {
+ /* PUBSUB CHANNELS [<pattern>] */
+ sds pat = (c->argc == 2) ? NULL : c->argv[2]->ptr;
+ channelList(c, pat, server.pubsub_channels);
+ } else if (!strcasecmp(c->argv[1]->ptr,"numsub") && c->argc >= 2) {
+ /* PUBSUB NUMSUB [Channel_1 ... Channel_N] */
+ int j;
+
+ addReplyArrayLen(c,(c->argc-2)*2);
+ for (j = 2; j < c->argc; j++) {
+ dict *d = kvstoreDictFetchValue(server.pubsub_channels, 0, c->argv[j]);
+
+ addReplyBulk(c,c->argv[j]);
+ addReplyLongLong(c, d ? dictSize(d) : 0);
+ }
+ } else if (!strcasecmp(c->argv[1]->ptr,"numpat") && c->argc == 2) {
+ /* PUBSUB NUMPAT */
+ addReplyLongLong(c,dictSize(server.pubsub_patterns));
+ } else if (!strcasecmp(c->argv[1]->ptr,"shardchannels") &&
+ (c->argc == 2 || c->argc == 3))
+ {
+ /* PUBSUB SHARDCHANNELS */
+ sds pat = (c->argc == 2) ? NULL : c->argv[2]->ptr;
+ channelList(c,pat,server.pubsubshard_channels);
+ } else if (!strcasecmp(c->argv[1]->ptr,"shardnumsub") && c->argc >= 2) {
+ /* PUBSUB SHARDNUMSUB [ShardChannel_1 ... ShardChannel_N] */
+ int j;
+ addReplyArrayLen(c, (c->argc-2)*2);
+ for (j = 2; j < c->argc; j++) {
+ unsigned int slot = calculateKeySlot(c->argv[j]->ptr);
+ dict *clients = kvstoreDictFetchValue(server.pubsubshard_channels, slot, c->argv[j]);
+
+ addReplyBulk(c,c->argv[j]);
+ addReplyLongLong(c, clients ? dictSize(clients) : 0);
+ }
+ } else {
+ addReplySubcommandSyntaxError(c);
+ }
+}
+
+void channelList(client *c, sds pat, kvstore *pubsub_channels) {
+ long mblen = 0;
+ void *replylen;
+ unsigned int slot_cnt = kvstoreNumDicts(pubsub_channels);
+
+ replylen = addReplyDeferredLen(c);
+ for (unsigned int i = 0; i < slot_cnt; i++) {
+ if (!kvstoreDictSize(pubsub_channels, i))
+ continue;
+ dictEntry *de;
+ kvstoreDictIterator kvs_di;
+ kvstoreInitDictIterator(&kvs_di, pubsub_channels, i);
+ while((de = kvstoreDictIteratorNext(&kvs_di)) != NULL) {
+ robj *cobj = dictGetKey(de);
+ sds channel = cobj->ptr;
+
+ if (!pat || stringmatchlen(pat, sdslen(pat),
+ channel, sdslen(channel),0))
+ {
+ addReplyBulk(c,cobj);
+ mblen++;
+ }
+ }
+ kvstoreResetDictIterator(&kvs_di);
+ }
+ setDeferredArrayLen(c,replylen,mblen);
+}
+
+/* SPUBLISH <shardchannel> <message> */
+void spublishCommand(client *c) {
+ int receivers = pubsubPublishMessageAndPropagateToCluster(c->argv[1],c->argv[2],1);
+ if (!server.cluster_enabled)
+ forceCommandPropagation(c,PROPAGATE_REPL);
+ addReplyLongLong(c,receivers);
+}
+
+/* SSUBSCRIBE shardchannel [shardchannel ...] */
+void ssubscribeCommand(client *c) {
+ if (c->flags & CLIENT_DENY_BLOCKING) {
+ /* A client that has CLIENT_DENY_BLOCKING flag on
+ * expect a reply per command and so can not execute subscribe. */
+ addReplyError(c, "SSUBSCRIBE isn't allowed for a DENY BLOCKING client");
+ return;
+ }
+
+ for (int j = 1; j < c->argc; j++) {
+ pubsubSubscribeChannel(c, c->argv[j], pubSubShardType);
+ }
+ markClientAsPubSub(c);
+}
+
+/* SUNSUBSCRIBE [shardchannel [shardchannel ...]] */
+void sunsubscribeCommand(client *c) {
+ if (c->argc == 1) {
+ pubsubUnsubscribeShardAllChannels(c, 1);
+ } else {
+ for (int j = 1; j < c->argc; j++) {
+ pubsubUnsubscribeChannel(c, c->argv[j], 1, pubSubShardType);
+ }
+ }
+ if (clientTotalPubSubSubscriptionCount(c) == 0) {
+ unmarkClientAsPubSub(c);
+ }
+}
+
+size_t pubsubMemOverhead(client *c) {
+ /* PubSub patterns */
+ size_t mem = dictMemUsage(c->pubsub_patterns);
+ /* Global PubSub channels */
+ mem += dictMemUsage(c->pubsub_channels);
+ /* Sharded PubSub channels */
+ mem += dictMemUsage(c->pubsubshard_channels);
+ return mem;
+}
+
+int pubsubTotalSubscriptions(void) {
+ return dictSize(server.pubsub_patterns) +
+ kvstoreSize(server.pubsub_channels) +
+ kvstoreSize(server.pubsubshard_channels);
+}