aboutsummaryrefslogtreecommitdiff
path: root/content/posts/2020-03-22-simple-sse-based-pubsub-server.md
diff options
context:
space:
mode:
authorMitja Felicijan <m@mitjafelicijan.com>2023-07-08 23:25:41 +0200
committerMitja Felicijan <m@mitjafelicijan.com>2023-07-08 23:25:41 +0200
commitcd6644ea4ddc78597934ab0ef5ba50e3c3daa927 (patch)
tree03de331a8db6386dfd6fa75155bfbcea6b4feaf3 /content/posts/2020-03-22-simple-sse-based-pubsub-server.md
parent84ed124529ffeee1590295b8de3a8faf51848680 (diff)
downloadmitjafelicijan.com-cd6644ea4ddc78597934ab0ef5ba50e3c3daa927.tar.gz
Moved to a simpler SSG
Diffstat (limited to 'content/posts/2020-03-22-simple-sse-based-pubsub-server.md')
-rw-r--r--content/posts/2020-03-22-simple-sse-based-pubsub-server.md453
1 files changed, 0 insertions, 453 deletions
diff --git a/content/posts/2020-03-22-simple-sse-based-pubsub-server.md b/content/posts/2020-03-22-simple-sse-based-pubsub-server.md
deleted file mode 100644
index 60745d0..0000000
--- a/content/posts/2020-03-22-simple-sse-based-pubsub-server.md
+++ /dev/null
@@ -1,453 +0,0 @@
1---
2title: Simple Server-Sent Events based PubSub Server
3url: simple-server-sent-events-based-pubsub-server.html
4date: 2020-03-22T12:00:00+02:00
5draft: false
6---
7
8## Before we continue ...
9
10Publisher Subscriber model is nothing new and there are many amazing solutions
11out there, so writing a new one would be a waste of time if other solutions
12wouldn't have quite complex install procedures and weren't so hard to maintain.
13But to be fair, comparing this simple server with something like
14[Kafka](https://kafka.apache.org/) or [RabbitMQ](https://www.rabbitmq.com/) is
15laughable at the least. Those solutions are enterprise grade and have many
16mechanisms there to ensure messages aren't lost and much more. Regardless of
17these drawbacks, this method has been tested on a large website and worked until
18now without any problems. So now, that we got that cleared up, let's continue.
19
20***Wiki definition:** Publish/subscribe messaging, or pub/sub messaging, is a
21form of asynchronous service-to-service communication used in serverless and
22microservices architectures. In a pub/sub model, any message published to a
23topic is immediately received by all the subscribers to the topic.*
24
25## General goals
26
27- provide a simple server that relays messages to all the connected clients,
28- messages can be posted on specific topics,
29- messages get sent via [Server-Sent
30 Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events)
31 to all the subscribers.
32
33## How exactly does the pub/sub model work?
34
35The easiest way to explain this is with diagram bellow. Basic function is
36simple. We have subscribers that receive messages, and we have publishers that
37create and post messages. Similar model is also well know pattern that works on
38a premise of consumers and producers, and they take similar roles.
39
40![How PubSub works](/assets/simple-pubsub-server/pubsub-overview.png)
41
42**These are some naive characteristics we want to achieve:**
43
44- producer is publishing messages to subscribe topic,
45- consumer is receiving messages from subscribed topic,
46- servers is also known as Broker,
47- broker does not store messages or tracks success,
48- broker uses
49 [FIFO](https://en.wikipedia.org/wiki/FIFO_(computing_and_electronics)) method
50 for delivering messages,
51- if consumer wants to receive messages from a topic, producer and consumer
52 topics must match,
53- consumer can subscribe to multiple topics,
54- producer can publish to multiple topics,
55- each message has a messageId.
56
57**Known drawbacks:**
58
59- messages will not be stored in a persistent queue or unreceived messages like
60 [DeadLetterQueue](https://en.wikipedia.org/wiki/Dead_letter_queue) so old
61 messages could be lost on server restart,
62- [Server-Sent
63 Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events)
64 opens a long-running connection between the client and the server so make sure
65 if your setup is load balanced that the load balancer in this case can have
66 long opened connection,
67- no system moderation due to the dynamic nature of creating queues.
68
69## Server-Sent Events
70
71Read more about it on [official specification
72page](https://html.spec.whatwg.org/multipage/server-sent-events.html).
73
74### Current browser support
75
76![Browser support](/assets/simple-pubsub-server/caniuse.png)
77
78Check
79[https://caniuse.com/#feat=eventsource](https://caniuse.com/#feat=eventsource)
80for latest information about browser support.
81
82### Known issues
83
84- Firefox 52 and below do not support EventSource in web/shared workers
85- In Firefox prior to version 36 server-sent events do not reconnect
86 automatically in case of a connection interrupt (bug)
87- Reportedly, CORS in EventSource is currently supported in Firefox 10+, Opera
88 12+, Chrome 26+, Safari 7.0+.
89- Antivirus software may block the event streaming data chunks.
90
91Source: [https://caniuse.com/#feat=eventsource](https://caniuse.com/#feat=eventsource)
92
93### Message format
94
95The simplest message that can be sent is only with data attribute:
96
97```bash
98data: this is a simple message
99<blank line>
100```
101
102You can send message IDs to be used if the connection is dropped:
103
104```bash
105id: 33
106data: this is line one
107data: this is line two
108<blank line>
109```
110
111And you can specify your own event types (the above messages will all trigger
112the message event):
113
114```bash
115id: 36
116event: price
117data: 103.34
118<blank line>
119```
120
121### Server requirements
122
123The important thing is how you send headers and which headers are sent by the
124server that triggers browser to threat response as a EventStream.
125
126Headers responsible for this are:
127
128```bash
129Content-Type: text/event-stream
130Cache-Control: no-cache
131Connection: keep-alive
132```
133
134### Debugging with Google Chrome
135
136Google Chrome provides build-in debugging and exploration tool for [Server-Sent
137Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events)
138which is quite nice and available from Developer Tools under Network tab.
139
140> You can debug only client side events that get received and not the server
141> ones. For debugging server events add `console.log` to `server.js` code and
142> print out events.
143
144![Google Chrome Developer Tools EventStream](/assets/simple-pubsub-server/chrome-debugging.png)
145
146## Server implementation
147
148For the sake of this example we will use [Node.js](https://nodejs.org/en/) with
149[Express](https://expressjs.com) as our router since this is the easiest way to
150get started and we will use already written SSE library for node
151[sse-pubsub](https://www.npmjs.com/package/sse-pubsub) so we don't reinvent the
152wheel.
153
154```bash
155npm init --yes
156
157npm install express
158npm install body-parser
159npm install sse-pubsub
160```
161
162Basic implementation of a server (`server.js`):
163
164```js
165const express = require('express');
166const bodyParser = require('body-parser');
167const SSETopic = require('sse-pubsub');
168
169const app = express();
170const port = process.env.PORT || 4000;
171
172// topics container
173const sseTopics = {};
174
175app.use(bodyParser.json());
176
177// open for all cors
178app.all('*', (req, res, next) => {
179 res.header('Access-Control-Allow-Origin', '*');
180 res.header('Access-Control-Allow-Headers', 'X-Requested-With, Content-Type');
181 next();
182});
183
184// preflight request error fix
185app.options('*', async (req, res) => {
186 res.header('Access-Control-Allow-Origin', '*');
187 res.header('Access-Control-Allow-Headers', 'X-Requested-With, Content-Type');
188 res.send('OK');
189});
190
191// serve the event streams
192app.get('/stream/:topic', async (req, res, next) => {
193 const topic = req.params.topic;
194
195 if (!(topic in sseTopics)) {
196 sseTopics[topic] = new SSETopic({
197 pingInterval: 0,
198 maxStreamDuration: 15000,
199 });
200 }
201
202 // subscribing client to topic
203 sseTopics[topic].subscribe(req, res);
204});
205
206// accepts new messages into topic
207app.post('/publish', async (req, res) => {
208 let body = req.body;
209 let status = 200;
210
211 console.log('Incoming message:', req.body);
212
213 if (
214 body.hasOwnProperty('topic') &&
215 body.hasOwnProperty('event') &&
216 body.hasOwnProperty('message')
217 ) {
218 const topic = req.body.topic;
219 const event = req.body.event;
220 const message = req.body.message;
221
222 if (topic in sseTopics) {
223 // sends message to all the subscribers
224 sseTopics[topic].publish(message, event);
225 }
226 } else {
227 status = 400;
228 }
229
230 res.status(status).send({
231 status,
232 });
233});
234
235// returns JSON object of all opened topics
236app.get('/status', async (req, res) => {
237 res.send(sseTopics);
238});
239
240// health-check endpoint
241app.get('/', async (req, res) => {
242 res.send('OK');
243});
244
245// return a 404 if no routes match
246app.use((req, res, next) => {
247 res.set('Cache-Control', 'private, no-store');
248 res.status(404).end('Not found');
249});
250
251// starts the server
252app.listen(port, () => {
253 console.log(`PubSub server running on http://localhost:${port}`);
254});
255```
256
257### Our custom message format
258
259Each message posted on a server must be in a specific format that out server
260accepts. Having structure like this allows us to have multiple separated type of
261events on each topic.
262
263With this we can separate streams and only receive events that belong to the
264topic.
265
266One example would be, that we have index page and we want to receive messages
267about new upvotes or new subscribers but we don't want to follow events for
268other pages. This reduces clutter and overall network. And structure is much
269nicer and maintanable.
270
271```json
272{
273 "topic": "sample-topic",
274 "event": "sample-event",
275 "message": { "name": "John" }
276}
277```
278
279## Publisher and subscriber clients
280
281### Publisher and subscriber in action
282
283<video src="/assets/simple-pubsub-server/clients.m4v" controls></video>
284
285You can download [the code](../simple-pubsub-server/sse-pubsub-server.zip) and
286follow along.
287
288### Publisher
289
290As talked about above publisher is the one that send messages to the
291broker/server. Message inside the payload can be whatever you want (string,
292object, array). I would however personally avoid send large chunks of data like
293blobs and such.
294
295```html
296<!DOCTYPE html>
297<html lang="en">
298
299 <head>
300 <meta charset="UTF-8">
301 <meta name="viewport" content="width=device-width, initial-scale=1.0">
302 <title>Publisher</title>
303 </head>
304
305 <body>
306
307 <h1>Publisher</h1>
308
309 <fieldset>
310 <p>
311 <label>Server:</label>
312 <input type="text" id="server" value="http://localhost:4000">
313 </p>
314 <p>
315 <label>Topic:</label>
316 <input type="text" id="topic" value="sample-topic">
317 </p>
318 <p>
319 <label>Event:</label>
320 <input type="text" id="event" value="sample-event">
321 </p>
322 <p>
323 <label>Message:</label>
324 <input type="text" id="message" value='{"name": "John"}'>
325 </p>
326 <p>
327 <button type="button" id="button">Publish message to topic</button>
328 </p>
329 </fieldset>
330
331 <script>
332
333 const button = document.querySelector('#button');
334 const server = document.querySelector('#server');
335 const topic = document.querySelector('#topic');
336 const event = document.querySelector('#event');
337 const message = document.querySelector('#message');
338
339 button.addEventListener('click', async (evt) => {
340 const req = await fetch(`${server.value}/publish`, {
341 method: 'post',
342 headers: {
343 'Accept': 'application/json',
344 'Content-Type': 'application/json',
345 },
346 body: JSON.stringify({
347 topic: topic.value,
348 event: event.value,
349 message: JSON.parse(message.value),
350 }),
351 });
352
353 const res = await req.json();
354 console.log(res);
355 });
356
357 </script>
358
359 </body>
360
361</html>
362```
363
364### Subscriber
365
366Subscriber is responsible for receiving new messages that come from server via
367publisher. The code bellow is very rudimentary but works and follows the
368implementation guidelines for EventSource.
369
370You can use either Developer Tools Console to see incoming messages or you can
371defer to Debugging with Google Chrome section above to see all EventStream
372messages.
373
374> Don't be alarmed if the subscriber gets disconnected from the server every so
375> often. The code we have here resets connection every 15s but it automatically
376> get reconnected and fetches all messages up to last received message id. This
377> setting can be adjusted in `server.js` file; search for the
378> `maxStreamDuration` variable.
379
380```html
381<!DOCTYPE html>
382<html lang="en">
383
384 <head>
385 <meta charset="UTF-8">
386 <meta name="viewport" content="width=device-width, initial-scale=1.0">
387 <title>Subscriber</title>
388 <link rel="stylesheet" href="style.css">
389 </head>
390
391 <body>
392
393 <h1>Subscriber</h1>
394
395 <fieldset>
396 <p>
397 <label>Server:</label>
398 <input type="text" id="server" value="http://localhost:4000">
399 </p>
400 <p>
401 <label>Topic:</label>
402 <input type="text" id="topic" value="sample-topic">
403 </p>
404 <p>
405 <label>Event:</label>
406 <input type="text" id="event" value="sample-event">
407 </p>
408 <p>
409 <button type="button" id="button">Subscribe to topic</button>
410 </p>
411 </fieldset>
412
413 <script>
414
415 const button = document.querySelector('#button');
416 const server = document.querySelector('#server');
417 const topic = document.querySelector('#topic');
418 const event = document.querySelector('#event');
419
420 button.addEventListener('click', async (evt) => {
421
422 let es = new EventSource(`${server.value}/stream/${topic.value}`);
423
424 es.addEventListener(event.value, function (evt) {
425 console.log(`incoming message`, JSON.parse(evt.data));
426 });
427
428 es.addEventListener('open', function (evt) {
429 console.log('connected', evt);
430 });
431
432 es.addEventListener('error', function (evt) {
433 console.log('error', evt);
434 });
435
436 });
437
438 </script>
439
440 </body>
441
442</html>
443```
444
445## Reading further
446
447- [Using server-sent events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events)
448- [Using SSE Instead Of WebSockets For Unidirectional Data Flow Over HTTP/2](https://www.smashingmagazine.com/2018/02/sse-websockets-data-flow-http2/)
449- [What is Server-Sent Events?](https://apifriends.com/api-streaming/server-sent-events/)
450- [An HTTP/2 extension for bidirectional messaging communication](https://tools.ietf.org/id/draft-xie-bidirectional-messaging-01.html)
451- [Introduction to HTTP/2](https://developers.google.com/web/fundamentals/performance/http2)
452- [The WebSocket API (WebSockets)](https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API)
453