Trello has been using RabbitMQ for the last three years. Prior to RabbitMQ, we were relying on a Redis Pub-Sub implementation. Recently, due to RabbitMQ’s questionable behavior when network partitions occur, we’ve made the switch over to Kafka.
This blog post goes into depth on our RabbitMQ implementation, why we chose Kafka, and the Kafka-based architecture we ended up with.
Current state of the world
Trello uses a cluster of 15 RabbitMQ instances for all websocket updates. Our server machines publish messages to the cluster and then our websocket instances pull messages from the queue. However, there are some configuration peculiarities involved.
Interlude: how RabbitMQ works
RabbitMQ allows you to publish messages to an exchange with a routing key. Each exchange has a routing policy associated with it: fanout, single routing key, prefix, etc. Queues bind to the exchange with a routing key, and the exchange matches published messages to queues based on their routing keys and the exchange's configuration.
Queues can be created as transient: as soon as the TCP connection that created them closes, they are destroyed and all of their bindings are removed.
Interlude 2: the Trello websocket protocol
The websocket protocol we use is extremely simple; there is a bare-minimum request-response mechanism and the only commands we support are subscribing and unsubscribing to a channel.
The subscription command contains a Trello model type (board, member, organization, card), and its respective model id.
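As a rough illustration of how small such a protocol can be, here is a minimal sketch of parsing and validating a channel command. The JSON field names (reqid, type, modelType, idModel) are assumptions made for this example; the actual Trello wire format is not shown in this post.

```python
import json
from dataclasses import dataclass
from typing import Optional

# Model types named in the post; the command names follow the text above.
MODEL_TYPES = ("board", "member", "organization", "card")
COMMAND_TYPES = ("subscribe", "unsubscribe")


@dataclass(frozen=True)
class ChannelCommand:
    req_id: int      # assumed: lets the client match a response to its request
    command: str     # "subscribe" or "unsubscribe"
    model_type: str  # one of MODEL_TYPES
    model_id: str


def parse_command(raw: str) -> Optional[ChannelCommand]:
    """Return a ChannelCommand, or None if the frame is not a valid command."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return None
    if not isinstance(data, dict):
        return None
    # Hypothetical field names, chosen only for this sketch.
    req_id = data.get("reqid")
    command = data.get("type")
    model_type = data.get("modelType")
    model_id = data.get("idModel")
    if not isinstance(req_id, int) or isinstance(req_id, bool):
        return None
    if command not in COMMAND_TYPES or model_type not in MODEL_TYPES:
        return None
    if not isinstance(model_id, str) or not model_id:
        return None
    return ChannelCommand(req_id, command, model_type, model_id)
```

Rejecting anything that is not one of the two known commands keeps the socket process from creating subscriptions for malformed input.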
Message routing
We set up this system by letting each websocket process (8 per instance) connect to RabbitMQ and create a transient queue for itself. When the process gets a websocket connection and receives a subscribe command, it will create a binding for that specific subscription to the general update exchange (which is configured to do specific key routing).
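To make the moving parts concrete, the following is a toy in-memory model of this setup: one transient queue per websocket process, one binding per subscription, and exact-match routing. It is not a RabbitMQ client, and the queue names and the "board.abc123" routing-key format are hypothetical.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, Set


class DirectExchange:
    """Toy model of exact-key routing; only mimics the behaviour described above."""

    def __init__(self) -> None:
        self.queues: Dict[str, Deque[str]] = {}
        # routing key -> names of the queues bound with that key
        self.bindings: Dict[str, Set[str]] = defaultdict(set)

    def declare_transient_queue(self, name: str) -> None:
        self.queues[name] = deque()

    def bind(self, queue: str, routing_key: str) -> None:
        if queue not in self.queues:
            raise KeyError("unknown queue: " + queue)
        self.bindings[routing_key].add(queue)

    def unbind(self, queue: str, routing_key: str) -> None:
        self.bindings[routing_key].discard(queue)

    def connection_closed(self, queue: str) -> None:
        # A transient queue disappears with its connection, bindings included.
        self.queues.pop(queue, None)
        for bound in self.bindings.values():
            bound.discard(queue)

    def publish(self, routing_key: str, message: str) -> int:
        # Deliver to every queue bound with exactly this key; return the count.
        targets = self.bindings.get(routing_key, set())
        for queue in targets:
            self.queues[queue].append(message)
        return len(targets)


# One websocket process subscribing on behalf of a connected user.
exchange = DirectExchange()
exchange.declare_transient_queue("ws-process-1")
exchange.bind("ws-process-1", "board.abc123")
exchange.publish("board.abc123", "card moved")
```

Every subscribe and unsubscribe from a user turns into a bind or unbind call on the broker, which is what makes binding churn so significant in this design.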
RabbitMQ sharding
Messages through RabbitMQ are sharded by their model id over 16 shards.
Trello Server publishes all messages to a single exchange on a rabbitmq-inbound cluster of 3 instances using a client-side calculated shard key. These 16 different shard keys have their own bindings to 16 different queues. We then use the shovel plugin to distribute these 16 queues to 4 different rabbitmq-outbound clusters (again, 3 instances each), which each contain 4 queues. The websocket client servers connect to all RabbitMQ clusters and then subscribe to the queues they need, depending on the requested models of all of their connected users.
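A minimal sketch of what a client-side shard key calculation could look like is shown below. The hash function (CRC32) and the layout of shards across outbound clusters are assumptions made for illustration; the post only states that there are 16 shards spread over 4 outbound clusters with 4 queues each.

```python
import zlib
from typing import Tuple

SHARD_COUNT = 16
OUTBOUND_CLUSTERS = 4
QUEUES_PER_CLUSTER = SHARD_COUNT // OUTBOUND_CLUSTERS  # 4 queues per cluster


def shard_for(model_id: str) -> int:
    """Map a model id to one of the 16 shards.

    CRC32 is an assumption; any hash that is stable across processes
    and machines would serve the same purpose.
    """
    return zlib.crc32(model_id.encode("utf-8")) % SHARD_COUNT


def outbound_location(shard: int) -> Tuple[int, int]:
    """Return (outbound cluster index, queue index within that cluster).

    Assumed layout: shards 0-3 on cluster 0, 4-7 on cluster 1, and so on.
    """
    if not 0 <= shard < SHARD_COUNT:
        raise ValueError("shard out of range")
    return divmod(shard, QUEUES_PER_CLUSTER)
```

Because the publisher and the websocket servers compute the same function, both sides agree on where updates for a given model travel without any coordination.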
The theory behind this was to distribute the load and scale the RMQ infrastructure horizontally. However, because the clusters themselves were not reliable (single instance failures or network interruptions could cause an entire cluster to fail completely) the inbound cluster was still a single point of failure.
Problems
Rabbit’s main problem is its handling of partitions and general cluster interruptions. The results vary a bit, but range from split-brain to complete cluster failure. Worse still, recovering from a dead cluster usually involves resetting it completely. In our case this means we have to drop all sockets and force the web clients to reconnect to ensure that they can re-retrieve missed updates. However, that may not be enough to fully recover in a split-brain scenario – the web client may have missed a message but received a later one and has no way of knowing.
Additionally, there was another problem: creating queues and bindings in RabbitMQ is slow and expensive, and destroying them is expensive too. Every time we lost a socket server, we would see an unsubscribe and resubscribe storm as client websockets were dropped and tried to reconnect, which took RMQ some time to process. While we could handle the simple case of restarting one server, if we ever lost ALL the websocket connections and had to reconnect them (which has happened more times than we would like), the flood of binding add/remove commands would cause the RMQ cluster to become unresponsive, even to monitoring commands or a normal process SIGHALT. At times this led to cluster failure.
To work around this, we introduced some jitter when propagating the disconnects to the RMQ server; this helped a lot with large-scale socket drops, but network partitions continued to be a problem.
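The idea behind the jitter can be sketched as follows: instead of issuing every unbind the moment a socket drops, each one gets its own random delay so the broker sees a steady trickle rather than a burst. The function name and the delay window are hypothetical; the post does not describe the actual implementation.

```python
import random
from typing import Iterable, List, Optional, Tuple


def plan_jittered_unbinds(
    routing_keys: Iterable[str],
    max_delay_s: float,
    rng: Optional[random.Random] = None,
) -> List[Tuple[float, str]]:
    """Give every unbind its own random delay in [0, max_delay_s] seconds."""
    rng = rng or random.Random()
    plan = [(rng.uniform(0.0, max_delay_s), key) for key in routing_keys]
    plan.sort()  # earliest delay first, ready to hand to a timer or scheduler
    return plan
```

With a window of, say, 30 seconds, a mass disconnect is spread over that window instead of arriving at once. A longer window is gentler on the broker, at the cost of stale bindings lingering a little longer.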
Available solutions
Queuing providers
The table below compares the alternatives we considered:
Feature | Kafka | SNS + SQS | SNS + FIFO SQS | Kinesis | Redis Streams |
---|---|---|---|---|---|
Awesome driver support (for node.js) | ⚠️(see below) | ✅(SDK) | ✅(SDK) | (Java stdin/out wrapper) | ✅(SDK) |
Usable as a delta queue | (lack of filtering / expensive topic creation) | (only-once processing) | (only-once processing) | (lack of filtering) | ✅ (note 1) |
Has fail-over capabilities | ✅ | N/A | N/A | N/A | ⚠️ (note 2) |
In-Order message delivery (per shard) | ✅ | ✅ | ✅ | ✅ | |
Fits the socket message distribution model (fanout) | ⚠️ (note 7) | ✅ | ✅ | ⚠️ (note 3) | ✅ |
Low Latency | ✅ | ✅ | ✅ | (note 4) | ✅ |
Actually available | ✅ | ✅ | (note 5) | ✅ | (note 6) |
Footnotes
Bold is a must-have.
1. It is assumed that (similarly to Redis' current pub-sub topics) streams are extremely lightweight to create and subscribe to.
2. Supposedly possible with Sentinel. Lossless fail-over is not the default, but can be achieved at the loss of some throughput. Untested.
3. Partitions have to be read separately; tooling support for all partitions → 1 process is unclear.
4. Due to its limitation to 5 reads/s per shard (and socket servers basically having to listen on all shards), scaling up socket servers simultaneously increases latency.
5. Due to its limited throughput (300 msgs/s), not actually available for our purposes (2k+ msgs/s + growth).
6. As of 12/7/2017 only available in the unstable branch / unreleased.
7. Partitions for the same topic may be distributed over multiple servers; however, that's generally abstracted by the available libraries.
We decided that Kafka was the best option. Hopefully, Redis streams will actually be available in the future; Redis is a neat tool and would allow for a more efficient architecture.
Kafka drivers
Feature | kafka-node | node-rdkafka |
---|---|---|
Widely used | ✅ | ⚠️ |
Actively developed | ✅ | ⚠️ (seems to have some support by Blizzard) |
No C++/build dependencies | ✅ | |
Clean, pretty abstractions | ⚠️ (note 1) | ⚠️ (note 2) |
Doesn’t break on failovers | (note 3) | ✅ |
Footnotes
Bold is a must-have.
1. Looks alright from the outside, but if you dig into the code, you’ll find that inheritance is weird.
2. The C++ is a leaky abstraction. Because it sets up its own (non-node managed) threading, it (optionally) wants you to configure which exit signal it should listen on to abort the thread. It also requires the JS user-land to continually poll the C++ side for updates (for example to receive delivery reports).
3. When a partition is lost and fails over, publishing to it will silently fail. Similarly, the consumer will not receive any messages published to that partition on its new master.
Because we need fail-over, the choice on this one was pretty obvious: node-rdkafka. We were quite confused when we tested fail-over with kafka-node and it didn’t work, but when we tested node-rdkafka we found everything we wanted and didn’t investigate the kafka-node failure any further.
It is important to note that node-rdkafka is actually a wrapper around librdkafka, the “official” (as in: developed by a Confluent employee) C++ client for Kafka.
Result
New architecture
Socket servers now have a master-client architecture. The master subscribes to the whole topic, receives all delta updates, and filters them locally based on which models its clients need to forward to users. This puts more load on our servers from the get-go, but scaling it is relatively easy (by just getting a bigger CPU). When a client receives a subscribe request, it checks permissions and then forwards the request to the master, which saves the model id in a map.
The “client” actually accepts socket connections from users, handles their authentication, and forwards subscription requests to the master.
When a delta update comes in, the master checks whether any clients are interested in that specific model and, if so, forwards the message to them to be distributed to users.
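The master's bookkeeping can be sketched as a map from model id to the set of interested client processes. The class and method names here are hypothetical, and the sketch leaves out the Kafka consumer and the inter-process transport entirely.

```python
from collections import defaultdict
from typing import Dict, Set


class SubscriptionIndex:
    """Tracks which client processes want updates for which model ids."""

    def __init__(self) -> None:
        self._clients_by_model: Dict[str, Set[str]] = defaultdict(set)

    def subscribe(self, client_id: str, model_id: str) -> None:
        self._clients_by_model[model_id].add(client_id)

    def unsubscribe(self, client_id: str, model_id: str) -> None:
        clients = self._clients_by_model.get(model_id)
        if clients is None:
            return
        clients.discard(client_id)
        if not clients:
            # Drop empty entries so the map only holds models someone wants.
            del self._clients_by_model[model_id]

    def targets_for(self, model_id: str) -> Set[str]:
        """Clients to forward a delta to; an empty set means the delta is dropped."""
        return set(self._clients_by_model.get(model_id, ()))
```

Each incoming delta costs one dictionary lookup, which is where the extra CPU goes, while the broker no longer has to maintain one binding per subscription.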
Metrics
We now have really pretty metrics about all things Kafka! Previously, only a few metrics like message rates were available in the RabbitMQ dashboard. Now we import all of the Kafka metrics into our own store, which allows us to put alerts on everything.
Metrics like consumer lag (from the queue server and client perspective!) weren’t previously available to us in such an organized fashion. While it would have been possible to build for Rabbit, we have only added it during the course of this rewrite.
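For readers unfamiliar with the term, consumer lag is the distance between the newest offset in a partition and the offset the consumer has committed. Below is a minimal sketch of that calculation, assuming the two offset maps have already been fetched from the cluster; how they are fetched is outside the scope of the sketch.

```python
from typing import Dict


def consumer_lag(end_offsets: Dict[int, int], committed: Dict[int, int]) -> Dict[int, int]:
    """Per-partition lag: log end offset minus the consumer's committed offset."""
    lag = {}
    for partition, end_offset in end_offsets.items():
        # Assumption: a partition with no committed offset is counted from offset 0.
        position = committed.get(partition, 0)
        lag[partition] = max(end_offset - position, 0)
    return lag
```

Summing the per-partition values gives a single number that is convenient to alert on.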
On to actual differences:
[Graph: socket server memory and CPU usage. White lines are deployments; the tiny white line before the spike is the start of the socket deployment.]
As you can see, memory usage dropped by about 33%, while CPU usage increased to approximately 2x. The reduction of memory is due to the reduced number of required queues, while the increase in CPU is due to local filtering.
Additionally, there is a second set of lines that represent a different instance type we’ve been experimenting with on recent server additions: m4 instead of r3. We plan to drop a number of socket servers and move the remaining ones to more compute-targeted types (likely c5) too. This should give us a lot of room to grow.
Outages
Luckily, we only experienced a small one! While we’ve only recently switched to the new Kafka-based architecture, the cluster had been enabled and published to for over a month. Apart from that one small incident, we have not had an outage yet! That is great news compared to the 4 outages caused by RabbitMQ in the month prior to the switch.
During an upgrade for RabbitMQ (trusty → xenial), we managed to crash and reconnect our whole server farm. While all other systems handled this correctly, the max_open_file limit on the Kafka servers was not properly set and led to some processes being unable to connect.
Costs
A lot less! While not the primary motivating factor, it’s pretty neat to drop costs.
RMQ consisted of a large number of c3.2xlarge instances. The Kafka setup now consists of a few m4.large instances for ZooKeeper and i3.large instances for Kafka. The changes resulted in a 5x reduction in costs. Yay!