aboutsummaryrefslogtreecommitdiff
path: root/content
diff options
context:
space:
mode:
authorMitja Felicijan <mitja.felicijan@gmail.com>2020-03-25 15:14:23 +0100
committerMitja Felicijan <mitja.felicijan@gmail.com>2020-03-25 15:14:23 +0100
commit6b4550ab79b77f0889acad883bd7b5937c9c1299 (patch)
treeb2e9b8edd2952d9a83da517204ef164a1d0c7763 /content
parentd3534969bd96f6931c50fb5e6c39569257ff4883 (diff)
downloadmitjafelicijan.com-6b4550ab79b77f0889acad883bd7b5937c9c1299.tar.gz
Added new post - pubsub server
Diffstat (limited to 'content')
-rw-r--r--content/2020-03-25-simple-sse-based-pubsub-server.md398
1 files changed, 398 insertions, 0 deletions
diff --git a/content/2020-03-25-simple-sse-based-pubsub-server.md b/content/2020-03-25-simple-sse-based-pubsub-server.md
new file mode 100644
index 0000000..5706ad1
--- /dev/null
+++ b/content/2020-03-25-simple-sse-based-pubsub-server.md
@@ -0,0 +1,398 @@
1~ title: Simple Server-Sent Events based PubSub Server
2~ description: PubSub server made with Server-Sent Events
3~ slug: /simple-server-sent-events-based-pubsub-server.html
4~ date: 2020-03-25
5~ template: post
6~ hide: false
7
8## Before we continue ...
9
10Publisher Subscriber model is nothing new and there are many amazing solutions out there, so writing a new one would be a waste of time if other solutions wouldn't have quite complex install procedures and weren't so hard to maintain. But to be fair, comparing this simple server with something like [Kafka](https://kafka.apache.org/) or [RabbitMQ](https://www.rabbitmq.com/) is laughable at the least. Those solutions are enterprise grade and have many mechanisms there to ensure messages aren't lost and much more. Regardless of these drawbacks, this method has been tested on a large website and worked until now without any problems. So now, that we got that cleared up, let's continue.
11
12***Wiki definition:** Publish/subscribe messaging, or pub/sub messaging, is a form of asynchronous service-to-service communication used in serverless and microservices architectures. In a pub/sub model, any message published to a topic is immediately received by all the subscribers to the topic.*
13
14## General goals
15
16- provide a simple server that relays messages to all the connected clients,
17- messages can be posted on specific topics,
18- messages get sent via [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events) to all the subscribers.
19
20## How exactly does the pub/sub model work?
21
22The easiest way to explain this is with diagram bellow. Basic function is simple. We have subscribers that receive messages, and we have publishers that create and post messages. Similar model is also well know pattern that works on a premise of consumers and producers, and they take similar roles.
23
24![How PubSub works](/assets/simple-pubsub-server/pubsub-overview.png)
25
26**ⓘ** These are some naive characteristics we want to achieve:
27
28- producer is publishing messages to subscribe topic,
29- consumer is receiving messages from subscribed topic,
30- servers is also known as Broker,
31- broker does not store messages or tracks success,
32- broker uses [FIFO](https://en.wikipedia.org/wiki/FIFO_(computing_and_electronics)) method for delivering messages,
33- if consumer wants to receive messages from a topic, producer and consumer topics must match,
34- consumer can subscribe to multiple topics,
35- producer can publish to multiple topics,
36- each message has a messageId.
37
38**ⓘ** Known drawbacks:
39
40- messages will not be stored in a persistent queue or unreceived messages like [DeadLetterQueue](https://en.wikipedia.org/wiki/Dead_letter_queue) so old messages could be lost on server restart,
41- [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events) opens a long-running connection between the client and the server so make sure if your setup is load balanced that the load balancer in this case can have long opened connection,
42- no system moderation due to the dynamic nature of creating queues.
43
44## Server-Sent Events
45
46Read more about it on [official specification page](https://html.spec.whatwg.org/multipage/server-sent-events.html).
47
48### Current browser support
49
50![Browser support](../assets/simple-pubsub-server/caniuse.png)
51
52Check [https://caniuse.com/#feat=eventsource](https://caniuse.com/#feat=eventsource) for latest information about browser support.
53
54### Known issues
55
56- Firefox 52 and below do not support EventSource in web/shared workers
57- In Firefox prior to version 36 server-sent events do not reconnect automatically in case of a connection interrupt (bug)
58- Reportedly, CORS in EventSource is currently supported in Firefox 10+, Opera 12+, Chrome 26+, Safari 7.0+.
59- Antivirus software may block the event streaming data chunks.
60
61Source: [https://caniuse.com/#feat=eventsource](https://caniuse.com/#feat=eventsource)
62
63### Message format
64
65The simplest message that can be sent is only with data attribute:
66
67```bash
68data: this is a simple message
69<blank line>
70```
71
72You can send message IDs to be used if the connection is dropped:
73
74```bash
75id: 33
76data: this is line one
77data: this is line two
78<blank line>
79```
80
81And you can specify your own event types (the above messages will all trigger the message event):
82
83```bash
84id: 36
85event: price
86data: 103.34
87<blank line>
88```
89
90### Server requirements
91
92The important thing is how you send headers and which headers are sent by the server that triggers browser to threat response as a EventStream.
93
94Headers responsible for this are:
95
96```bash
97Content-Type: text/event-stream
98Cache-Control: no-cache
99Connection: keep-alive
100```
101
102### Debugging with Google Chrome
103
104Google Chrome provides build-in debugging and exploration tool for [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events) which is quite nice and available from Developer Tools under Network tab.
105
106**ⓘ** You can debug only client side events that get received and not the server ones.
107
108![Google Chrome Developer Tools EventStream](../assets/simple-pubsub-server/chrome-debugging.png)
109
110## Server implementation
111
112For the sake of this example we will use [Node.js](https://nodejs.org/en/) with [Express](https://expressjs.com) as our router since this is the easiest way to get started and we will use already written SSE library for node [sse-pubsub](https://www.npmjs.com/package/sse-pubsub) so we don't reinvent the wheel.
113
114```bash
115npm init --yes
116
117npm install express
118npm install body-parser
119npm install sse-pubsub
120```
121
122Basic implementation of a server (`server.js`):
123
124```js
125const express = require('express');
126const bodyParser = require('body-parser');
127const SSETopic = require('sse-pubsub');
128
129const app = express();
130const port = process.env.PORT || 4000;
131
132// topics container
133const sseTopics = {};
134
135app.use(bodyParser.json());
136
137// open for all cors
138app.all('*', (req, res, next) => {
139 res.header('Access-Control-Allow-Origin', '*');
140 res.header('Access-Control-Allow-Headers', 'X-Requested-With, Content-Type');
141 next();
142});
143
144// preflight request error fix
145app.options('*', async (req, res) => {
146 res.header('Access-Control-Allow-Origin', '*');
147 res.header('Access-Control-Allow-Headers', 'X-Requested-With, Content-Type');
148 res.send('OK');
149});
150
151// serve the event streams
152app.get('/stream/:topic', async (req, res, next) => {
153 const topic = req.params.topic;
154
155 if (!(topic in sseTopics)) {
156 sseTopics[topic] = new SSETopic({
157 pingInterval: 0,
158 maxStreamDuration: 15000,
159 });
160 }
161
162 // subscribing client to topic
163 sseTopics[topic].subscribe(req, res);
164});
165
166// accepts new messages into topic
167app.post('/publish', async (req, res) => {
168 let body = req.body;
169 let status = 200;
170
171 console.log('Incoming message:', req.body);
172
173 if (
174 body.hasOwnProperty('topic') &&
175 body.hasOwnProperty('event') &&
176 body.hasOwnProperty('message')
177 ) {
178 const topic = req.body.topic;
179 const event = req.body.event;
180 const message = req.body.message;
181
182 if (topic in sseTopics) {
183 // sends message to all the subscribers
184 sseTopics[topic].publish(message, event);
185 }
186 } else {
187 status = 400;
188 }
189
190 res.status(status).send({
191 status,
192 });
193});
194
195// returns JSON object of all opened topics
196app.get('/status', async (req, res) => {
197 res.send(sseTopics);
198});
199
200// health-check endpoint
201app.get('/', async (req, res) => {
202 res.send('OK');
203});
204
205// return a 404 if no routes match
206app.use((req, res, next) => {
207 res.set('Cache-Control', 'private, no-store');
208 res.status(404).end('Not found');
209});
210
211// starts the server
212app.listen(port, () => {
213 console.log(`PubSub server running on http://localhost:${port}`);
214});
215```
216
217### Our custom message format
218
219Each message posted on a server must be in a specific format that out server accepts. Having structure like this allows us to have multiple separated type of events on each topic.
220
221With this we can separate streams and only receive events that belong to the topic.
222
223One example would be, that we have index page and we want to receive messages about new upvotes or new subscribers but we don't want to follow events for other pages. This reduces clutter and overall network. And structure is much nicer and maintanable.
224
225```json
226{
227 "topic": "sample-topic",
228 "event": "sample-event",
229 "message": { "name": "John" }
230}
231```
232
233## Publisher and subscriber clients
234
235### Publisher and subscriber in action
236
237<video src="/assets/simple-pubsub-server/clients.mp4" controls></video>
238
239You can download [the code](../assets/simple-pubsub-server/sse-pubsub-server.zip) and follow along.
240
241### Publisher
242
243As talked about above publisher is the one that send messages to the broker/server. Message inside the payload can be whatever you want (string, object, array). I would however personally avoid send large chunks of data like blobs and such.
244
245```html
246<!DOCTYPE html>
247<html lang="en">
248
249 <head>
250 <meta charset="UTF-8">
251 <meta name="viewport" content="width=device-width, initial-scale=1.0">
252 <title>Publisher</title>
253 </head>
254
255 <body>
256
257 <h1>Publisher</h1>
258
259 <fieldset>
260 <p>
261 <label>Server:</label>
262 <input type="text" id="server" value="http://localhost:4000">
263 </p>
264 <p>
265 <label>Topic:</label>
266 <input type="text" id="topic" value="sample-topic">
267 </p>
268 <p>
269 <label>Event:</label>
270 <input type="text" id="event" value="sample-event">
271 </p>
272 <p>
273 <label>Message:</label>
274 <input type="text" id="message" value='{"name": "John"}'>
275 </p>
276 <p>
277 <button type="button" id="button">Publish message to topic</button>
278 </p>
279 </fieldset>
280
281 <script>
282
283 const button = document.querySelector('#button');
284 const server = document.querySelector('#server');
285 const topic = document.querySelector('#topic');
286 const event = document.querySelector('#event');
287 const message = document.querySelector('#message');
288
289 button.addEventListener('click', async (evt) => {
290 const req = await fetch(`${server.value}/publish`, {
291 method: 'post',
292 headers: {
293 'Accept': 'application/json',
294 'Content-Type': 'application/json',
295 },
296 body: JSON.stringify({
297 topic: topic.value,
298 event: event.value,
299 message: JSON.parse(message.value),
300 }),
301 });
302
303 const res = await req.json();
304 console.log(res);
305 });
306
307 </script>
308
309 </body>
310
311</html>
312
313```
314
315### Subscriber
316
317Subscriber is responsible for receiving new messages that come from server via publisher. The code bellow is very rudimentary but works and follows the implementation guidelines for EventSource.
318
319You can use either Developer Tools Console to see incoming messages or you can defer to Debugging with Google Chrome section above to see all EventStream messages.
320
321**ⓘ** Don't be alarmed if the subscriber gets disconnected from the server every so often. The code we have here resets connection every 15s but it automatically get reconnected and fetches all messages up to last received message id. This setting can be adjusted in `server.js` file; search for the `maxStreamDuration` variable.
322
323```html
324<!DOCTYPE html>
325<html lang="en">
326
327 <head>
328 <meta charset="UTF-8">
329 <meta name="viewport" content="width=device-width, initial-scale=1.0">
330 <title>Subscriber</title>
331 <link rel="stylesheet" href="style.css">
332 </head>
333
334 <body>
335
336 <h1>Subscriber</h1>
337
338 <fieldset>
339 <p>
340 <label>Server:</label>
341 <input type="text" id="server" value="http://localhost:4000">
342 </p>
343 <p>
344 <label>Topic:</label>
345 <input type="text" id="topic" value="sample-topic">
346 </p>
347 <p>
348 <label>Event:</label>
349 <input type="text" id="event" value="sample-event">
350 </p>
351 <p>
352 <button type="button" id="button">Subscribe to topic</button>
353 </p>
354 </fieldset>
355
356 <script>
357
358 const button = document.querySelector('#button');
359 const server = document.querySelector('#server');
360 const topic = document.querySelector('#topic');
361 const event = document.querySelector('#event');
362
363 button.addEventListener('click', async (evt) => {
364
365 let es = new EventSource(`${server.value}/stream/${topic.value}`);
366
367 es.addEventListener(event.value, function (evt) {
368 console.log(`incoming message`, JSON.parse(evt.data));
369 });
370
371 es.addEventListener('open', function (evt) {
372 console.log('connected', evt);
373 });
374
375 es.addEventListener('error', function (evt) {
376 console.log('error', evt);
377 });
378
379 });
380
381 </script>
382
383 </body>
384
385</html>
386
387```
388
389
390
391## Reading further
392
393- [https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events)
394- [https://www.smashingmagazine.com/2018/02/sse-websockets-data-flow-http2/](https://www.smashingmagazine.com/2018/02/sse-websockets-data-flow-http2/)
395- [https://apifriends.com/api-streaming/server-sent-events/](https://apifriends.com/api-streaming/server-sent-events/)
396- [https://tools.ietf.org/id/draft-xie-bidirectional-messaging-01.html](https://tools.ietf.org/id/draft-xie-bidirectional-messaging-01.html)
397- [https://developers.google.com/web/fundamentals/performance/http2](https://developers.google.com/web/fundamentals/performance/http2)
398- [https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API](https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API)