diff options
Diffstat (limited to 'content/2020-03-22-simple-sse-based-pubsub-server.md')
| -rw-r--r-- | content/2020-03-22-simple-sse-based-pubsub-server.md | 398 |
1 files changed, 398 insertions, 0 deletions
diff --git a/content/2020-03-22-simple-sse-based-pubsub-server.md b/content/2020-03-22-simple-sse-based-pubsub-server.md new file mode 100644 index 0000000..8f82921 --- /dev/null +++ b/content/2020-03-22-simple-sse-based-pubsub-server.md | |||
| @@ -0,0 +1,398 @@ | |||
| 1 | ~ title: Simple Server-Sent Events based PubSub Server | ||
| 2 | ~ description: PubSub server made with Server-Sent Events | ||
| 3 | ~ slug: /simple-server-sent-events-based-pubsub-server.html | ||
| 4 | ~ date: 2020-03-22 | ||
| 5 | ~ template: post | ||
| 6 | ~ hide: false | ||
| 7 | |||
| 8 | ## Before we continue ... | ||
| 9 | |||
| 10 | Publisher Subscriber model is nothing new and there are many amazing solutions out there, so writing a new one would be a waste of time if other solutions wouldn't have quite complex install procedures and weren't so hard to maintain. But to be fair, comparing this simple server with something like [Kafka](https://kafka.apache.org/) or [RabbitMQ](https://www.rabbitmq.com/) is laughable at the least. Those solutions are enterprise grade and have many mechanisms there to ensure messages aren't lost and much more. Regardless of these drawbacks, this method has been tested on a large website and worked until now without any problems. So now, that we got that cleared up, let's continue. | ||
| 11 | |||
| 12 | ***Wiki definition:** Publish/subscribe messaging, or pub/sub messaging, is a form of asynchronous service-to-service communication used in serverless and microservices architectures. In a pub/sub model, any message published to a topic is immediately received by all the subscribers to the topic.* | ||
| 13 | |||
| 14 | ## General goals | ||
| 15 | |||
| 16 | - provide a simple server that relays messages to all the connected clients, | ||
| 17 | - messages can be posted on specific topics, | ||
| 18 | - messages get sent via [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events) to all the subscribers. | ||
| 19 | |||
| 20 | ## How exactly does the pub/sub model work? | ||
| 21 | |||
| 22 | The easiest way to explain this is with diagram bellow. Basic function is simple. We have subscribers that receive messages, and we have publishers that create and post messages. Similar model is also well know pattern that works on a premise of consumers and producers, and they take similar roles. | ||
| 23 | |||
| 24 |  | ||
| 25 | |||
| 26 | **ⓘ** These are some naive characteristics we want to achieve: | ||
| 27 | |||
| 28 | - producer is publishing messages to subscribe topic, | ||
| 29 | - consumer is receiving messages from subscribed topic, | ||
| 30 | - servers is also known as Broker, | ||
| 31 | - broker does not store messages or tracks success, | ||
| 32 | - broker uses [FIFO](https://en.wikipedia.org/wiki/FIFO_(computing_and_electronics)) method for delivering messages, | ||
| 33 | - if consumer wants to receive messages from a topic, producer and consumer topics must match, | ||
| 34 | - consumer can subscribe to multiple topics, | ||
| 35 | - producer can publish to multiple topics, | ||
| 36 | - each message has a messageId. | ||
| 37 | |||
| 38 | **ⓘ** Known drawbacks: | ||
| 39 | |||
| 40 | - messages will not be stored in a persistent queue or unreceived messages like [DeadLetterQueue](https://en.wikipedia.org/wiki/Dead_letter_queue) so old messages could be lost on server restart, | ||
| 41 | - [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events) opens a long-running connection between the client and the server so make sure if your setup is load balanced that the load balancer in this case can have long opened connection, | ||
| 42 | - no system moderation due to the dynamic nature of creating queues. | ||
| 43 | |||
| 44 | ## Server-Sent Events | ||
| 45 | |||
| 46 | Read more about it on [official specification page](https://html.spec.whatwg.org/multipage/server-sent-events.html). | ||
| 47 | |||
| 48 | ### Current browser support | ||
| 49 | |||
| 50 |  | ||
| 51 | |||
| 52 | Check [https://caniuse.com/#feat=eventsource](https://caniuse.com/#feat=eventsource) for latest information about browser support. | ||
| 53 | |||
| 54 | ### Known issues | ||
| 55 | |||
| 56 | - Firefox 52 and below do not support EventSource in web/shared workers | ||
| 57 | - In Firefox prior to version 36 server-sent events do not reconnect automatically in case of a connection interrupt (bug) | ||
| 58 | - Reportedly, CORS in EventSource is currently supported in Firefox 10+, Opera 12+, Chrome 26+, Safari 7.0+. | ||
| 59 | - Antivirus software may block the event streaming data chunks. | ||
| 60 | |||
| 61 | Source: [https://caniuse.com/#feat=eventsource](https://caniuse.com/#feat=eventsource) | ||
| 62 | |||
| 63 | ### Message format | ||
| 64 | |||
| 65 | The simplest message that can be sent is only with data attribute: | ||
| 66 | |||
| 67 | ```bash | ||
| 68 | data: this is a simple message | ||
| 69 | <blank line> | ||
| 70 | ``` | ||
| 71 | |||
| 72 | You can send message IDs to be used if the connection is dropped: | ||
| 73 | |||
| 74 | ```bash | ||
| 75 | id: 33 | ||
| 76 | data: this is line one | ||
| 77 | data: this is line two | ||
| 78 | <blank line> | ||
| 79 | ``` | ||
| 80 | |||
| 81 | And you can specify your own event types (the above messages will all trigger the message event): | ||
| 82 | |||
| 83 | ```bash | ||
| 84 | id: 36 | ||
| 85 | event: price | ||
| 86 | data: 103.34 | ||
| 87 | <blank line> | ||
| 88 | ``` | ||
| 89 | |||
| 90 | ### Server requirements | ||
| 91 | |||
| 92 | The important thing is how you send headers and which headers are sent by the server that triggers browser to threat response as a EventStream. | ||
| 93 | |||
| 94 | Headers responsible for this are: | ||
| 95 | |||
| 96 | ```bash | ||
| 97 | Content-Type: text/event-stream | ||
| 98 | Cache-Control: no-cache | ||
| 99 | Connection: keep-alive | ||
| 100 | ``` | ||
| 101 | |||
| 102 | ### Debugging with Google Chrome | ||
| 103 | |||
| 104 | Google Chrome provides build-in debugging and exploration tool for [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events) which is quite nice and available from Developer Tools under Network tab. | ||
| 105 | |||
| 106 | **ⓘ** You can debug only client side events that get received and not the server ones. | ||
| 107 | |||
| 108 |  | ||
| 109 | |||
| 110 | ## Server implementation | ||
| 111 | |||
| 112 | For the sake of this example we will use [Node.js](https://nodejs.org/en/) with [Express](https://expressjs.com) as our router since this is the easiest way to get started and we will use already written SSE library for node [sse-pubsub](https://www.npmjs.com/package/sse-pubsub) so we don't reinvent the wheel. | ||
| 113 | |||
| 114 | ```bash | ||
| 115 | npm init --yes | ||
| 116 | |||
| 117 | npm install express | ||
| 118 | npm install body-parser | ||
| 119 | npm install sse-pubsub | ||
| 120 | ``` | ||
| 121 | |||
| 122 | Basic implementation of a server (`server.js`): | ||
| 123 | |||
| 124 | ```js | ||
| 125 | const express = require('express'); | ||
| 126 | const bodyParser = require('body-parser'); | ||
| 127 | const SSETopic = require('sse-pubsub'); | ||
| 128 | |||
| 129 | const app = express(); | ||
| 130 | const port = process.env.PORT || 4000; | ||
| 131 | |||
| 132 | // topics container | ||
| 133 | const sseTopics = {}; | ||
| 134 | |||
| 135 | app.use(bodyParser.json()); | ||
| 136 | |||
| 137 | // open for all cors | ||
| 138 | app.all('*', (req, res, next) => { | ||
| 139 | res.header('Access-Control-Allow-Origin', '*'); | ||
| 140 | res.header('Access-Control-Allow-Headers', 'X-Requested-With, Content-Type'); | ||
| 141 | next(); | ||
| 142 | }); | ||
| 143 | |||
| 144 | // preflight request error fix | ||
| 145 | app.options('*', async (req, res) => { | ||
| 146 | res.header('Access-Control-Allow-Origin', '*'); | ||
| 147 | res.header('Access-Control-Allow-Headers', 'X-Requested-With, Content-Type'); | ||
| 148 | res.send('OK'); | ||
| 149 | }); | ||
| 150 | |||
| 151 | // serve the event streams | ||
| 152 | app.get('/stream/:topic', async (req, res, next) => { | ||
| 153 | const topic = req.params.topic; | ||
| 154 | |||
| 155 | if (!(topic in sseTopics)) { | ||
| 156 | sseTopics[topic] = new SSETopic({ | ||
| 157 | pingInterval: 0, | ||
| 158 | maxStreamDuration: 15000, | ||
| 159 | }); | ||
| 160 | } | ||
| 161 | |||
| 162 | // subscribing client to topic | ||
| 163 | sseTopics[topic].subscribe(req, res); | ||
| 164 | }); | ||
| 165 | |||
| 166 | // accepts new messages into topic | ||
| 167 | app.post('/publish', async (req, res) => { | ||
| 168 | let body = req.body; | ||
| 169 | let status = 200; | ||
| 170 | |||
| 171 | console.log('Incoming message:', req.body); | ||
| 172 | |||
| 173 | if ( | ||
| 174 | body.hasOwnProperty('topic') && | ||
| 175 | body.hasOwnProperty('event') && | ||
| 176 | body.hasOwnProperty('message') | ||
| 177 | ) { | ||
| 178 | const topic = req.body.topic; | ||
| 179 | const event = req.body.event; | ||
| 180 | const message = req.body.message; | ||
| 181 | |||
| 182 | if (topic in sseTopics) { | ||
| 183 | // sends message to all the subscribers | ||
| 184 | sseTopics[topic].publish(message, event); | ||
| 185 | } | ||
| 186 | } else { | ||
| 187 | status = 400; | ||
| 188 | } | ||
| 189 | |||
| 190 | res.status(status).send({ | ||
| 191 | status, | ||
| 192 | }); | ||
| 193 | }); | ||
| 194 | |||
| 195 | // returns JSON object of all opened topics | ||
| 196 | app.get('/status', async (req, res) => { | ||
| 197 | res.send(sseTopics); | ||
| 198 | }); | ||
| 199 | |||
| 200 | // health-check endpoint | ||
| 201 | app.get('/', async (req, res) => { | ||
| 202 | res.send('OK'); | ||
| 203 | }); | ||
| 204 | |||
| 205 | // return a 404 if no routes match | ||
| 206 | app.use((req, res, next) => { | ||
| 207 | res.set('Cache-Control', 'private, no-store'); | ||
| 208 | res.status(404).end('Not found'); | ||
| 209 | }); | ||
| 210 | |||
| 211 | // starts the server | ||
| 212 | app.listen(port, () => { | ||
| 213 | console.log(`PubSub server running on http://localhost:${port}`); | ||
| 214 | }); | ||
| 215 | ``` | ||
| 216 | |||
| 217 | ### Our custom message format | ||
| 218 | |||
| 219 | Each message posted on a server must be in a specific format that out server accepts. Having structure like this allows us to have multiple separated type of events on each topic. | ||
| 220 | |||
| 221 | With this we can separate streams and only receive events that belong to the topic. | ||
| 222 | |||
| 223 | One example would be, that we have index page and we want to receive messages about new upvotes or new subscribers but we don't want to follow events for other pages. This reduces clutter and overall network. And structure is much nicer and maintanable. | ||
| 224 | |||
| 225 | ```json | ||
| 226 | { | ||
| 227 | "topic": "sample-topic", | ||
| 228 | "event": "sample-event", | ||
| 229 | "message": { "name": "John" } | ||
| 230 | } | ||
| 231 | ``` | ||
| 232 | |||
| 233 | ## Publisher and subscriber clients | ||
| 234 | |||
| 235 | ### Publisher and subscriber in action | ||
| 236 | |||
| 237 | <video src="/assets/simple-pubsub-server/clients.mp4" controls></video> | ||
| 238 | |||
| 239 | You can download [the code](../assets/simple-pubsub-server/sse-pubsub-server.zip) and follow along. | ||
| 240 | |||
| 241 | ### Publisher | ||
| 242 | |||
| 243 | As talked about above publisher is the one that send messages to the broker/server. Message inside the payload can be whatever you want (string, object, array). I would however personally avoid send large chunks of data like blobs and such. | ||
| 244 | |||
| 245 | ```html | ||
| 246 | <!DOCTYPE html> | ||
| 247 | <html lang="en"> | ||
| 248 | |||
| 249 | <head> | ||
| 250 | <meta charset="UTF-8"> | ||
| 251 | <meta name="viewport" content="width=device-width, initial-scale=1.0"> | ||
| 252 | <title>Publisher</title> | ||
| 253 | </head> | ||
| 254 | |||
| 255 | <body> | ||
| 256 | |||
| 257 | <h1>Publisher</h1> | ||
| 258 | |||
| 259 | <fieldset> | ||
| 260 | <p> | ||
| 261 | <label>Server:</label> | ||
| 262 | <input type="text" id="server" value="http://localhost:4000"> | ||
| 263 | </p> | ||
| 264 | <p> | ||
| 265 | <label>Topic:</label> | ||
| 266 | <input type="text" id="topic" value="sample-topic"> | ||
| 267 | </p> | ||
| 268 | <p> | ||
| 269 | <label>Event:</label> | ||
| 270 | <input type="text" id="event" value="sample-event"> | ||
| 271 | </p> | ||
| 272 | <p> | ||
| 273 | <label>Message:</label> | ||
| 274 | <input type="text" id="message" value='{"name": "John"}'> | ||
| 275 | </p> | ||
| 276 | <p> | ||
| 277 | <button type="button" id="button">Publish message to topic</button> | ||
| 278 | </p> | ||
| 279 | </fieldset> | ||
| 280 | |||
| 281 | <script> | ||
| 282 | |||
| 283 | const button = document.querySelector('#button'); | ||
| 284 | const server = document.querySelector('#server'); | ||
| 285 | const topic = document.querySelector('#topic'); | ||
| 286 | const event = document.querySelector('#event'); | ||
| 287 | const message = document.querySelector('#message'); | ||
| 288 | |||
| 289 | button.addEventListener('click', async (evt) => { | ||
| 290 | const req = await fetch(`${server.value}/publish`, { | ||
| 291 | method: 'post', | ||
| 292 | headers: { | ||
| 293 | 'Accept': 'application/json', | ||
| 294 | 'Content-Type': 'application/json', | ||
| 295 | }, | ||
| 296 | body: JSON.stringify({ | ||
| 297 | topic: topic.value, | ||
| 298 | event: event.value, | ||
| 299 | message: JSON.parse(message.value), | ||
| 300 | }), | ||
| 301 | }); | ||
| 302 | |||
| 303 | const res = await req.json(); | ||
| 304 | console.log(res); | ||
| 305 | }); | ||
| 306 | |||
| 307 | </script> | ||
| 308 | |||
| 309 | </body> | ||
| 310 | |||
| 311 | </html> | ||
| 312 | |||
| 313 | ``` | ||
| 314 | |||
| 315 | ### Subscriber | ||
| 316 | |||
| 317 | Subscriber is responsible for receiving new messages that come from server via publisher. The code bellow is very rudimentary but works and follows the implementation guidelines for EventSource. | ||
| 318 | |||
| 319 | You can use either Developer Tools Console to see incoming messages or you can defer to Debugging with Google Chrome section above to see all EventStream messages. | ||
| 320 | |||
| 321 | **ⓘ** Don't be alarmed if the subscriber gets disconnected from the server every so often. The code we have here resets connection every 15s but it automatically get reconnected and fetches all messages up to last received message id. This setting can be adjusted in `server.js` file; search for the `maxStreamDuration` variable. | ||
| 322 | |||
| 323 | ```html | ||
| 324 | <!DOCTYPE html> | ||
| 325 | <html lang="en"> | ||
| 326 | |||
| 327 | <head> | ||
| 328 | <meta charset="UTF-8"> | ||
| 329 | <meta name="viewport" content="width=device-width, initial-scale=1.0"> | ||
| 330 | <title>Subscriber</title> | ||
| 331 | <link rel="stylesheet" href="style.css"> | ||
| 332 | </head> | ||
| 333 | |||
| 334 | <body> | ||
| 335 | |||
| 336 | <h1>Subscriber</h1> | ||
| 337 | |||
| 338 | <fieldset> | ||
| 339 | <p> | ||
| 340 | <label>Server:</label> | ||
| 341 | <input type="text" id="server" value="http://localhost:4000"> | ||
| 342 | </p> | ||
| 343 | <p> | ||
| 344 | <label>Topic:</label> | ||
| 345 | <input type="text" id="topic" value="sample-topic"> | ||
| 346 | </p> | ||
| 347 | <p> | ||
| 348 | <label>Event:</label> | ||
| 349 | <input type="text" id="event" value="sample-event"> | ||
| 350 | </p> | ||
| 351 | <p> | ||
| 352 | <button type="button" id="button">Subscribe to topic</button> | ||
| 353 | </p> | ||
| 354 | </fieldset> | ||
| 355 | |||
| 356 | <script> | ||
| 357 | |||
| 358 | const button = document.querySelector('#button'); | ||
| 359 | const server = document.querySelector('#server'); | ||
| 360 | const topic = document.querySelector('#topic'); | ||
| 361 | const event = document.querySelector('#event'); | ||
| 362 | |||
| 363 | button.addEventListener('click', async (evt) => { | ||
| 364 | |||
| 365 | let es = new EventSource(`${server.value}/stream/${topic.value}`); | ||
| 366 | |||
| 367 | es.addEventListener(event.value, function (evt) { | ||
| 368 | console.log(`incoming message`, JSON.parse(evt.data)); | ||
| 369 | }); | ||
| 370 | |||
| 371 | es.addEventListener('open', function (evt) { | ||
| 372 | console.log('connected', evt); | ||
| 373 | }); | ||
| 374 | |||
| 375 | es.addEventListener('error', function (evt) { | ||
| 376 | console.log('error', evt); | ||
| 377 | }); | ||
| 378 | |||
| 379 | }); | ||
| 380 | |||
| 381 | </script> | ||
| 382 | |||
| 383 | </body> | ||
| 384 | |||
| 385 | </html> | ||
| 386 | |||
| 387 | ``` | ||
| 388 | |||
| 389 | |||
| 390 | |||
| 391 | ## Reading further | ||
| 392 | |||
| 393 | - [https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events) | ||
| 394 | - [https://www.smashingmagazine.com/2018/02/sse-websockets-data-flow-http2/](https://www.smashingmagazine.com/2018/02/sse-websockets-data-flow-http2/) | ||
| 395 | - [https://apifriends.com/api-streaming/server-sent-events/](https://apifriends.com/api-streaming/server-sent-events/) | ||
| 396 | - [https://tools.ietf.org/id/draft-xie-bidirectional-messaging-01.html](https://tools.ietf.org/id/draft-xie-bidirectional-messaging-01.html) | ||
| 397 | - [https://developers.google.com/web/fundamentals/performance/http2](https://developers.google.com/web/fundamentals/performance/http2) | ||
| 398 | - [https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API](https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API) | ||
