aboutsummaryrefslogtreecommitdiff
path: root/content/posts/2020-03-22-simple-sse-based-pubsub-server.md
diff options
context:
space:
mode:
authorMitja Felicijan <mitja.felicijan@gmail.com>2024-03-10 14:59:14 +0100
committerMitja Felicijan <mitja.felicijan@gmail.com>2024-03-10 14:59:14 +0100
commit1100562e29f6476448b656dbddd4cf22505523f6 (patch)
tree442eec492199104bd49dfd74474ce89ade8fcac9 /content/posts/2020-03-22-simple-sse-based-pubsub-server.md
parenta40d80be378e46a6c490e1b99b0d8f4acd968503 (diff)
downloadmitjafelicijan.com-1100562e29f6476448b656dbddd4cf22505523f6.tar.gz
Move back to JBMAFP
Diffstat (limited to 'content/posts/2020-03-22-simple-sse-based-pubsub-server.md')
-rw-r--r--content/posts/2020-03-22-simple-sse-based-pubsub-server.md454
1 files changed, 454 insertions, 0 deletions
diff --git a/content/posts/2020-03-22-simple-sse-based-pubsub-server.md b/content/posts/2020-03-22-simple-sse-based-pubsub-server.md
new file mode 100644
index 0000000..40909b4
--- /dev/null
+++ b/content/posts/2020-03-22-simple-sse-based-pubsub-server.md
@@ -0,0 +1,454 @@
1---
2title: Simple Server-Sent Events based PubSub Server
3url: /simple-server-sent-events-based-pubsub-server.html
4date: 2020-03-22T12:00:00+02:00
5type: post
6draft: false
7---
8
9## Before we continue ...
10
11Publisher Subscriber model is nothing new and there are many amazing solutions
12out there, so writing a new one would be a waste of time if other solutions
13wouldn't have quite complex install procedures and weren't so hard to maintain.
14But to be fair, comparing this simple server with something like
15[Kafka](https://kafka.apache.org/) or [RabbitMQ](https://www.rabbitmq.com/) is
16laughable at the least. Those solutions are enterprise grade and have many
17mechanisms there to ensure messages aren't lost and much more. Regardless of
18these drawbacks, this method has been tested on a large website and worked until
19now without any problems. So now, that we got that cleared up, let's continue.
20
21***Wiki definition:** Publish/subscribe messaging, or pub/sub messaging, is a
22form of asynchronous service-to-service communication used in serverless and
23microservices architectures. In a pub/sub model, any message published to a
24topic is immediately received by all the subscribers to the topic.*
25
26## General goals
27
28- provide a simple server that relays messages to all the connected clients,
29- messages can be posted on specific topics,
30- messages get sent via [Server-Sent
31 Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events)
32 to all the subscribers.
33
34## How exactly does the pub/sub model work?
35
36The easiest way to explain this is with diagram bellow. Basic function is
37simple. We have subscribers that receive messages, and we have publishers that
38create and post messages. Similar model is also well know pattern that works on
39a premise of consumers and producers, and they take similar roles.
40
41![How PubSub works](/assets/posts/simple-pubsub-server/pubsub-overview.png)
42
43**These are some naive characteristics we want to achieve:**
44
45- producer is publishing messages to subscribe topic,
46- consumer is receiving messages from subscribed topic,
47- servers is also known as Broker,
48- broker does not store messages or tracks success,
49- broker uses
50 [FIFO](https://en.wikipedia.org/wiki/FIFO_(computing_and_electronics)) method
51 for delivering messages,
52- if consumer wants to receive messages from a topic, producer and consumer
53 topics must match,
54- consumer can subscribe to multiple topics,
55- producer can publish to multiple topics,
56- each message has a messageId.
57
58**Known drawbacks:**
59
60- messages will not be stored in a persistent queue or unreceived messages like
61 [DeadLetterQueue](https://en.wikipedia.org/wiki/Dead_letter_queue) so old
62 messages could be lost on server restart,
63- [Server-Sent
64 Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events)
65 opens a long-running connection between the client and the server so make sure
66 if your setup is load balanced that the load balancer in this case can have
67 long opened connection,
68- no system moderation due to the dynamic nature of creating queues.
69
70## Server-Sent Events
71
72Read more about it on [official specification
73page](https://html.spec.whatwg.org/multipage/server-sent-events.html).
74
75### Current browser support
76
77![Browser support](/assets/posts/simple-pubsub-server/caniuse.png)
78
79Check
80[https://caniuse.com/#feat=eventsource](https://caniuse.com/#feat=eventsource)
81for latest information about browser support.
82
83### Known issues
84
85- Firefox 52 and below do not support EventSource in web/shared workers
86- In Firefox prior to version 36 server-sent events do not reconnect
87 automatically in case of a connection interrupt (bug)
88- Reportedly, CORS in EventSource is currently supported in Firefox 10+, Opera
89 12+, Chrome 26+, Safari 7.0+.
90- Antivirus software may block the event streaming data chunks.
91
92Source: [https://caniuse.com/#feat=eventsource](https://caniuse.com/#feat=eventsource)
93
94### Message format
95
96The simplest message that can be sent is only with data attribute:
97
98```bash
99data: this is a simple message
100<blank line>
101```
102
103You can send message IDs to be used if the connection is dropped:
104
105```bash
106id: 33
107data: this is line one
108data: this is line two
109<blank line>
110```
111
112And you can specify your own event types (the above messages will all trigger
113the message event):
114
115```bash
116id: 36
117event: price
118data: 103.34
119<blank line>
120```
121
122### Server requirements
123
124The important thing is how you send headers and which headers are sent by the
125server that triggers browser to threat response as a EventStream.
126
127Headers responsible for this are:
128
129```bash
130Content-Type: text/event-stream
131Cache-Control: no-cache
132Connection: keep-alive
133```
134
135### Debugging with Google Chrome
136
137Google Chrome provides build-in debugging and exploration tool for [Server-Sent
138Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events)
139which is quite nice and available from Developer Tools under Network tab.
140
141> You can debug only client side events that get received and not the server
142> ones. For debugging server events add `console.log` to `server.js` code and
143> print out events.
144
145![Google Chrome Developer Tools EventStream](/assets/posts/simple-pubsub-server/chrome-debugging.png)
146
147## Server implementation
148
149For the sake of this example we will use [Node.js](https://nodejs.org/en/) with
150[Express](https://expressjs.com) as our router since this is the easiest way to
151get started and we will use already written SSE library for node
152[sse-pubsub](https://www.npmjs.com/package/sse-pubsub) so we don't reinvent the
153wheel.
154
155```bash
156npm init --yes
157
158npm install express
159npm install body-parser
160npm install sse-pubsub
161```
162
163Basic implementation of a server (`server.js`):
164
165```js
166const express = require('express');
167const bodyParser = require('body-parser');
168const SSETopic = require('sse-pubsub');
169
170const app = express();
171const port = process.env.PORT || 4000;
172
173// topics container
174const sseTopics = {};
175
176app.use(bodyParser.json());
177
178// open for all cors
179app.all('*', (req, res, next) => {
180 res.header('Access-Control-Allow-Origin', '*');
181 res.header('Access-Control-Allow-Headers', 'X-Requested-With, Content-Type');
182 next();
183});
184
185// preflight request error fix
186app.options('*', async (req, res) => {
187 res.header('Access-Control-Allow-Origin', '*');
188 res.header('Access-Control-Allow-Headers', 'X-Requested-With, Content-Type');
189 res.send('OK');
190});
191
192// serve the event streams
193app.get('/stream/:topic', async (req, res, next) => {
194 const topic = req.params.topic;
195
196 if (!(topic in sseTopics)) {
197 sseTopics[topic] = new SSETopic({
198 pingInterval: 0,
199 maxStreamDuration: 15000,
200 });
201 }
202
203 // subscribing client to topic
204 sseTopics[topic].subscribe(req, res);
205});
206
207// accepts new messages into topic
208app.post('/publish', async (req, res) => {
209 let body = req.body;
210 let status = 200;
211
212 console.log('Incoming message:', req.body);
213
214 if (
215 body.hasOwnProperty('topic') &&
216 body.hasOwnProperty('event') &&
217 body.hasOwnProperty('message')
218 ) {
219 const topic = req.body.topic;
220 const event = req.body.event;
221 const message = req.body.message;
222
223 if (topic in sseTopics) {
224 // sends message to all the subscribers
225 sseTopics[topic].publish(message, event);
226 }
227 } else {
228 status = 400;
229 }
230
231 res.status(status).send({
232 status,
233 });
234});
235
236// returns JSON object of all opened topics
237app.get('/status', async (req, res) => {
238 res.send(sseTopics);
239});
240
241// health-check endpoint
242app.get('/', async (req, res) => {
243 res.send('OK');
244});
245
246// return a 404 if no routes match
247app.use((req, res, next) => {
248 res.set('Cache-Control', 'private, no-store');
249 res.status(404).end('Not found');
250});
251
252// starts the server
253app.listen(port, () => {
254 console.log(`PubSub server running on http://localhost:${port}`);
255});
256```
257
258### Our custom message format
259
260Each message posted on a server must be in a specific format that out server
261accepts. Having structure like this allows us to have multiple separated type of
262events on each topic.
263
264With this we can separate streams and only receive events that belong to the
265topic.
266
267One example would be, that we have index page and we want to receive messages
268about new upvotes or new subscribers but we don't want to follow events for
269other pages. This reduces clutter and overall network. And structure is much
270nicer and maintanable.
271
272```json
273{
274 "topic": "sample-topic",
275 "event": "sample-event",
276 "message": { "name": "John" }
277}
278```
279
280## Publisher and subscriber clients
281
282### Publisher and subscriber in action
283
284<video src="/assets/posts/simple-pubsub-server/clients.m4v" controls></video>
285
286You can download [the code](../simple-pubsub-server/sse-pubsub-server.zip) and
287follow along.
288
289### Publisher
290
291As talked about above publisher is the one that send messages to the
292broker/server. Message inside the payload can be whatever you want (string,
293object, array). I would however personally avoid send large chunks of data like
294blobs and such.
295
296```html
297<!DOCTYPE html>
298<html lang="en">
299
300 <head>
301 <meta charset="UTF-8">
302 <meta name="viewport" content="width=device-width, initial-scale=1.0">
303 <title>Publisher</title>
304 </head>
305
306 <body>
307
308 <h1>Publisher</h1>
309
310 <fieldset>
311 <p>
312 <label>Server:</label>
313 <input type="text" id="server" value="http://localhost:4000">
314 </p>
315 <p>
316 <label>Topic:</label>
317 <input type="text" id="topic" value="sample-topic">
318 </p>
319 <p>
320 <label>Event:</label>
321 <input type="text" id="event" value="sample-event">
322 </p>
323 <p>
324 <label>Message:</label>
325 <input type="text" id="message" value='{"name": "John"}'>
326 </p>
327 <p>
328 <button type="button" id="button">Publish message to topic</button>
329 </p>
330 </fieldset>
331
332 <script>
333
334 const button = document.querySelector('#button');
335 const server = document.querySelector('#server');
336 const topic = document.querySelector('#topic');
337 const event = document.querySelector('#event');
338 const message = document.querySelector('#message');
339
340 button.addEventListener('click', async (evt) => {
341 const req = await fetch(`${server.value}/publish`, {
342 method: 'post',
343 headers: {
344 'Accept': 'application/json',
345 'Content-Type': 'application/json',
346 },
347 body: JSON.stringify({
348 topic: topic.value,
349 event: event.value,
350 message: JSON.parse(message.value),
351 }),
352 });
353
354 const res = await req.json();
355 console.log(res);
356 });
357
358 </script>
359
360 </body>
361
362</html>
363```
364
365### Subscriber
366
367Subscriber is responsible for receiving new messages that come from server via
368publisher. The code bellow is very rudimentary but works and follows the
369implementation guidelines for EventSource.
370
371You can use either Developer Tools Console to see incoming messages or you can
372defer to Debugging with Google Chrome section above to see all EventStream
373messages.
374
375> Don't be alarmed if the subscriber gets disconnected from the server every so
376> often. The code we have here resets connection every 15s but it automatically
377> get reconnected and fetches all messages up to last received message id. This
378> setting can be adjusted in `server.js` file; search for the
379> `maxStreamDuration` variable.
380
381```html
382<!DOCTYPE html>
383<html lang="en">
384
385 <head>
386 <meta charset="UTF-8">
387 <meta name="viewport" content="width=device-width, initial-scale=1.0">
388 <title>Subscriber</title>
389 <link rel="stylesheet" href="style.css">
390 </head>
391
392 <body>
393
394 <h1>Subscriber</h1>
395
396 <fieldset>
397 <p>
398 <label>Server:</label>
399 <input type="text" id="server" value="http://localhost:4000">
400 </p>
401 <p>
402 <label>Topic:</label>
403 <input type="text" id="topic" value="sample-topic">
404 </p>
405 <p>
406 <label>Event:</label>
407 <input type="text" id="event" value="sample-event">
408 </p>
409 <p>
410 <button type="button" id="button">Subscribe to topic</button>
411 </p>
412 </fieldset>
413
414 <script>
415
416 const button = document.querySelector('#button');
417 const server = document.querySelector('#server');
418 const topic = document.querySelector('#topic');
419 const event = document.querySelector('#event');
420
421 button.addEventListener('click', async (evt) => {
422
423 let es = new EventSource(`${server.value}/stream/${topic.value}`);
424
425 es.addEventListener(event.value, function (evt) {
426 console.log(`incoming message`, JSON.parse(evt.data));
427 });
428
429 es.addEventListener('open', function (evt) {
430 console.log('connected', evt);
431 });
432
433 es.addEventListener('error', function (evt) {
434 console.log('error', evt);
435 });
436
437 });
438
439 </script>
440
441 </body>
442
443</html>
444```
445
446## Reading further
447
448- [Using server-sent events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events)
449- [Using SSE Instead Of WebSockets For Unidirectional Data Flow Over HTTP/2](https://www.smashingmagazine.com/2018/02/sse-websockets-data-flow-http2/)
450- [What is Server-Sent Events?](https://apifriends.com/api-streaming/server-sent-events/)
451- [An HTTP/2 extension for bidirectional messaging communication](https://tools.ietf.org/id/draft-xie-bidirectional-messaging-01.html)
452- [Introduction to HTTP/2](https://developers.google.com/web/fundamentals/performance/http2)
453- [The WebSocket API (WebSockets)](https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API)
454