aboutsummaryrefslogtreecommitdiff
path: root/_posts/2020-03-22-simple-sse-based-pubsub-server.md
diff options
context:
space:
mode:
authorMitja Felicijan <mitja.felicijan@gmail.com>2023-11-01 22:54:27 +0100
committerMitja Felicijan <mitja.felicijan@gmail.com>2023-11-01 22:54:27 +0100
commit2417a6b7603524dc5cd30d29b153f91024b9443d (patch)
tree9be5ea8e5baba96dd9159217da6badf6157fb595 /_posts/2020-03-22-simple-sse-based-pubsub-server.md
parent89ba3497f07a8ea43d209b583f39fcc286acc923 (diff)
downloadmitjafelicijan.com-2417a6b7603524dc5cd30d29b153f91024b9443d.tar.gz
Move to Jekyll
Diffstat (limited to '_posts/2020-03-22-simple-sse-based-pubsub-server.md')
-rw-r--r--_posts/2020-03-22-simple-sse-based-pubsub-server.md455
1 files changed, 455 insertions, 0 deletions
diff --git a/_posts/2020-03-22-simple-sse-based-pubsub-server.md b/_posts/2020-03-22-simple-sse-based-pubsub-server.md
new file mode 100644
index 0000000..23a3640
--- /dev/null
+++ b/_posts/2020-03-22-simple-sse-based-pubsub-server.md
@@ -0,0 +1,455 @@
1---
2title: Simple Server-Sent Events based PubSub Server
3permalink: /simple-server-sent-events-based-pubsub-server.html
4date: 2020-03-22T12:00:00+02:00
5layout: post
6type: post
7draft: false
8---
9
10## Before we continue ...
11
12Publisher Subscriber model is nothing new and there are many amazing solutions
13out there, so writing a new one would be a waste of time if other solutions
14wouldn't have quite complex install procedures and weren't so hard to maintain.
15But to be fair, comparing this simple server with something like
16[Kafka](https://kafka.apache.org/) or [RabbitMQ](https://www.rabbitmq.com/) is
17laughable at the least. Those solutions are enterprise grade and have many
18mechanisms there to ensure messages aren't lost and much more. Regardless of
19these drawbacks, this method has been tested on a large website and worked until
20now without any problems. So now, that we got that cleared up, let's continue.
21
22***Wiki definition:** Publish/subscribe messaging, or pub/sub messaging, is a
23form of asynchronous service-to-service communication used in serverless and
24microservices architectures. In a pub/sub model, any message published to a
25topic is immediately received by all the subscribers to the topic.*
26
27## General goals
28
29- provide a simple server that relays messages to all the connected clients,
30- messages can be posted on specific topics,
31- messages get sent via [Server-Sent
32 Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events)
33 to all the subscribers.
34
35## How exactly does the pub/sub model work?
36
37The easiest way to explain this is with diagram bellow. Basic function is
38simple. We have subscribers that receive messages, and we have publishers that
39create and post messages. Similar model is also well know pattern that works on
40a premise of consumers and producers, and they take similar roles.
41
42![How PubSub works](/assets/posts/simple-pubsub-server/pubsub-overview.png)
43
44**These are some naive characteristics we want to achieve:**
45
46- producer is publishing messages to subscribe topic,
47- consumer is receiving messages from subscribed topic,
48- servers is also known as Broker,
49- broker does not store messages or tracks success,
50- broker uses
51 [FIFO](https://en.wikipedia.org/wiki/FIFO_(computing_and_electronics)) method
52 for delivering messages,
53- if consumer wants to receive messages from a topic, producer and consumer
54 topics must match,
55- consumer can subscribe to multiple topics,
56- producer can publish to multiple topics,
57- each message has a messageId.
58
59**Known drawbacks:**
60
61- messages will not be stored in a persistent queue or unreceived messages like
62 [DeadLetterQueue](https://en.wikipedia.org/wiki/Dead_letter_queue) so old
63 messages could be lost on server restart,
64- [Server-Sent
65 Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events)
66 opens a long-running connection between the client and the server so make sure
67 if your setup is load balanced that the load balancer in this case can have
68 long opened connection,
69- no system moderation due to the dynamic nature of creating queues.
70
71## Server-Sent Events
72
73Read more about it on [official specification
74page](https://html.spec.whatwg.org/multipage/server-sent-events.html).
75
76### Current browser support
77
78![Browser support](/assets/posts/simple-pubsub-server/caniuse.png)
79
80Check
81[https://caniuse.com/#feat=eventsource](https://caniuse.com/#feat=eventsource)
82for latest information about browser support.
83
84### Known issues
85
86- Firefox 52 and below do not support EventSource in web/shared workers
87- In Firefox prior to version 36 server-sent events do not reconnect
88 automatically in case of a connection interrupt (bug)
89- Reportedly, CORS in EventSource is currently supported in Firefox 10+, Opera
90 12+, Chrome 26+, Safari 7.0+.
91- Antivirus software may block the event streaming data chunks.
92
93Source: [https://caniuse.com/#feat=eventsource](https://caniuse.com/#feat=eventsource)
94
95### Message format
96
97The simplest message that can be sent is only with data attribute:
98
99```bash
100data: this is a simple message
101<blank line>
102```
103
104You can send message IDs to be used if the connection is dropped:
105
106```bash
107id: 33
108data: this is line one
109data: this is line two
110<blank line>
111```
112
113And you can specify your own event types (the above messages will all trigger
114the message event):
115
116```bash
117id: 36
118event: price
119data: 103.34
120<blank line>
121```
122
123### Server requirements
124
125The important thing is how you send headers and which headers are sent by the
126server that triggers browser to threat response as a EventStream.
127
128Headers responsible for this are:
129
130```bash
131Content-Type: text/event-stream
132Cache-Control: no-cache
133Connection: keep-alive
134```
135
136### Debugging with Google Chrome
137
138Google Chrome provides build-in debugging and exploration tool for [Server-Sent
139Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events)
140which is quite nice and available from Developer Tools under Network tab.
141
142> You can debug only client side events that get received and not the server
143> ones. For debugging server events add `console.log` to `server.js` code and
144> print out events.
145
146![Google Chrome Developer Tools EventStream](/assets/posts/simple-pubsub-server/chrome-debugging.png)
147
148## Server implementation
149
150For the sake of this example we will use [Node.js](https://nodejs.org/en/) with
151[Express](https://expressjs.com) as our router since this is the easiest way to
152get started and we will use already written SSE library for node
153[sse-pubsub](https://www.npmjs.com/package/sse-pubsub) so we don't reinvent the
154wheel.
155
156```bash
157npm init --yes
158
159npm install express
160npm install body-parser
161npm install sse-pubsub
162```
163
164Basic implementation of a server (`server.js`):
165
166```js
167const express = require('express');
168const bodyParser = require('body-parser');
169const SSETopic = require('sse-pubsub');
170
171const app = express();
172const port = process.env.PORT || 4000;
173
174// topics container
175const sseTopics = {};
176
177app.use(bodyParser.json());
178
179// open for all cors
180app.all('*', (req, res, next) => {
181 res.header('Access-Control-Allow-Origin', '*');
182 res.header('Access-Control-Allow-Headers', 'X-Requested-With, Content-Type');
183 next();
184});
185
186// preflight request error fix
187app.options('*', async (req, res) => {
188 res.header('Access-Control-Allow-Origin', '*');
189 res.header('Access-Control-Allow-Headers', 'X-Requested-With, Content-Type');
190 res.send('OK');
191});
192
193// serve the event streams
194app.get('/stream/:topic', async (req, res, next) => {
195 const topic = req.params.topic;
196
197 if (!(topic in sseTopics)) {
198 sseTopics[topic] = new SSETopic({
199 pingInterval: 0,
200 maxStreamDuration: 15000,
201 });
202 }
203
204 // subscribing client to topic
205 sseTopics[topic].subscribe(req, res);
206});
207
208// accepts new messages into topic
209app.post('/publish', async (req, res) => {
210 let body = req.body;
211 let status = 200;
212
213 console.log('Incoming message:', req.body);
214
215 if (
216 body.hasOwnProperty('topic') &&
217 body.hasOwnProperty('event') &&
218 body.hasOwnProperty('message')
219 ) {
220 const topic = req.body.topic;
221 const event = req.body.event;
222 const message = req.body.message;
223
224 if (topic in sseTopics) {
225 // sends message to all the subscribers
226 sseTopics[topic].publish(message, event);
227 }
228 } else {
229 status = 400;
230 }
231
232 res.status(status).send({
233 status,
234 });
235});
236
237// returns JSON object of all opened topics
238app.get('/status', async (req, res) => {
239 res.send(sseTopics);
240});
241
242// health-check endpoint
243app.get('/', async (req, res) => {
244 res.send('OK');
245});
246
247// return a 404 if no routes match
248app.use((req, res, next) => {
249 res.set('Cache-Control', 'private, no-store');
250 res.status(404).end('Not found');
251});
252
253// starts the server
254app.listen(port, () => {
255 console.log(`PubSub server running on http://localhost:${port}`);
256});
257```
258
259### Our custom message format
260
261Each message posted on a server must be in a specific format that out server
262accepts. Having structure like this allows us to have multiple separated type of
263events on each topic.
264
265With this we can separate streams and only receive events that belong to the
266topic.
267
268One example would be, that we have index page and we want to receive messages
269about new upvotes or new subscribers but we don't want to follow events for
270other pages. This reduces clutter and overall network. And structure is much
271nicer and maintanable.
272
273```json
274{
275 "topic": "sample-topic",
276 "event": "sample-event",
277 "message": { "name": "John" }
278}
279```
280
281## Publisher and subscriber clients
282
283### Publisher and subscriber in action
284
285<video src="/assets/posts/simple-pubsub-server/clients.m4v" controls></video>
286
287You can download [the code](../simple-pubsub-server/sse-pubsub-server.zip) and
288follow along.
289
290### Publisher
291
292As talked about above publisher is the one that send messages to the
293broker/server. Message inside the payload can be whatever you want (string,
294object, array). I would however personally avoid send large chunks of data like
295blobs and such.
296
297```html
298<!DOCTYPE html>
299<html lang="en">
300
301 <head>
302 <meta charset="UTF-8">
303 <meta name="viewport" content="width=device-width, initial-scale=1.0">
304 <title>Publisher</title>
305 </head>
306
307 <body>
308
309 <h1>Publisher</h1>
310
311 <fieldset>
312 <p>
313 <label>Server:</label>
314 <input type="text" id="server" value="http://localhost:4000">
315 </p>
316 <p>
317 <label>Topic:</label>
318 <input type="text" id="topic" value="sample-topic">
319 </p>
320 <p>
321 <label>Event:</label>
322 <input type="text" id="event" value="sample-event">
323 </p>
324 <p>
325 <label>Message:</label>
326 <input type="text" id="message" value='{"name": "John"}'>
327 </p>
328 <p>
329 <button type="button" id="button">Publish message to topic</button>
330 </p>
331 </fieldset>
332
333 <script>
334
335 const button = document.querySelector('#button');
336 const server = document.querySelector('#server');
337 const topic = document.querySelector('#topic');
338 const event = document.querySelector('#event');
339 const message = document.querySelector('#message');
340
341 button.addEventListener('click', async (evt) => {
342 const req = await fetch(`${server.value}/publish`, {
343 method: 'post',
344 headers: {
345 'Accept': 'application/json',
346 'Content-Type': 'application/json',
347 },
348 body: JSON.stringify({
349 topic: topic.value,
350 event: event.value,
351 message: JSON.parse(message.value),
352 }),
353 });
354
355 const res = await req.json();
356 console.log(res);
357 });
358
359 </script>
360
361 </body>
362
363</html>
364```
365
366### Subscriber
367
368Subscriber is responsible for receiving new messages that come from server via
369publisher. The code bellow is very rudimentary but works and follows the
370implementation guidelines for EventSource.
371
372You can use either Developer Tools Console to see incoming messages or you can
373defer to Debugging with Google Chrome section above to see all EventStream
374messages.
375
376> Don't be alarmed if the subscriber gets disconnected from the server every so
377> often. The code we have here resets connection every 15s but it automatically
378> get reconnected and fetches all messages up to last received message id. This
379> setting can be adjusted in `server.js` file; search for the
380> `maxStreamDuration` variable.
381
382```html
383<!DOCTYPE html>
384<html lang="en">
385
386 <head>
387 <meta charset="UTF-8">
388 <meta name="viewport" content="width=device-width, initial-scale=1.0">
389 <title>Subscriber</title>
390 <link rel="stylesheet" href="style.css">
391 </head>
392
393 <body>
394
395 <h1>Subscriber</h1>
396
397 <fieldset>
398 <p>
399 <label>Server:</label>
400 <input type="text" id="server" value="http://localhost:4000">
401 </p>
402 <p>
403 <label>Topic:</label>
404 <input type="text" id="topic" value="sample-topic">
405 </p>
406 <p>
407 <label>Event:</label>
408 <input type="text" id="event" value="sample-event">
409 </p>
410 <p>
411 <button type="button" id="button">Subscribe to topic</button>
412 </p>
413 </fieldset>
414
415 <script>
416
417 const button = document.querySelector('#button');
418 const server = document.querySelector('#server');
419 const topic = document.querySelector('#topic');
420 const event = document.querySelector('#event');
421
422 button.addEventListener('click', async (evt) => {
423
424 let es = new EventSource(`${server.value}/stream/${topic.value}`);
425
426 es.addEventListener(event.value, function (evt) {
427 console.log(`incoming message`, JSON.parse(evt.data));
428 });
429
430 es.addEventListener('open', function (evt) {
431 console.log('connected', evt);
432 });
433
434 es.addEventListener('error', function (evt) {
435 console.log('error', evt);
436 });
437
438 });
439
440 </script>
441
442 </body>
443
444</html>
445```
446
447## Reading further
448
449- [Using server-sent events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events)
450- [Using SSE Instead Of WebSockets For Unidirectional Data Flow Over HTTP/2](https://www.smashingmagazine.com/2018/02/sse-websockets-data-flow-http2/)
451- [What is Server-Sent Events?](https://apifriends.com/api-streaming/server-sent-events/)
452- [An HTTP/2 extension for bidirectional messaging communication](https://tools.ietf.org/id/draft-xie-bidirectional-messaging-01.html)
453- [Introduction to HTTP/2](https://developers.google.com/web/fundamentals/performance/http2)
454- [The WebSocket API (WebSockets)](https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API)
455