Simple Server-Sent Events based PubSub Server

post, Mar 22, 2020 on Mitja Felicijan's blog

Before we continue ...

Publisher Subscriber model is nothing new and there are many amazing solutions -out there, so writing a new one would be a waste of time if other solutions -wouldn't have quite complex install procedures and weren't so hard to maintain. -But to be fair, comparing this simple server with something like -Kafka or RabbitMQ is -laughable at the least. Those solutions are enterprise grade and have many -mechanisms there to ensure messages aren't lost and much more. Regardless of -these drawbacks, this method has been tested on a large website and worked until -now without any problems. So now, that we got that cleared up, let's continue.

Wiki definition: Publish/subscribe messaging, or pub/sub messaging, is a -form of asynchronous service-to-service communication used in serverless and -microservices architectures. In a pub/sub model, any message published to a -topic is immediately received by all the subscribers to the topic.

General goals

  • provide a simple server that relays messages to all the connected clients,
  • messages can be posted on specific topics,
  • messages get sent via Server-Sent -Events -to all the subscribers.

How exactly does the pub/sub model work?

The easiest way to explain this is with diagram bellow. Basic function is -simple. We have subscribers that receive messages, and we have publishers that -create and post messages. Similar model is also well know pattern that works on -a premise of consumers and producers, and they take similar roles.

How PubSub works

These are some naive characteristics we want to achieve:

  • producer is publishing messages to subscribe topic,
  • consumer is receiving messages from subscribed topic,
  • servers is also known as Broker,
  • broker does not store messages or tracks success,
  • broker uses -FIFO method -for delivering messages,
  • if consumer wants to receive messages from a topic, producer and consumer -topics must match,
  • consumer can subscribe to multiple topics,
  • producer can publish to multiple topics,
  • each message has a messageId.

Known drawbacks:

  • messages will not be stored in a persistent queue or unreceived messages like -DeadLetterQueue so old -messages could be lost on server restart,
  • Server-Sent -Events -opens a long-running connection between the client and the server so make sure -if your setup is load balanced that the load balancer in this case can have -long opened connection,
  • no system moderation due to the dynamic nature of creating queues.

Server-Sent Events

Read more about it on official specification -page.

Current browser support

Browser support

Check -https://caniuse.com/#feat=eventsource -for latest information about browser support.

Known issues

  • Firefox 52 and below do not support EventSource in web/shared workers
  • In Firefox prior to version 36 server-sent events do not reconnect -automatically in case of a connection interrupt (bug)
  • Reportedly, CORS in EventSource is currently supported in Firefox 10+, Opera -12+, Chrome 26+, Safari 7.0+.
  • Antivirus software may block the event streaming data chunks.

Source: https://caniuse.com/#feat=eventsource

Message format

The simplest message that can be sent is only with data attribute:

data: this is a simple message
-<blank line>
-

You can send message IDs to be used if the connection is dropped:

id: 33
-data: this is line one
-data: this is line two
-<blank line>
-

And you can specify your own event types (the above messages will all trigger -the message event):

id: 36
-event: price
-data: 103.34
-<blank line>
-

Server requirements

The important thing is how you send headers and which headers are sent by the -server that triggers browser to threat response as a EventStream.

Headers responsible for this are:

Content-Type: text/event-stream
-Cache-Control: no-cache
-Connection: keep-alive
-

Debugging with Google Chrome

Google Chrome provides build-in debugging and exploration tool for Server-Sent -Events -which is quite nice and available from Developer Tools under Network tab.

You can debug only client side events that get received and not the server -ones. For debugging server events add console.log to server.js code and -print out events.

Google Chrome Developer Tools EventStream

Server implementation

For the sake of this example we will use Node.js with -Express as our router since this is the easiest way to -get started and we will use already written SSE library for node -sse-pubsub so we don't reinvent the -wheel.

npm init --yes
-
-npm install express
-npm install body-parser
-npm install sse-pubsub
-

Basic implementation of a server (server.js):

const express = require('express');
-const bodyParser = require('body-parser');
-const SSETopic = require('sse-pubsub');
-
-const app = express();
-const port = process.env.PORT || 4000;
-
-// topics container
-const sseTopics = {};
-
-app.use(bodyParser.json());
-
-// open for all cors
-app.all('*', (req, res, next) => {
-  res.header('Access-Control-Allow-Origin', '*');
-  res.header('Access-Control-Allow-Headers', 'X-Requested-With, Content-Type');
-  next();
-});
-
-// preflight request error fix
-app.options('*', async (req, res) => {
-  res.header('Access-Control-Allow-Origin', '*');
-  res.header('Access-Control-Allow-Headers', 'X-Requested-With, Content-Type');
-  res.send('OK');
-});
-
-// serve the event streams
-app.get('/stream/:topic', async (req, res, next) => {
-  const topic = req.params.topic;
-
-  if (!(topic in sseTopics)) {
-    sseTopics[topic] = new SSETopic({
-      pingInterval: 0,
-      maxStreamDuration: 15000,
-    });
-  }
-
-  // subscribing client to topic
-  sseTopics[topic].subscribe(req, res);
-});
-
-// accepts new messages into topic
-app.post('/publish', async (req, res) => {
-  let body = req.body;
-  let status = 200;
-
-  console.log('Incoming message:', req.body);
-
-  if (
-    body.hasOwnProperty('topic') &&
-    body.hasOwnProperty('event') &&
-    body.hasOwnProperty('message')
-  ) {
-    const topic = req.body.topic;
-    const event = req.body.event;
-    const message = req.body.message;
-
-    if (topic in sseTopics) {
-      // sends message to all the subscribers
-      sseTopics[topic].publish(message, event);
-    }
-  } else {
-    status = 400;
-  }
-
-  res.status(status).send({
-    status,
-  });
-});
-
-// returns JSON object of all opened topics
-app.get('/status', async (req, res) => {
-  res.send(sseTopics);
-});
-
-// health-check endpoint
-app.get('/', async (req, res) => {
-  res.send('OK');
-});
-
-// return a 404 if no routes match
-app.use((req, res, next) => {
-  res.set('Cache-Control', 'private, no-store');
-  res.status(404).end('Not found');
-});
-
-// starts the server
-app.listen(port, () => {
-  console.log(`PubSub server running on http://localhost:${port}`);
-});
-

Our custom message format

Each message posted on a server must be in a specific format that out server -accepts. Having structure like this allows us to have multiple separated type of -events on each topic.

With this we can separate streams and only receive events that belong to the -topic.

One example would be, that we have index page and we want to receive messages -about new upvotes or new subscribers but we don't want to follow events for -other pages. This reduces clutter and overall network. And structure is much -nicer and maintanable.

{
-  "topic": "sample-topic",
-  "event": "sample-event",
-  "message": { "name": "John" }
-}
-

Publisher and subscriber clients

Publisher and subscriber in action

You can download the code and -follow along.

Publisher

As talked about above publisher is the one that send messages to the -broker/server. Message inside the payload can be whatever you want (string, -object, array). I would however personally avoid send large chunks of data like -blobs and such.

<!DOCTYPE html>
-<html lang="en">
-
-  <head>
-    <meta charset="UTF-8">
-    <meta name="viewport" content="width=device-width, initial-scale=1.0">
-    <title>Publisher</title>
-  </head>
-
-  <body>
-
-    <h1>Publisher</h1>
-
-    <fieldset>
-      <p>
-        <label>Server:</label>
-        <input type="text" id="server" value="http://localhost:4000">
-      </p>
-      <p>
-        <label>Topic:</label>
-        <input type="text" id="topic" value="sample-topic">
-      </p>
-      <p>
-        <label>Event:</label>
-        <input type="text" id="event" value="sample-event">
-      </p>
-      <p>
-        <label>Message:</label>
-        <input type="text" id="message" value='{"name": "John"}'>
-      </p>
-      <p>
-        <button type="button" id="button">Publish message to topic</button>
-      </p>
-    </fieldset>
-
-    <script>
-
-      const button = document.querySelector('#button');
-      const server = document.querySelector('#server');
-      const topic = document.querySelector('#topic');
-      const event = document.querySelector('#event');
-      const message = document.querySelector('#message');
-
-      button.addEventListener('click', async (evt) => {
-        const req = await fetch(`${server.value}/publish`, {
-          method: 'post',
-          headers: {
-            'Accept': 'application/json',
-            'Content-Type': 'application/json',
-          },
-          body: JSON.stringify({
-            topic: topic.value,
-            event: event.value,
-            message: JSON.parse(message.value),
-          }),
-        });
-
-        const res = await req.json();
-        console.log(res);
-      });
-
-    </script>
-
-  </body>
-
-</html>
-

Subscriber

Subscriber is responsible for receiving new messages that come from server via -publisher. The code bellow is very rudimentary but works and follows the -implementation guidelines for EventSource.

You can use either Developer Tools Console to see incoming messages or you can -defer to Debugging with Google Chrome section above to see all EventStream -messages.

Don't be alarmed if the subscriber gets disconnected from the server every so -often. The code we have here resets connection every 15s but it automatically -get reconnected and fetches all messages up to last received message id. This -setting can be adjusted in server.js file; search for the -maxStreamDuration variable.

<!DOCTYPE html>
-<html lang="en">
-
-  <head>
-    <meta charset="UTF-8">
-    <meta name="viewport" content="width=device-width, initial-scale=1.0">
-    <title>Subscriber</title>
-    <link rel="stylesheet" href="style.css">
-  </head>
-
-  <body>
-
-    <h1>Subscriber</h1>
-
-    <fieldset>
-      <p>
-        <label>Server:</label>
-        <input type="text" id="server" value="http://localhost:4000">
-      </p>
-      <p>
-        <label>Topic:</label>
-        <input type="text" id="topic" value="sample-topic">
-      </p>
-      <p>
-        <label>Event:</label>
-        <input type="text" id="event" value="sample-event">
-      </p>
-      <p>
-        <button type="button" id="button">Subscribe to topic</button>
-      </p>
-    </fieldset>
-
-    <script>
-
-      const button = document.querySelector('#button');
-      const server = document.querySelector('#server');
-      const topic = document.querySelector('#topic');
-      const event = document.querySelector('#event');
-
-      button.addEventListener('click', async (evt) => {
-
-        let es = new EventSource(`${server.value}/stream/${topic.value}`);
-
-        es.addEventListener(event.value, function (evt) {
-          console.log(`incoming message`, JSON.parse(evt.data));
-        });
-
-        es.addEventListener('open', function (evt) {
-          console.log('connected', evt);
-        });
-
-        es.addEventListener('error', function (evt) {
-          console.log('error', evt);
-        });
-
-      });
-
-    </script>
-
-  </body>
-
-</html>
-

Reading further