diff options
| author | Mitja Felicijan <mitja.felicijan@gmail.com> | 2024-02-23 10:35:22 +0100 |
|---|---|---|
| committer | Mitja Felicijan <mitja.felicijan@gmail.com> | 2024-02-23 10:35:22 +0100 |
| commit | 4abcce013c9ee3053badf2abda77190233066676 (patch) | |
| tree | 450de7e8fed3c3c7501a9d2e2eb60a676bdfa09e /_posts/2020-03-22-simple-sse-based-pubsub-server.md | |
| parent | cdf50cb2e3051200c6ea0628c318d66220b7d1a1 (diff) | |
| download | mitjafelicijan.com-4abcce013c9ee3053badf2abda77190233066676.tar.gz | |
Testing thoughts page
Diffstat (limited to '_posts/2020-03-22-simple-sse-based-pubsub-server.md')
| -rw-r--r-- | _posts/2020-03-22-simple-sse-based-pubsub-server.md | 455 |
1 files changed, 0 insertions, 455 deletions
diff --git a/_posts/2020-03-22-simple-sse-based-pubsub-server.md b/_posts/2020-03-22-simple-sse-based-pubsub-server.md deleted file mode 100644 index ffb7285..0000000 --- a/_posts/2020-03-22-simple-sse-based-pubsub-server.md +++ /dev/null | |||
| @@ -1,455 +0,0 @@ | |||
| 1 | --- | ||
| 2 | title: Simple Server-Sent Events based PubSub Server | ||
| 3 | permalink: /simple-server-sent-events-based-pubsub-server.html | ||
| 4 | date: 2020-03-22T12:00:00+02:00 | ||
| 5 | layout: post | ||
| 6 | type: post | ||
| 7 | draft: false | ||
| 8 | --- | ||
| 9 | |||
| 10 | ## Before we continue ... | ||
| 11 | |||
| 12 | Publisher Subscriber model is nothing new and there are many amazing solutions | ||
| 13 | out there, so writing a new one would be a waste of time if other solutions | ||
| 14 | wouldn't have quite complex install procedures and weren't so hard to maintain. | ||
| 15 | But to be fair, comparing this simple server with something like | ||
| 16 | [Kafka](https://kafka.apache.org/) or [RabbitMQ](https://www.rabbitmq.com/) is | ||
| 17 | laughable at the least. Those solutions are enterprise grade and have many | ||
| 18 | mechanisms there to ensure messages aren't lost and much more. Regardless of | ||
| 19 | these drawbacks, this method has been tested on a large website and worked until | ||
| 20 | now without any problems. So now, that we got that cleared up, let's continue. | ||
| 21 | |||
| 22 | ***Wiki definition:** Publish/subscribe messaging, or pub/sub messaging, is a | ||
| 23 | form of asynchronous service-to-service communication used in serverless and | ||
| 24 | microservices architectures. In a pub/sub model, any message published to a | ||
| 25 | topic is immediately received by all the subscribers to the topic.* | ||
| 26 | |||
| 27 | ## General goals | ||
| 28 | |||
| 29 | - provide a simple server that relays messages to all the connected clients, | ||
| 30 | - messages can be posted on specific topics, | ||
| 31 | - messages get sent via [Server-Sent | ||
| 32 | Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events) | ||
| 33 | to all the subscribers. | ||
| 34 | |||
| 35 | ## How exactly does the pub/sub model work? | ||
| 36 | |||
| 37 | The easiest way to explain this is with diagram bellow. Basic function is | ||
| 38 | simple. We have subscribers that receive messages, and we have publishers that | ||
| 39 | create and post messages. Similar model is also well know pattern that works on | ||
| 40 | a premise of consumers and producers, and they take similar roles. | ||
| 41 | |||
| 42 | {:loading="lazy"} | ||
| 43 | |||
| 44 | **These are some naive characteristics we want to achieve:** | ||
| 45 | |||
| 46 | - producer is publishing messages to subscribe topic, | ||
| 47 | - consumer is receiving messages from subscribed topic, | ||
| 48 | - servers is also known as Broker, | ||
| 49 | - broker does not store messages or tracks success, | ||
| 50 | - broker uses | ||
| 51 | [FIFO](https://en.wikipedia.org/wiki/FIFO_(computing_and_electronics)) method | ||
| 52 | for delivering messages, | ||
| 53 | - if consumer wants to receive messages from a topic, producer and consumer | ||
| 54 | topics must match, | ||
| 55 | - consumer can subscribe to multiple topics, | ||
| 56 | - producer can publish to multiple topics, | ||
| 57 | - each message has a messageId. | ||
| 58 | |||
| 59 | **Known drawbacks:** | ||
| 60 | |||
| 61 | - messages will not be stored in a persistent queue or unreceived messages like | ||
| 62 | [DeadLetterQueue](https://en.wikipedia.org/wiki/Dead_letter_queue) so old | ||
| 63 | messages could be lost on server restart, | ||
| 64 | - [Server-Sent | ||
| 65 | Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events) | ||
| 66 | opens a long-running connection between the client and the server so make sure | ||
| 67 | if your setup is load balanced that the load balancer in this case can have | ||
| 68 | long opened connection, | ||
| 69 | - no system moderation due to the dynamic nature of creating queues. | ||
| 70 | |||
| 71 | ## Server-Sent Events | ||
| 72 | |||
| 73 | Read more about it on [official specification | ||
| 74 | page](https://html.spec.whatwg.org/multipage/server-sent-events.html). | ||
| 75 | |||
| 76 | ### Current browser support | ||
| 77 | |||
| 78 | {:loading="lazy"} | ||
| 79 | |||
| 80 | Check | ||
| 81 | [https://caniuse.com/#feat=eventsource](https://caniuse.com/#feat=eventsource) | ||
| 82 | for latest information about browser support. | ||
| 83 | |||
| 84 | ### Known issues | ||
| 85 | |||
| 86 | - Firefox 52 and below do not support EventSource in web/shared workers | ||
| 87 | - In Firefox prior to version 36 server-sent events do not reconnect | ||
| 88 | automatically in case of a connection interrupt (bug) | ||
| 89 | - Reportedly, CORS in EventSource is currently supported in Firefox 10+, Opera | ||
| 90 | 12+, Chrome 26+, Safari 7.0+. | ||
| 91 | - Antivirus software may block the event streaming data chunks. | ||
| 92 | |||
| 93 | Source: [https://caniuse.com/#feat=eventsource](https://caniuse.com/#feat=eventsource) | ||
| 94 | |||
| 95 | ### Message format | ||
| 96 | |||
| 97 | The simplest message that can be sent is only with data attribute: | ||
| 98 | |||
| 99 | ```bash | ||
| 100 | data: this is a simple message | ||
| 101 | <blank line> | ||
| 102 | ``` | ||
| 103 | |||
| 104 | You can send message IDs to be used if the connection is dropped: | ||
| 105 | |||
| 106 | ```bash | ||
| 107 | id: 33 | ||
| 108 | data: this is line one | ||
| 109 | data: this is line two | ||
| 110 | <blank line> | ||
| 111 | ``` | ||
| 112 | |||
| 113 | And you can specify your own event types (the above messages will all trigger | ||
| 114 | the message event): | ||
| 115 | |||
| 116 | ```bash | ||
| 117 | id: 36 | ||
| 118 | event: price | ||
| 119 | data: 103.34 | ||
| 120 | <blank line> | ||
| 121 | ``` | ||
| 122 | |||
| 123 | ### Server requirements | ||
| 124 | |||
| 125 | The important thing is how you send headers and which headers are sent by the | ||
| 126 | server that triggers browser to threat response as a EventStream. | ||
| 127 | |||
| 128 | Headers responsible for this are: | ||
| 129 | |||
| 130 | ```bash | ||
| 131 | Content-Type: text/event-stream | ||
| 132 | Cache-Control: no-cache | ||
| 133 | Connection: keep-alive | ||
| 134 | ``` | ||
| 135 | |||
| 136 | ### Debugging with Google Chrome | ||
| 137 | |||
| 138 | Google Chrome provides build-in debugging and exploration tool for [Server-Sent | ||
| 139 | Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events) | ||
| 140 | which is quite nice and available from Developer Tools under Network tab. | ||
| 141 | |||
| 142 | > You can debug only client side events that get received and not the server | ||
| 143 | > ones. For debugging server events add `console.log` to `server.js` code and | ||
| 144 | > print out events. | ||
| 145 | |||
| 146 | {:loading="lazy"} | ||
| 147 | |||
| 148 | ## Server implementation | ||
| 149 | |||
| 150 | For the sake of this example we will use [Node.js](https://nodejs.org/en/) with | ||
| 151 | [Express](https://expressjs.com) as our router since this is the easiest way to | ||
| 152 | get started and we will use already written SSE library for node | ||
| 153 | [sse-pubsub](https://www.npmjs.com/package/sse-pubsub) so we don't reinvent the | ||
| 154 | wheel. | ||
| 155 | |||
| 156 | ```bash | ||
| 157 | npm init --yes | ||
| 158 | |||
| 159 | npm install express | ||
| 160 | npm install body-parser | ||
| 161 | npm install sse-pubsub | ||
| 162 | ``` | ||
| 163 | |||
| 164 | Basic implementation of a server (`server.js`): | ||
| 165 | |||
| 166 | ```js | ||
| 167 | const express = require('express'); | ||
| 168 | const bodyParser = require('body-parser'); | ||
| 169 | const SSETopic = require('sse-pubsub'); | ||
| 170 | |||
| 171 | const app = express(); | ||
| 172 | const port = process.env.PORT || 4000; | ||
| 173 | |||
| 174 | // topics container | ||
| 175 | const sseTopics = {}; | ||
| 176 | |||
| 177 | app.use(bodyParser.json()); | ||
| 178 | |||
| 179 | // open for all cors | ||
| 180 | app.all('*', (req, res, next) => { | ||
| 181 | res.header('Access-Control-Allow-Origin', '*'); | ||
| 182 | res.header('Access-Control-Allow-Headers', 'X-Requested-With, Content-Type'); | ||
| 183 | next(); | ||
| 184 | }); | ||
| 185 | |||
| 186 | // preflight request error fix | ||
| 187 | app.options('*', async (req, res) => { | ||
| 188 | res.header('Access-Control-Allow-Origin', '*'); | ||
| 189 | res.header('Access-Control-Allow-Headers', 'X-Requested-With, Content-Type'); | ||
| 190 | res.send('OK'); | ||
| 191 | }); | ||
| 192 | |||
| 193 | // serve the event streams | ||
| 194 | app.get('/stream/:topic', async (req, res, next) => { | ||
| 195 | const topic = req.params.topic; | ||
| 196 | |||
| 197 | if (!(topic in sseTopics)) { | ||
| 198 | sseTopics[topic] = new SSETopic({ | ||
| 199 | pingInterval: 0, | ||
| 200 | maxStreamDuration: 15000, | ||
| 201 | }); | ||
| 202 | } | ||
| 203 | |||
| 204 | // subscribing client to topic | ||
| 205 | sseTopics[topic].subscribe(req, res); | ||
| 206 | }); | ||
| 207 | |||
| 208 | // accepts new messages into topic | ||
| 209 | app.post('/publish', async (req, res) => { | ||
| 210 | let body = req.body; | ||
| 211 | let status = 200; | ||
| 212 | |||
| 213 | console.log('Incoming message:', req.body); | ||
| 214 | |||
| 215 | if ( | ||
| 216 | body.hasOwnProperty('topic') && | ||
| 217 | body.hasOwnProperty('event') && | ||
| 218 | body.hasOwnProperty('message') | ||
| 219 | ) { | ||
| 220 | const topic = req.body.topic; | ||
| 221 | const event = req.body.event; | ||
| 222 | const message = req.body.message; | ||
| 223 | |||
| 224 | if (topic in sseTopics) { | ||
| 225 | // sends message to all the subscribers | ||
| 226 | sseTopics[topic].publish(message, event); | ||
| 227 | } | ||
| 228 | } else { | ||
| 229 | status = 400; | ||
| 230 | } | ||
| 231 | |||
| 232 | res.status(status).send({ | ||
| 233 | status, | ||
| 234 | }); | ||
| 235 | }); | ||
| 236 | |||
| 237 | // returns JSON object of all opened topics | ||
| 238 | app.get('/status', async (req, res) => { | ||
| 239 | res.send(sseTopics); | ||
| 240 | }); | ||
| 241 | |||
| 242 | // health-check endpoint | ||
| 243 | app.get('/', async (req, res) => { | ||
| 244 | res.send('OK'); | ||
| 245 | }); | ||
| 246 | |||
| 247 | // return a 404 if no routes match | ||
| 248 | app.use((req, res, next) => { | ||
| 249 | res.set('Cache-Control', 'private, no-store'); | ||
| 250 | res.status(404).end('Not found'); | ||
| 251 | }); | ||
| 252 | |||
| 253 | // starts the server | ||
| 254 | app.listen(port, () => { | ||
| 255 | console.log(`PubSub server running on http://localhost:${port}`); | ||
| 256 | }); | ||
| 257 | ``` | ||
| 258 | |||
| 259 | ### Our custom message format | ||
| 260 | |||
| 261 | Each message posted on a server must be in a specific format that out server | ||
| 262 | accepts. Having structure like this allows us to have multiple separated type of | ||
| 263 | events on each topic. | ||
| 264 | |||
| 265 | With this we can separate streams and only receive events that belong to the | ||
| 266 | topic. | ||
| 267 | |||
| 268 | One example would be, that we have index page and we want to receive messages | ||
| 269 | about new upvotes or new subscribers but we don't want to follow events for | ||
| 270 | other pages. This reduces clutter and overall network. And structure is much | ||
| 271 | nicer and maintanable. | ||
| 272 | |||
| 273 | ```json | ||
| 274 | { | ||
| 275 | "topic": "sample-topic", | ||
| 276 | "event": "sample-event", | ||
| 277 | "message": { "name": "John" } | ||
| 278 | } | ||
| 279 | ``` | ||
| 280 | |||
| 281 | ## Publisher and subscriber clients | ||
| 282 | |||
| 283 | ### Publisher and subscriber in action | ||
| 284 | |||
| 285 | <video src="/assets/posts/simple-pubsub-server/clients.m4v" controls></video> | ||
| 286 | |||
| 287 | You can download [the code](../simple-pubsub-server/sse-pubsub-server.zip) and | ||
| 288 | follow along. | ||
| 289 | |||
| 290 | ### Publisher | ||
| 291 | |||
| 292 | As talked about above publisher is the one that send messages to the | ||
| 293 | broker/server. Message inside the payload can be whatever you want (string, | ||
| 294 | object, array). I would however personally avoid send large chunks of data like | ||
| 295 | blobs and such. | ||
| 296 | |||
| 297 | ```html | ||
| 298 | <!DOCTYPE html> | ||
| 299 | <html lang="en"> | ||
| 300 | |||
| 301 | <head> | ||
| 302 | <meta charset="UTF-8"> | ||
| 303 | <meta name="viewport" content="width=device-width, initial-scale=1.0"> | ||
| 304 | <title>Publisher</title> | ||
| 305 | </head> | ||
| 306 | |||
| 307 | <body> | ||
| 308 | |||
| 309 | <h1>Publisher</h1> | ||
| 310 | |||
| 311 | <fieldset> | ||
| 312 | <p> | ||
| 313 | <label>Server:</label> | ||
| 314 | <input type="text" id="server" value="http://localhost:4000"> | ||
| 315 | </p> | ||
| 316 | <p> | ||
| 317 | <label>Topic:</label> | ||
| 318 | <input type="text" id="topic" value="sample-topic"> | ||
| 319 | </p> | ||
| 320 | <p> | ||
| 321 | <label>Event:</label> | ||
| 322 | <input type="text" id="event" value="sample-event"> | ||
| 323 | </p> | ||
| 324 | <p> | ||
| 325 | <label>Message:</label> | ||
| 326 | <input type="text" id="message" value='{"name": "John"}'> | ||
| 327 | </p> | ||
| 328 | <p> | ||
| 329 | <button type="button" id="button">Publish message to topic</button> | ||
| 330 | </p> | ||
| 331 | </fieldset> | ||
| 332 | |||
| 333 | <script> | ||
| 334 | |||
| 335 | const button = document.querySelector('#button'); | ||
| 336 | const server = document.querySelector('#server'); | ||
| 337 | const topic = document.querySelector('#topic'); | ||
| 338 | const event = document.querySelector('#event'); | ||
| 339 | const message = document.querySelector('#message'); | ||
| 340 | |||
| 341 | button.addEventListener('click', async (evt) => { | ||
| 342 | const req = await fetch(`${server.value}/publish`, { | ||
| 343 | method: 'post', | ||
| 344 | headers: { | ||
| 345 | 'Accept': 'application/json', | ||
| 346 | 'Content-Type': 'application/json', | ||
| 347 | }, | ||
| 348 | body: JSON.stringify({ | ||
| 349 | topic: topic.value, | ||
| 350 | event: event.value, | ||
| 351 | message: JSON.parse(message.value), | ||
| 352 | }), | ||
| 353 | }); | ||
| 354 | |||
| 355 | const res = await req.json(); | ||
| 356 | console.log(res); | ||
| 357 | }); | ||
| 358 | |||
| 359 | </script> | ||
| 360 | |||
| 361 | </body> | ||
| 362 | |||
| 363 | </html> | ||
| 364 | ``` | ||
| 365 | |||
| 366 | ### Subscriber | ||
| 367 | |||
| 368 | Subscriber is responsible for receiving new messages that come from server via | ||
| 369 | publisher. The code bellow is very rudimentary but works and follows the | ||
| 370 | implementation guidelines for EventSource. | ||
| 371 | |||
| 372 | You can use either Developer Tools Console to see incoming messages or you can | ||
| 373 | defer to Debugging with Google Chrome section above to see all EventStream | ||
| 374 | messages. | ||
| 375 | |||
| 376 | > Don't be alarmed if the subscriber gets disconnected from the server every so | ||
| 377 | > often. The code we have here resets connection every 15s but it automatically | ||
| 378 | > get reconnected and fetches all messages up to last received message id. This | ||
| 379 | > setting can be adjusted in `server.js` file; search for the | ||
| 380 | > `maxStreamDuration` variable. | ||
| 381 | |||
| 382 | ```html | ||
| 383 | <!DOCTYPE html> | ||
| 384 | <html lang="en"> | ||
| 385 | |||
| 386 | <head> | ||
| 387 | <meta charset="UTF-8"> | ||
| 388 | <meta name="viewport" content="width=device-width, initial-scale=1.0"> | ||
| 389 | <title>Subscriber</title> | ||
| 390 | <link rel="stylesheet" href="style.css"> | ||
| 391 | </head> | ||
| 392 | |||
| 393 | <body> | ||
| 394 | |||
| 395 | <h1>Subscriber</h1> | ||
| 396 | |||
| 397 | <fieldset> | ||
| 398 | <p> | ||
| 399 | <label>Server:</label> | ||
| 400 | <input type="text" id="server" value="http://localhost:4000"> | ||
| 401 | </p> | ||
| 402 | <p> | ||
| 403 | <label>Topic:</label> | ||
| 404 | <input type="text" id="topic" value="sample-topic"> | ||
| 405 | </p> | ||
| 406 | <p> | ||
| 407 | <label>Event:</label> | ||
| 408 | <input type="text" id="event" value="sample-event"> | ||
| 409 | </p> | ||
| 410 | <p> | ||
| 411 | <button type="button" id="button">Subscribe to topic</button> | ||
| 412 | </p> | ||
| 413 | </fieldset> | ||
| 414 | |||
| 415 | <script> | ||
| 416 | |||
| 417 | const button = document.querySelector('#button'); | ||
| 418 | const server = document.querySelector('#server'); | ||
| 419 | const topic = document.querySelector('#topic'); | ||
| 420 | const event = document.querySelector('#event'); | ||
| 421 | |||
| 422 | button.addEventListener('click', async (evt) => { | ||
| 423 | |||
| 424 | let es = new EventSource(`${server.value}/stream/${topic.value}`); | ||
| 425 | |||
| 426 | es.addEventListener(event.value, function (evt) { | ||
| 427 | console.log(`incoming message`, JSON.parse(evt.data)); | ||
| 428 | }); | ||
| 429 | |||
| 430 | es.addEventListener('open', function (evt) { | ||
| 431 | console.log('connected', evt); | ||
| 432 | }); | ||
| 433 | |||
| 434 | es.addEventListener('error', function (evt) { | ||
| 435 | console.log('error', evt); | ||
| 436 | }); | ||
| 437 | |||
| 438 | }); | ||
| 439 | |||
| 440 | </script> | ||
| 441 | |||
| 442 | </body> | ||
| 443 | |||
| 444 | </html> | ||
| 445 | ``` | ||
| 446 | |||
| 447 | ## Reading further | ||
| 448 | |||
| 449 | - [Using server-sent events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events) | ||
| 450 | - [Using SSE Instead Of WebSockets For Unidirectional Data Flow Over HTTP/2](https://www.smashingmagazine.com/2018/02/sse-websockets-data-flow-http2/) | ||
| 451 | - [What is Server-Sent Events?](https://apifriends.com/api-streaming/server-sent-events/) | ||
| 452 | - [An HTTP/2 extension for bidirectional messaging communication](https://tools.ietf.org/id/draft-xie-bidirectional-messaging-01.html) | ||
| 453 | - [Introduction to HTTP/2](https://developers.google.com/web/fundamentals/performance/http2) | ||
| 454 | - [The WebSocket API (WebSockets)](https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API) | ||
| 455 | |||
