aboutsummaryrefslogtreecommitdiff
path: root/posts/2020-03-22-simple-sse-based-pubsub-server.md
diff options
context:
space:
mode:
authorMitja Felicijan <mitja.felicijan@gmail.com>2021-01-24 01:42:03 +0100
committerMitja Felicijan <mitja.felicijan@gmail.com>2021-01-24 01:42:03 +0100
commite07ab67bf95ea7e65828e373c731b6cdf984a7de (patch)
tree4fe471a1a8492149bb0b3e6ec726184e3bcf1647 /posts/2020-03-22-simple-sse-based-pubsub-server.md
parent36fb49bbef11294a93a53c363d32c2134f6b19b4 (diff)
downloadmitjafelicijan.com-e07ab67bf95ea7e65828e373c731b6cdf984a7de.tar.gz
Moved to altenator and DO
Diffstat (limited to 'posts/2020-03-22-simple-sse-based-pubsub-server.md')
-rw-r--r--posts/2020-03-22-simple-sse-based-pubsub-server.md398
1 files changed, 398 insertions, 0 deletions
diff --git a/posts/2020-03-22-simple-sse-based-pubsub-server.md b/posts/2020-03-22-simple-sse-based-pubsub-server.md
new file mode 100644
index 0000000..56a7dfa
--- /dev/null
+++ b/posts/2020-03-22-simple-sse-based-pubsub-server.md
@@ -0,0 +1,398 @@
1---
2Title: Simple Server-Sent Events based PubSub Server
3Description: Simple Server-Sent Events based PubSub Server
4Slug: simple-server-sent-events-based-pubsub-server
5Listing: true
6Created: 2020, March 22
7Tags: []
8---
9
10## Before we continue ...
11
12Publisher Subscriber model is nothing new and there are many amazing solutions out there, so writing a new one would be a waste of time if other solutions wouldn't have quite complex install procedures and weren't so hard to maintain. But to be fair, comparing this simple server with something like [Kafka](https://kafka.apache.org/) or [RabbitMQ](https://www.rabbitmq.com/) is laughable at the least. Those solutions are enterprise grade and have many mechanisms there to ensure messages aren't lost and much more. Regardless of these drawbacks, this method has been tested on a large website and worked until now without any problems. So now, that we got that cleared up, let's continue.
13
14***Wiki definition:** Publish/subscribe messaging, or pub/sub messaging, is a form of asynchronous service-to-service communication used in serverless and microservices architectures. In a pub/sub model, any message published to a topic is immediately received by all the subscribers to the topic.*
15
16## General goals
17
18- provide a simple server that relays messages to all the connected clients,
19- messages can be posted on specific topics,
20- messages get sent via [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events) to all the subscribers.
21
22## How exactly does the pub/sub model work?
23
24The easiest way to explain this is with diagram bellow. Basic function is simple. We have subscribers that receive messages, and we have publishers that create and post messages. Similar model is also well know pattern that works on a premise of consumers and producers, and they take similar roles.
25
26![How PubSub works](/assets/simple-pubsub-server/pubsub-overview.png)
27
28**These are some naive characteristics we want to achieve:**
29
30- producer is publishing messages to subscribe topic,
31- consumer is receiving messages from subscribed topic,
32- servers is also known as Broker,
33- broker does not store messages or tracks success,
34- broker uses [FIFO](https://en.wikipedia.org/wiki/FIFO_(computing_and_electronics)) method for delivering messages,
35- if consumer wants to receive messages from a topic, producer and consumer topics must match,
36- consumer can subscribe to multiple topics,
37- producer can publish to multiple topics,
38- each message has a messageId.
39
40**Known drawbacks:**
41
42- messages will not be stored in a persistent queue or unreceived messages like [DeadLetterQueue](https://en.wikipedia.org/wiki/Dead_letter_queue) so old messages could be lost on server restart,
43- [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events) opens a long-running connection between the client and the server so make sure if your setup is load balanced that the load balancer in this case can have long opened connection,
44- no system moderation due to the dynamic nature of creating queues.
45
46## Server-Sent Events
47
48Read more about it on [official specification page](https://html.spec.whatwg.org/multipage/server-sent-events.html).
49
50### Current browser support
51
52![Browser support](../assets/simple-pubsub-server/caniuse.png)
53
54Check [https://caniuse.com/#feat=eventsource](https://caniuse.com/#feat=eventsource) for latest information about browser support.
55
56### Known issues
57
58- Firefox 52 and below do not support EventSource in web/shared workers
59- In Firefox prior to version 36 server-sent events do not reconnect automatically in case of a connection interrupt (bug)
60- Reportedly, CORS in EventSource is currently supported in Firefox 10+, Opera 12+, Chrome 26+, Safari 7.0+.
61- Antivirus software may block the event streaming data chunks.
62
63Source: [https://caniuse.com/#feat=eventsource](https://caniuse.com/#feat=eventsource)
64
65### Message format
66
67The simplest message that can be sent is only with data attribute:
68
69```bash
70data: this is a simple message
71<blank line>
72```
73
74You can send message IDs to be used if the connection is dropped:
75
76```bash
77id: 33
78data: this is line one
79data: this is line two
80<blank line>
81```
82
83And you can specify your own event types (the above messages will all trigger the message event):
84
85```bash
86id: 36
87event: price
88data: 103.34
89<blank line>
90```
91
92### Server requirements
93
94The important thing is how you send headers and which headers are sent by the server that triggers browser to threat response as a EventStream.
95
96Headers responsible for this are:
97
98```bash
99Content-Type: text/event-stream
100Cache-Control: no-cache
101Connection: keep-alive
102```
103
104### Debugging with Google Chrome
105
106Google Chrome provides build-in debugging and exploration tool for [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events) which is quite nice and available from Developer Tools under Network tab.
107
108> You can debug only client side events that get received and not the server ones. For debugging server events add `console.log` to `server.js` code and print out events.
109
110![Google Chrome Developer Tools EventStream](../assets/simple-pubsub-server/chrome-debugging.png)
111
112## Server implementation
113
114For the sake of this example we will use [Node.js](https://nodejs.org/en/) with [Express](https://expressjs.com) as our router since this is the easiest way to get started and we will use already written SSE library for node [sse-pubsub](https://www.npmjs.com/package/sse-pubsub) so we don't reinvent the wheel.
115
116```bash
117npm init --yes
118
119npm install express
120npm install body-parser
121npm install sse-pubsub
122```
123
124Basic implementation of a server (`server.js`):
125
126```js
127const express = require('express');
128const bodyParser = require('body-parser');
129const SSETopic = require('sse-pubsub');
130
131const app = express();
132const port = process.env.PORT || 4000;
133
134// topics container
135const sseTopics = {};
136
137app.use(bodyParser.json());
138
139// open for all cors
140app.all('*', (req, res, next) => {
141 res.header('Access-Control-Allow-Origin', '*');
142 res.header('Access-Control-Allow-Headers', 'X-Requested-With, Content-Type');
143 next();
144});
145
146// preflight request error fix
147app.options('*', async (req, res) => {
148 res.header('Access-Control-Allow-Origin', '*');
149 res.header('Access-Control-Allow-Headers', 'X-Requested-With, Content-Type');
150 res.send('OK');
151});
152
153// serve the event streams
154app.get('/stream/:topic', async (req, res, next) => {
155 const topic = req.params.topic;
156
157 if (!(topic in sseTopics)) {
158 sseTopics[topic] = new SSETopic({
159 pingInterval: 0,
160 maxStreamDuration: 15000,
161 });
162 }
163
164 // subscribing client to topic
165 sseTopics[topic].subscribe(req, res);
166});
167
168// accepts new messages into topic
169app.post('/publish', async (req, res) => {
170 let body = req.body;
171 let status = 200;
172
173 console.log('Incoming message:', req.body);
174
175 if (
176 body.hasOwnProperty('topic') &&
177 body.hasOwnProperty('event') &&
178 body.hasOwnProperty('message')
179 ) {
180 const topic = req.body.topic;
181 const event = req.body.event;
182 const message = req.body.message;
183
184 if (topic in sseTopics) {
185 // sends message to all the subscribers
186 sseTopics[topic].publish(message, event);
187 }
188 } else {
189 status = 400;
190 }
191
192 res.status(status).send({
193 status,
194 });
195});
196
197// returns JSON object of all opened topics
198app.get('/status', async (req, res) => {
199 res.send(sseTopics);
200});
201
202// health-check endpoint
203app.get('/', async (req, res) => {
204 res.send('OK');
205});
206
207// return a 404 if no routes match
208app.use((req, res, next) => {
209 res.set('Cache-Control', 'private, no-store');
210 res.status(404).end('Not found');
211});
212
213// starts the server
214app.listen(port, () => {
215 console.log(`PubSub server running on http://localhost:${port}`);
216});
217```
218
219### Our custom message format
220
221Each message posted on a server must be in a specific format that out server accepts. Having structure like this allows us to have multiple separated type of events on each topic.
222
223With this we can separate streams and only receive events that belong to the topic.
224
225One example would be, that we have index page and we want to receive messages about new upvotes or new subscribers but we don't want to follow events for other pages. This reduces clutter and overall network. And structure is much nicer and maintanable.
226
227```json
228{
229 "topic": "sample-topic",
230 "event": "sample-event",
231 "message": { "name": "John" }
232}
233```
234
235## Publisher and subscriber clients
236
237### Publisher and subscriber in action
238
239<video src="/assets/simple-pubsub-server/clients.mp4" controls></video>
240
241You can download [the code](../assets/simple-pubsub-server/sse-pubsub-server.zip) and follow along.
242
243### Publisher
244
245As talked about above publisher is the one that send messages to the broker/server. Message inside the payload can be whatever you want (string, object, array). I would however personally avoid send large chunks of data like blobs and such.
246
247```html
248<!DOCTYPE html>
249<html lang="en">
250
251 <head>
252 <meta charset="UTF-8">
253 <meta name="viewport" content="width=device-width, initial-scale=1.0">
254 <title>Publisher</title>
255 </head>
256
257 <body>
258
259 <h1>Publisher</h1>
260
261 <fieldset>
262 <p>
263 <label>Server:</label>
264 <input type="text" id="server" value="http://localhost:4000">
265 </p>
266 <p>
267 <label>Topic:</label>
268 <input type="text" id="topic" value="sample-topic">
269 </p>
270 <p>
271 <label>Event:</label>
272 <input type="text" id="event" value="sample-event">
273 </p>
274 <p>
275 <label>Message:</label>
276 <input type="text" id="message" value='{"name": "John"}'>
277 </p>
278 <p>
279 <button type="button" id="button">Publish message to topic</button>
280 </p>
281 </fieldset>
282
283 <script>
284
285 const button = document.querySelector('#button');
286 const server = document.querySelector('#server');
287 const topic = document.querySelector('#topic');
288 const event = document.querySelector('#event');
289 const message = document.querySelector('#message');
290
291 button.addEventListener('click', async (evt) => {
292 const req = await fetch(`${server.value}/publish`, {
293 method: 'post',
294 headers: {
295 'Accept': 'application/json',
296 'Content-Type': 'application/json',
297 },
298 body: JSON.stringify({
299 topic: topic.value,
300 event: event.value,
301 message: JSON.parse(message.value),
302 }),
303 });
304
305 const res = await req.json();
306 console.log(res);
307 });
308
309 </script>
310
311 </body>
312
313</html>
314
315```
316
317### Subscriber
318
319Subscriber is responsible for receiving new messages that come from server via publisher. The code bellow is very rudimentary but works and follows the implementation guidelines for EventSource.
320
321You can use either Developer Tools Console to see incoming messages or you can defer to Debugging with Google Chrome section above to see all EventStream messages.
322
323> Don't be alarmed if the subscriber gets disconnected from the server every so often. The code we have here resets connection every 15s but it automatically get reconnected and fetches all messages up to last received message id. This setting can be adjusted in `server.js` file; search for the `maxStreamDuration` variable.
324
325```html
326<!DOCTYPE html>
327<html lang="en">
328
329 <head>
330 <meta charset="UTF-8">
331 <meta name="viewport" content="width=device-width, initial-scale=1.0">
332 <title>Subscriber</title>
333 <link rel="stylesheet" href="style.css">
334 </head>
335
336 <body>
337
338 <h1>Subscriber</h1>
339
340 <fieldset>
341 <p>
342 <label>Server:</label>
343 <input type="text" id="server" value="http://localhost:4000">
344 </p>
345 <p>
346 <label>Topic:</label>
347 <input type="text" id="topic" value="sample-topic">
348 </p>
349 <p>
350 <label>Event:</label>
351 <input type="text" id="event" value="sample-event">
352 </p>
353 <p>
354 <button type="button" id="button">Subscribe to topic</button>
355 </p>
356 </fieldset>
357
358 <script>
359
360 const button = document.querySelector('#button');
361 const server = document.querySelector('#server');
362 const topic = document.querySelector('#topic');
363 const event = document.querySelector('#event');
364
365 button.addEventListener('click', async (evt) => {
366
367 let es = new EventSource(`${server.value}/stream/${topic.value}`);
368
369 es.addEventListener(event.value, function (evt) {
370 console.log(`incoming message`, JSON.parse(evt.data));
371 });
372
373 es.addEventListener('open', function (evt) {
374 console.log('connected', evt);
375 });
376
377 es.addEventListener('error', function (evt) {
378 console.log('error', evt);
379 });
380
381 });
382
383 </script>
384
385 </body>
386
387</html>
388
389```
390
391## Reading further
392
393- [Using server-sent events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events)
394- [Using SSE Instead Of WebSockets For Unidirectional Data Flow Over HTTP/2](https://www.smashingmagazine.com/2018/02/sse-websockets-data-flow-http2/)
395- [What is Server-Sent Events?](https://apifriends.com/api-streaming/server-sent-events/)
396- [An HTTP/2 extension for bidirectional messaging communication](https://tools.ietf.org/id/draft-xie-bidirectional-messaging-01.html)
397- [Introduction to HTTP/2](https://developers.google.com/web/fundamentals/performance/http2)
398- [The WebSocket API (WebSockets)](https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API)