diff options
| author | Mitja Felicijan <m@mitjafelicijan.com> | 2023-07-12 18:35:08 +0200 |
|---|---|---|
| committer | Mitja Felicijan <m@mitjafelicijan.com> | 2023-07-12 18:35:08 +0200 |
| commit | 23a56bd50b04211da3cab45f72c3390711b2416b (patch) | |
| tree | ab9a4a0136b4cce06dba7d853e296f682f807dbb /content/posts/2020-03-22-simple-sse-based-pubsub-server.md | |
| parent | cecb4b48a39a3558979b9c4b50e45bf605a3684e (diff) | |
| download | mitjafelicijan.com-23a56bd50b04211da3cab45f72c3390711b2416b.tar.gz | |
Moved notes and posts into subfolders
Diffstat (limited to 'content/posts/2020-03-22-simple-sse-based-pubsub-server.md')
| -rw-r--r-- | content/posts/2020-03-22-simple-sse-based-pubsub-server.md | 454 |
1 files changed, 454 insertions, 0 deletions
diff --git a/content/posts/2020-03-22-simple-sse-based-pubsub-server.md b/content/posts/2020-03-22-simple-sse-based-pubsub-server.md new file mode 100644 index 0000000..8e46138 --- /dev/null +++ b/content/posts/2020-03-22-simple-sse-based-pubsub-server.md | |||
| @@ -0,0 +1,454 @@ | |||
| 1 | --- | ||
| 2 | title: Simple Server-Sent Events based PubSub Server | ||
| 3 | url: simple-server-sent-events-based-pubsub-server.html | ||
| 4 | date: 2020-03-22T12:00:00+02:00 | ||
| 5 | type: post | ||
| 6 | draft: false | ||
| 7 | --- | ||
| 8 | |||
| 9 | ## Before we continue ... | ||
| 10 | |||
| 11 | Publisher Subscriber model is nothing new and there are many amazing solutions | ||
| 12 | out there, so writing a new one would be a waste of time if other solutions | ||
| 13 | wouldn't have quite complex install procedures and weren't so hard to maintain. | ||
| 14 | But to be fair, comparing this simple server with something like | ||
| 15 | [Kafka](https://kafka.apache.org/) or [RabbitMQ](https://www.rabbitmq.com/) is | ||
| 16 | laughable at the least. Those solutions are enterprise grade and have many | ||
| 17 | mechanisms there to ensure messages aren't lost and much more. Regardless of | ||
| 18 | these drawbacks, this method has been tested on a large website and worked until | ||
| 19 | now without any problems. So now, that we got that cleared up, let's continue. | ||
| 20 | |||
| 21 | ***Wiki definition:** Publish/subscribe messaging, or pub/sub messaging, is a | ||
| 22 | form of asynchronous service-to-service communication used in serverless and | ||
| 23 | microservices architectures. In a pub/sub model, any message published to a | ||
| 24 | topic is immediately received by all the subscribers to the topic.* | ||
| 25 | |||
| 26 | ## General goals | ||
| 27 | |||
| 28 | - provide a simple server that relays messages to all the connected clients, | ||
| 29 | - messages can be posted on specific topics, | ||
| 30 | - messages get sent via [Server-Sent | ||
| 31 | Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events) | ||
| 32 | to all the subscribers. | ||
| 33 | |||
| 34 | ## How exactly does the pub/sub model work? | ||
| 35 | |||
| 36 | The easiest way to explain this is with diagram bellow. Basic function is | ||
| 37 | simple. We have subscribers that receive messages, and we have publishers that | ||
| 38 | create and post messages. Similar model is also well know pattern that works on | ||
| 39 | a premise of consumers and producers, and they take similar roles. | ||
| 40 | |||
| 41 |  | ||
| 42 | |||
| 43 | **These are some naive characteristics we want to achieve:** | ||
| 44 | |||
| 45 | - producer is publishing messages to subscribe topic, | ||
| 46 | - consumer is receiving messages from subscribed topic, | ||
| 47 | - servers is also known as Broker, | ||
| 48 | - broker does not store messages or tracks success, | ||
| 49 | - broker uses | ||
| 50 | [FIFO](https://en.wikipedia.org/wiki/FIFO_(computing_and_electronics)) method | ||
| 51 | for delivering messages, | ||
| 52 | - if consumer wants to receive messages from a topic, producer and consumer | ||
| 53 | topics must match, | ||
| 54 | - consumer can subscribe to multiple topics, | ||
| 55 | - producer can publish to multiple topics, | ||
| 56 | - each message has a messageId. | ||
| 57 | |||
| 58 | **Known drawbacks:** | ||
| 59 | |||
| 60 | - messages will not be stored in a persistent queue or unreceived messages like | ||
| 61 | [DeadLetterQueue](https://en.wikipedia.org/wiki/Dead_letter_queue) so old | ||
| 62 | messages could be lost on server restart, | ||
| 63 | - [Server-Sent | ||
| 64 | Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events) | ||
| 65 | opens a long-running connection between the client and the server so make sure | ||
| 66 | if your setup is load balanced that the load balancer in this case can have | ||
| 67 | long opened connection, | ||
| 68 | - no system moderation due to the dynamic nature of creating queues. | ||
| 69 | |||
| 70 | ## Server-Sent Events | ||
| 71 | |||
| 72 | Read more about it on [official specification | ||
| 73 | page](https://html.spec.whatwg.org/multipage/server-sent-events.html). | ||
| 74 | |||
| 75 | ### Current browser support | ||
| 76 | |||
| 77 |  | ||
| 78 | |||
| 79 | Check | ||
| 80 | [https://caniuse.com/#feat=eventsource](https://caniuse.com/#feat=eventsource) | ||
| 81 | for latest information about browser support. | ||
| 82 | |||
| 83 | ### Known issues | ||
| 84 | |||
| 85 | - Firefox 52 and below do not support EventSource in web/shared workers | ||
| 86 | - In Firefox prior to version 36 server-sent events do not reconnect | ||
| 87 | automatically in case of a connection interrupt (bug) | ||
| 88 | - Reportedly, CORS in EventSource is currently supported in Firefox 10+, Opera | ||
| 89 | 12+, Chrome 26+, Safari 7.0+. | ||
| 90 | - Antivirus software may block the event streaming data chunks. | ||
| 91 | |||
| 92 | Source: [https://caniuse.com/#feat=eventsource](https://caniuse.com/#feat=eventsource) | ||
| 93 | |||
| 94 | ### Message format | ||
| 95 | |||
| 96 | The simplest message that can be sent is only with data attribute: | ||
| 97 | |||
| 98 | ```bash | ||
| 99 | data: this is a simple message | ||
| 100 | <blank line> | ||
| 101 | ``` | ||
| 102 | |||
| 103 | You can send message IDs to be used if the connection is dropped: | ||
| 104 | |||
| 105 | ```bash | ||
| 106 | id: 33 | ||
| 107 | data: this is line one | ||
| 108 | data: this is line two | ||
| 109 | <blank line> | ||
| 110 | ``` | ||
| 111 | |||
| 112 | And you can specify your own event types (the above messages will all trigger | ||
| 113 | the message event): | ||
| 114 | |||
| 115 | ```bash | ||
| 116 | id: 36 | ||
| 117 | event: price | ||
| 118 | data: 103.34 | ||
| 119 | <blank line> | ||
| 120 | ``` | ||
| 121 | |||
| 122 | ### Server requirements | ||
| 123 | |||
| 124 | The important thing is how you send headers and which headers are sent by the | ||
| 125 | server that triggers browser to threat response as a EventStream. | ||
| 126 | |||
| 127 | Headers responsible for this are: | ||
| 128 | |||
| 129 | ```bash | ||
| 130 | Content-Type: text/event-stream | ||
| 131 | Cache-Control: no-cache | ||
| 132 | Connection: keep-alive | ||
| 133 | ``` | ||
| 134 | |||
| 135 | ### Debugging with Google Chrome | ||
| 136 | |||
| 137 | Google Chrome provides build-in debugging and exploration tool for [Server-Sent | ||
| 138 | Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events) | ||
| 139 | which is quite nice and available from Developer Tools under Network tab. | ||
| 140 | |||
| 141 | > You can debug only client side events that get received and not the server | ||
| 142 | > ones. For debugging server events add `console.log` to `server.js` code and | ||
| 143 | > print out events. | ||
| 144 | |||
| 145 |  | ||
| 146 | |||
| 147 | ## Server implementation | ||
| 148 | |||
| 149 | For the sake of this example we will use [Node.js](https://nodejs.org/en/) with | ||
| 150 | [Express](https://expressjs.com) as our router since this is the easiest way to | ||
| 151 | get started and we will use already written SSE library for node | ||
| 152 | [sse-pubsub](https://www.npmjs.com/package/sse-pubsub) so we don't reinvent the | ||
| 153 | wheel. | ||
| 154 | |||
| 155 | ```bash | ||
| 156 | npm init --yes | ||
| 157 | |||
| 158 | npm install express | ||
| 159 | npm install body-parser | ||
| 160 | npm install sse-pubsub | ||
| 161 | ``` | ||
| 162 | |||
| 163 | Basic implementation of a server (`server.js`): | ||
| 164 | |||
| 165 | ```js | ||
| 166 | const express = require('express'); | ||
| 167 | const bodyParser = require('body-parser'); | ||
| 168 | const SSETopic = require('sse-pubsub'); | ||
| 169 | |||
| 170 | const app = express(); | ||
| 171 | const port = process.env.PORT || 4000; | ||
| 172 | |||
| 173 | // topics container | ||
| 174 | const sseTopics = {}; | ||
| 175 | |||
| 176 | app.use(bodyParser.json()); | ||
| 177 | |||
| 178 | // open for all cors | ||
| 179 | app.all('*', (req, res, next) => { | ||
| 180 | res.header('Access-Control-Allow-Origin', '*'); | ||
| 181 | res.header('Access-Control-Allow-Headers', 'X-Requested-With, Content-Type'); | ||
| 182 | next(); | ||
| 183 | }); | ||
| 184 | |||
| 185 | // preflight request error fix | ||
| 186 | app.options('*', async (req, res) => { | ||
| 187 | res.header('Access-Control-Allow-Origin', '*'); | ||
| 188 | res.header('Access-Control-Allow-Headers', 'X-Requested-With, Content-Type'); | ||
| 189 | res.send('OK'); | ||
| 190 | }); | ||
| 191 | |||
| 192 | // serve the event streams | ||
| 193 | app.get('/stream/:topic', async (req, res, next) => { | ||
| 194 | const topic = req.params.topic; | ||
| 195 | |||
| 196 | if (!(topic in sseTopics)) { | ||
| 197 | sseTopics[topic] = new SSETopic({ | ||
| 198 | pingInterval: 0, | ||
| 199 | maxStreamDuration: 15000, | ||
| 200 | }); | ||
| 201 | } | ||
| 202 | |||
| 203 | // subscribing client to topic | ||
| 204 | sseTopics[topic].subscribe(req, res); | ||
| 205 | }); | ||
| 206 | |||
| 207 | // accepts new messages into topic | ||
| 208 | app.post('/publish', async (req, res) => { | ||
| 209 | let body = req.body; | ||
| 210 | let status = 200; | ||
| 211 | |||
| 212 | console.log('Incoming message:', req.body); | ||
| 213 | |||
| 214 | if ( | ||
| 215 | body.hasOwnProperty('topic') && | ||
| 216 | body.hasOwnProperty('event') && | ||
| 217 | body.hasOwnProperty('message') | ||
| 218 | ) { | ||
| 219 | const topic = req.body.topic; | ||
| 220 | const event = req.body.event; | ||
| 221 | const message = req.body.message; | ||
| 222 | |||
| 223 | if (topic in sseTopics) { | ||
| 224 | // sends message to all the subscribers | ||
| 225 | sseTopics[topic].publish(message, event); | ||
| 226 | } | ||
| 227 | } else { | ||
| 228 | status = 400; | ||
| 229 | } | ||
| 230 | |||
| 231 | res.status(status).send({ | ||
| 232 | status, | ||
| 233 | }); | ||
| 234 | }); | ||
| 235 | |||
| 236 | // returns JSON object of all opened topics | ||
| 237 | app.get('/status', async (req, res) => { | ||
| 238 | res.send(sseTopics); | ||
| 239 | }); | ||
| 240 | |||
| 241 | // health-check endpoint | ||
| 242 | app.get('/', async (req, res) => { | ||
| 243 | res.send('OK'); | ||
| 244 | }); | ||
| 245 | |||
| 246 | // return a 404 if no routes match | ||
| 247 | app.use((req, res, next) => { | ||
| 248 | res.set('Cache-Control', 'private, no-store'); | ||
| 249 | res.status(404).end('Not found'); | ||
| 250 | }); | ||
| 251 | |||
| 252 | // starts the server | ||
| 253 | app.listen(port, () => { | ||
| 254 | console.log(`PubSub server running on http://localhost:${port}`); | ||
| 255 | }); | ||
| 256 | ``` | ||
| 257 | |||
| 258 | ### Our custom message format | ||
| 259 | |||
| 260 | Each message posted on a server must be in a specific format that out server | ||
| 261 | accepts. Having structure like this allows us to have multiple separated type of | ||
| 262 | events on each topic. | ||
| 263 | |||
| 264 | With this we can separate streams and only receive events that belong to the | ||
| 265 | topic. | ||
| 266 | |||
| 267 | One example would be, that we have index page and we want to receive messages | ||
| 268 | about new upvotes or new subscribers but we don't want to follow events for | ||
| 269 | other pages. This reduces clutter and overall network. And structure is much | ||
| 270 | nicer and maintanable. | ||
| 271 | |||
| 272 | ```json | ||
| 273 | { | ||
| 274 | "topic": "sample-topic", | ||
| 275 | "event": "sample-event", | ||
| 276 | "message": { "name": "John" } | ||
| 277 | } | ||
| 278 | ``` | ||
| 279 | |||
| 280 | ## Publisher and subscriber clients | ||
| 281 | |||
| 282 | ### Publisher and subscriber in action | ||
| 283 | |||
| 284 | <video src="/assets/simple-pubsub-server/clients.m4v" controls></video> | ||
| 285 | |||
| 286 | You can download [the code](../simple-pubsub-server/sse-pubsub-server.zip) and | ||
| 287 | follow along. | ||
| 288 | |||
| 289 | ### Publisher | ||
| 290 | |||
| 291 | As talked about above publisher is the one that send messages to the | ||
| 292 | broker/server. Message inside the payload can be whatever you want (string, | ||
| 293 | object, array). I would however personally avoid send large chunks of data like | ||
| 294 | blobs and such. | ||
| 295 | |||
| 296 | ```html | ||
| 297 | <!DOCTYPE html> | ||
| 298 | <html lang="en"> | ||
| 299 | |||
| 300 | <head> | ||
| 301 | <meta charset="UTF-8"> | ||
| 302 | <meta name="viewport" content="width=device-width, initial-scale=1.0"> | ||
| 303 | <title>Publisher</title> | ||
| 304 | </head> | ||
| 305 | |||
| 306 | <body> | ||
| 307 | |||
| 308 | <h1>Publisher</h1> | ||
| 309 | |||
| 310 | <fieldset> | ||
| 311 | <p> | ||
| 312 | <label>Server:</label> | ||
| 313 | <input type="text" id="server" value="http://localhost:4000"> | ||
| 314 | </p> | ||
| 315 | <p> | ||
| 316 | <label>Topic:</label> | ||
| 317 | <input type="text" id="topic" value="sample-topic"> | ||
| 318 | </p> | ||
| 319 | <p> | ||
| 320 | <label>Event:</label> | ||
| 321 | <input type="text" id="event" value="sample-event"> | ||
| 322 | </p> | ||
| 323 | <p> | ||
| 324 | <label>Message:</label> | ||
| 325 | <input type="text" id="message" value='{"name": "John"}'> | ||
| 326 | </p> | ||
| 327 | <p> | ||
| 328 | <button type="button" id="button">Publish message to topic</button> | ||
| 329 | </p> | ||
| 330 | </fieldset> | ||
| 331 | |||
| 332 | <script> | ||
| 333 | |||
| 334 | const button = document.querySelector('#button'); | ||
| 335 | const server = document.querySelector('#server'); | ||
| 336 | const topic = document.querySelector('#topic'); | ||
| 337 | const event = document.querySelector('#event'); | ||
| 338 | const message = document.querySelector('#message'); | ||
| 339 | |||
| 340 | button.addEventListener('click', async (evt) => { | ||
| 341 | const req = await fetch(`${server.value}/publish`, { | ||
| 342 | method: 'post', | ||
| 343 | headers: { | ||
| 344 | 'Accept': 'application/json', | ||
| 345 | 'Content-Type': 'application/json', | ||
| 346 | }, | ||
| 347 | body: JSON.stringify({ | ||
| 348 | topic: topic.value, | ||
| 349 | event: event.value, | ||
| 350 | message: JSON.parse(message.value), | ||
| 351 | }), | ||
| 352 | }); | ||
| 353 | |||
| 354 | const res = await req.json(); | ||
| 355 | console.log(res); | ||
| 356 | }); | ||
| 357 | |||
| 358 | </script> | ||
| 359 | |||
| 360 | </body> | ||
| 361 | |||
| 362 | </html> | ||
| 363 | ``` | ||
| 364 | |||
| 365 | ### Subscriber | ||
| 366 | |||
| 367 | Subscriber is responsible for receiving new messages that come from server via | ||
| 368 | publisher. The code bellow is very rudimentary but works and follows the | ||
| 369 | implementation guidelines for EventSource. | ||
| 370 | |||
| 371 | You can use either Developer Tools Console to see incoming messages or you can | ||
| 372 | defer to Debugging with Google Chrome section above to see all EventStream | ||
| 373 | messages. | ||
| 374 | |||
| 375 | > Don't be alarmed if the subscriber gets disconnected from the server every so | ||
| 376 | > often. The code we have here resets connection every 15s but it automatically | ||
| 377 | > get reconnected and fetches all messages up to last received message id. This | ||
| 378 | > setting can be adjusted in `server.js` file; search for the | ||
| 379 | > `maxStreamDuration` variable. | ||
| 380 | |||
| 381 | ```html | ||
| 382 | <!DOCTYPE html> | ||
| 383 | <html lang="en"> | ||
| 384 | |||
| 385 | <head> | ||
| 386 | <meta charset="UTF-8"> | ||
| 387 | <meta name="viewport" content="width=device-width, initial-scale=1.0"> | ||
| 388 | <title>Subscriber</title> | ||
| 389 | <link rel="stylesheet" href="style.css"> | ||
| 390 | </head> | ||
| 391 | |||
| 392 | <body> | ||
| 393 | |||
| 394 | <h1>Subscriber</h1> | ||
| 395 | |||
| 396 | <fieldset> | ||
| 397 | <p> | ||
| 398 | <label>Server:</label> | ||
| 399 | <input type="text" id="server" value="http://localhost:4000"> | ||
| 400 | </p> | ||
| 401 | <p> | ||
| 402 | <label>Topic:</label> | ||
| 403 | <input type="text" id="topic" value="sample-topic"> | ||
| 404 | </p> | ||
| 405 | <p> | ||
| 406 | <label>Event:</label> | ||
| 407 | <input type="text" id="event" value="sample-event"> | ||
| 408 | </p> | ||
| 409 | <p> | ||
| 410 | <button type="button" id="button">Subscribe to topic</button> | ||
| 411 | </p> | ||
| 412 | </fieldset> | ||
| 413 | |||
| 414 | <script> | ||
| 415 | |||
| 416 | const button = document.querySelector('#button'); | ||
| 417 | const server = document.querySelector('#server'); | ||
| 418 | const topic = document.querySelector('#topic'); | ||
| 419 | const event = document.querySelector('#event'); | ||
| 420 | |||
| 421 | button.addEventListener('click', async (evt) => { | ||
| 422 | |||
| 423 | let es = new EventSource(`${server.value}/stream/${topic.value}`); | ||
| 424 | |||
| 425 | es.addEventListener(event.value, function (evt) { | ||
| 426 | console.log(`incoming message`, JSON.parse(evt.data)); | ||
| 427 | }); | ||
| 428 | |||
| 429 | es.addEventListener('open', function (evt) { | ||
| 430 | console.log('connected', evt); | ||
| 431 | }); | ||
| 432 | |||
| 433 | es.addEventListener('error', function (evt) { | ||
| 434 | console.log('error', evt); | ||
| 435 | }); | ||
| 436 | |||
| 437 | }); | ||
| 438 | |||
| 439 | </script> | ||
| 440 | |||
| 441 | </body> | ||
| 442 | |||
| 443 | </html> | ||
| 444 | ``` | ||
| 445 | |||
| 446 | ## Reading further | ||
| 447 | |||
| 448 | - [Using server-sent events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events) | ||
| 449 | - [Using SSE Instead Of WebSockets For Unidirectional Data Flow Over HTTP/2](https://www.smashingmagazine.com/2018/02/sse-websockets-data-flow-http2/) | ||
| 450 | - [What is Server-Sent Events?](https://apifriends.com/api-streaming/server-sent-events/) | ||
| 451 | - [An HTTP/2 extension for bidirectional messaging communication](https://tools.ietf.org/id/draft-xie-bidirectional-messaging-01.html) | ||
| 452 | - [Introduction to HTTP/2](https://developers.google.com/web/fundamentals/performance/http2) | ||
| 453 | - [The WebSocket API (WebSockets)](https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API) | ||
| 454 | |||
