For a while now, we had needed a solution for distributed stream processing and messaging. Apache Kafka seemed like a solid foundation upon which we could build business-critical applications. Possible use cases include product update pipelines, order tracking, real-time user notifications, merchant billing and more.
This is the story of how we introduced Kafka to a large monolithic Rails codebase. We will go through the technical details, challenges we faced and decisions we made in the process.
Note: Some familiarity with basic Kafka concepts is assumed.
The challenges ahead
An initial issue stemmed from the fact that Kafka provides relatively low-level abstractions. While this has certain advantages, it also means that authors of client libraries are exposed to a large API surface area and have to get many things right, which makes writing a solid client implementation a demanding task.
Being mostly a Ruby-based organization, we tried several Kafka client libraries written in Ruby but kept running into hard-to-diagnose bugs. Ruby's limited concurrency primitives also make writing an efficient client a non-trivial task.
We sidestepped these problems by following a different route: hide the low-level complexities in a standalone service that exposes a minimal API for clients to interact with. Because the service could be written in a language other than Ruby, we could leverage the battle-tested librdkafka, which we already knew well from using it in several Python and Go applications.
So we built Rafka - a proxy service that sits in front of Kafka and exposes it with simple semantics and a simple API. It provides sane defaults, hiding many details away from users. We chose Go, which already had a solid Kafka client library backed by librdkafka and also provided us with the necessary tools for efficiently implementing the features we needed.
We chose to use a subset of the simple Redis protocol, so that writing a client library becomes a straightforward task; all we had to do was add a thin layer on top of the well-proven Ruby redis client.
In a couple of days we had a working client library written in Ruby, packaged as a gem named rafka-rb, featuring both consumer and producer implementations.
With Rafka and its companion Ruby client library in place, our services and most importantly our Rails application could start writing and reading data from Kafka with minimum effort.
Our main Rails application is where most developers spend their time. Therefore, the next step was to make it straightforward for developers working in the main Rails codebase to write and deploy Kafka consumers and producers in a consistent manner.
Producing from a Rails application
Integrating producers into an existing application is a simple task, mostly because a single producer is enough even when producing to multiple topics.
For this reason, we have a single producer instance that’s created during initialization and used everywhere throughout the codebase:
Producing is as easy as:
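To illustrate the pattern, here is a minimal sketch of a single shared producer. StubProducer, the KafkaProducer module and the order_updates topic are hypothetical stand-ins so that the example runs without a Rafka server; in the real application the shared instance would be a rafka-rb producer, and the way it is stored and configured may differ.

```ruby
# Hypothetical stand-in for a real producer: it records messages in memory
# instead of sending them to Rafka.
class StubProducer
  attr_reader :sent

  def initialize
    @sent = []
  end

  def produce(topic, message)
    @sent << [topic, message]
    nil
  end
end

# Holds the single, process-wide producer. How the real instance is shared
# is an assumption; only the "one producer for all topics" idea is from the post.
module KafkaProducer
  class << self
    attr_writer :instance

    # Lazily create the producer once and reuse it afterwards.
    def instance
      @instance ||= StubProducer.new
    end

    def produce(topic, message)
      instance.produce(topic, message)
    end
  end
end

# The same producer serves every topic.
KafkaProducer.produce("price_drops", "product 42 dropped to 9.99")
KafkaProducer.produce("order_updates", "order 7 shipped")
```

Keeping the instance behind one module also makes it easy to swap in a stub like this one in tests.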
Consuming from a Rails application
Writing consumers is a whole other story, mainly because consuming is a long-running task. In this section we’ll take a look at how we provided the building blocks for writing consumers from within a monolithic Rails codebase, with the help of Rafka.
Links to the source code of all the outlined components are provided at the end of this post.
Let’s dive right in.
Consumers are plain old Ruby objects with their classes defined inside the Rails application. They inherit from the abstract KafkaConsumer class, which transparently integrates every consumer with statsd for statistics, Sentry for error tracking and may provide other needed integrations in the future.
Their class names end with the “Consumer” suffix and the files that define them are named accordingly, following Rails conventions.
A typical consumer looks as follows:
We should mention here that each consumer is backed by a rafka-rb consumer under the hood.
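As a rough sketch of how such a base class could be shaped, the example below replaces the statsd and Sentry integrations with two injectable callables. The on_timing and on_error parameters, the process_with_instrumentation method and the message format are all assumptions made for illustration; only the KafkaConsumer name, the #process method and the “Consumer” suffix come from this post.

```ruby
# Hypothetical sketch of an abstract base class. The real KafkaConsumer
# integrates with statsd and Sentry; here both are injectable callables.
class KafkaConsumer
  def initialize(on_timing: ->(name, seconds) {}, on_error: ->(error) {})
    @on_timing = on_timing
    @on_error = on_error
  end

  # Subclasses implement the actual message handling.
  def process(message)
    raise NotImplementedError, "#{self.class} must implement #process"
  end

  # Wraps #process with timing and error reporting, re-raising any error
  # so that the caller can decide what to do next.
  def process_with_instrumentation(message)
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    process(message)
  rescue StandardError => e
    @on_error.call(e)
    raise
  ensure
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
    @on_timing.call(self.class.name, elapsed)
  end
end

# A concrete consumer only has to define #process.
class PriceDropsConsumer < KafkaConsumer
  attr_reader :seen

  def process(message)
    (@seen ||= []) << message
  end
end
```

Because the instrumentation lives in the base class, every subclass gets timing and error reporting without any extra code.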
After writing a new consumer, it has to be enabled manually in the configuration file:
Again, following Rails conventions, consumer names are derived from the class names.
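The naming convention can be sketched in plain Ruby as follows. The helper names consumer_name and consumer_class_name are hypothetical; in a Rails application ActiveSupport's underscore and camelize would normally do this work, and this simplified version does not handle acronyms.

```ruby
# Derives the configuration name from a class name,
# e.g. "PriceDropsConsumer" becomes "price_drops".
def consumer_name(class_name)
  class_name
    .sub(/Consumer\z/, "")
    .gsub(/([a-z\d])([A-Z])/, '\1_\2')
    .downcase
end

# Goes the other way, e.g. "price_drops" becomes "PriceDropsConsumer".
def consumer_class_name(name)
  name.split("_").map(&:capitalize).join + "Consumer"
end
```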
A key point is that all instances of a consumer are essentially standalone Kafka consumers belonging to the same consumer group.
On deployment, Capistrano reads the configuration file to spawn the appropriate consumers on the appropriate servers (defined by a Capistrano role). More on that later.
Voila! This is all it takes to write and deploy a consumer.
Which brings us to the next question: how are consumers spawned as long-running processes?
Consumers as long-running processes
The natural step after having a bunch of consumers implemented was to actually run them.
This is achieved with a class named KafkaConsumerWorker, which wraps consumer objects and facilitates running them as long-running processes.
A simplified version of the worker is given below:
The worker ultimately ends up continuously calling the underlying consumer’s #process method to process messages in a loop. It also provides graceful shutdown functionality. Lastly, it integrates the consumer with systemd, thus providing robustness, liveness checks and visibility.
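The loop and the graceful shutdown can be sketched as below. This is an illustration under assumptions rather than the actual worker: the separate source object with a next_message method, and the shutdown and trap_signals method names, are hypothetical, and the systemd integration is left out.

```ruby
# Hypothetical sketch of a worker that drives a consumer in a loop.
class KafkaConsumerWorker
  # consumer: responds to #process(message)
  # source:   responds to #next_message and returns nil when nothing is
  #           available (an assumption; expected to block briefly itself)
  def initialize(consumer, source)
    @consumer = consumer
    @source = source
    @shutdown = false
  end

  # Asks the loop to stop once the current message has been processed.
  def shutdown
    @shutdown = true
  end

  # Stops gracefully on SIGTERM/SIGINT; the handler only flips a flag,
  # so a message is never interrupted halfway through.
  def trap_signals
    %w[TERM INT].each { |signal| Signal.trap(signal) { shutdown } }
  end

  def work
    until @shutdown
      message = @source.next_message
      next if message.nil? # nothing to do yet, poll again
      @consumer.process(message)
    end
  end
end
```

Checking the flag only between messages is what makes the shutdown graceful: the process exits after finishing the message at hand instead of in the middle of it.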
Even though one won’t have to interact directly with the worker, it is quite easy to do so in case it is needed for development or debugging purposes:
The next step was to start the worker using systemd. We did this with a straightforward systemd service file:
Each consumer instance is identified by a string containing the consumer name and the instance number (e.g. price_drops:1), which is passed to systemd as a template argument (the %i part). This way we can spawn many instances of different consumers using the same service file.
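To make the identifier scheme concrete, the sketch below expands a configuration into instance identifiers and the matching systemd unit names. The YAML layout, the order_updates consumer, the 1-based numbering and the helper names are assumptions; the price_drops:1 format and the kafka-consumer@.service template name are taken from this post.

```ruby
require "yaml"

# Hypothetical config layout: consumer name => number of instances.
CONFIG = <<~YAML
  price_drops:
    instances: 2
  order_updates:
    instances: 1
YAML

# Expands the config into identifiers such as "price_drops:1".
def instance_ids(config)
  config.flat_map do |name, settings|
    (1..settings.fetch("instances")).map { |number| "#{name}:#{number}" }
  end
end

# Splits an identifier back into the consumer name and instance number.
def parse_instance_id(id)
  name, number = id.split(":", 2)
  [name, Integer(number)]
end

# Builds the unit name; systemd substitutes the part after "@" for %i.
def unit_name(id)
  "kafka-consumer@#{id}.service"
end

ids = instance_ids(YAML.safe_load(CONFIG))
units = ids.map { |id| unit_name(id) }
```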
Integrating consumers with systemd means that for each consumer we get several features out-of-the-box:
- consumer management commands (start, stop, restart, status)
- alerts when a consumer raises an exception
- visibility: what state each consumer is in (working, waiting for job, shutting down), its current offset/topic/partition (via sd_notify(3))
- automated restart of failed consumers
- automated restart of stuck consumers via systemd watchdog timers
- simplified logging: we just log to stdout/stderr and systemd takes care of the rest
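The status reporting and watchdog features rely on the sd_notify protocol, in which the process sends short datagrams to the UNIX socket named by the NOTIFY_SOCKET environment variable. Below is a minimal sketch of that mechanism in Ruby; it is not necessarily how our worker does it, the optional env parameter exists only to make the function easy to test, and abstract sockets (paths starting with "@") are not handled.

```ruby
require "socket"

# Sends a state string (e.g. "READY=1", "STATUS=working", "WATCHDOG=1")
# to systemd's notification socket. Returns true when a datagram was sent
# and false when no notification socket is configured.
# Note: abstract sockets ("@"-prefixed paths) are not handled in this sketch.
def sd_notify(state, env = ENV)
  path = env["NOTIFY_SOCKET"]
  return false if path.nil? || path.empty?

  # Addrinfo#connect yields a connected socket and closes it afterwards.
  Addrinfo.unix(path, :DGRAM).connect do |socket|
    socket.write(state)
  end
  true
end
```

A worker could call sd_notify("STATUS=...") whenever its state changes and sd_notify("WATCHDOG=1") on every loop iteration, so that systemd restarts it if the pings stop.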
Inspecting a consumer instance produces informative output that is useful for debugging when something goes wrong:
The last missing piece of the puzzle is the command that systemd calls in order to start a consumer. The command is actually a plain old rake task that sets up the worker along with its underlying consumer and runs it.
Similar to the other components, the task lives inside the Rails codebase:
Since we use Capistrano for deployment, we added a Capistrano task that takes care of stopping and starting consumers. A simplified version of it follows:
kafkactl is a wrapper script that executes the necessary systemctl commands under the hood.
When someone deploys the application, Capistrano reads the configuration YAML and spawns the enabled consumers as can be observed in its output:
After deploying a new consumer, we monitor the Grafana dashboards to verify that everything works fine and watch Slack to make sure no alerts are triggered.
The big picture
At a glance we can see that our Kafka/Rails integration infrastructure consists of the following components:
- Rafka: a Kafka proxy service with simple semantics and a small API
- rafka-rb: a Ruby client library for Rafka
- KafkaConsumer: a Ruby abstract class meant to be sub-classed by concrete consumer implementations
- KafkaConsumerWorker: a Ruby class that facilitates spawning consumers as long-running processes
- kafka:consumer: a rake task that runs a consumer instance
- kafka_consumers.yml: a configuration file that controls which consumers should run on production and with how many instances
- kafka-consumer@.service: a systemd service file that spawns a consumer by calling the rake task
Their interactions are illustrated in the following diagram:
We should note here that these concepts are orthogonal and each of them can be used separately from the others, whether it’s for debugging, testing or prototyping purposes.
Since many consumers would perform mission-critical tasks (e.g., calculate billing costs), it is imperative that they are sufficiently monitored.
Monitoring happens on various levels and each consumer transparently gets the following features:
- Statistics: job process timings and consumer throughput (processed msgs/sec)
The fact that we get these features out-of-the-box is one of the benefits of having a common consumer infrastructure.
Reflecting and moving forward
We are pretty happy with the way developers are interacting with Kafka, and we have received very positive feedback about it.
Being able to write and deploy consumers in a few simple steps increased the velocity of our development teams and enabled us to build on top of Kafka in a consistent and efficient manner.
In the future we want to package and distribute all the components described in this post as open source libraries (possibly a Ruby gem) that other organizations can use and benefit from.
Finally, we plan to add more features to Rafka and the consumer/producer infrastructure, notably:
- batch processing capabilities
- multi-topic consumers
- KSQL-backed primitives (aggregations, joins, etc.)
- before/after/around consumer hooks
We would be happy to hear your thoughts and feedback!
The code mentioned in this post can be found in the following repositories.
If anything is unclear or missing, please file a GitHub issue.