Monitoring Kafka Consumer Lag in Seconds

In this post I’ll talk about how we’re monitoring Kafka consumer lag using the same metric as our Service Level Objective: lag in seconds, and why that’s important.

The logging stack we built at work (ZipRecruiter) uses Apache Kafka as a buffer for logs, which means we can send data from Kafka to other destinations with anything that can act as a Kafka consumer. We’re using Logstash to send logs to Elasticsearch, and Secor to send logs to S3 for archival. Monitoring the consumer lag in seconds means we know exactly how far behind in time these tools are.

The logs are important for many reasons, among them our ability to troubleshoot production incidents effectively. Having the latest logs in Elasticsearch (to be viewed with Kibana) can make or break our ability to fix problems with the site.

The Metric

Lag in seconds is the difference between the Kafka insertion time of the most recent message committed by the consumer and the time the check script runs. For example: a log is inserted into Kafka at 1:00:00. At 1:00:30 when the consumer is done with the log and has committed offsets, the lag is 30 seconds.

Lag is obtained for each Kafka partition, and then max, median, and min values are calculated for an overall picture of lag across partitions without having to make these calculations mentally (especially when on-call).
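
To make the aggregation concrete, here’s a minimal sketch; the per-partition lag values below are made up, and in the real check they come from the calculation shown later in this post:

    import statistics

    # Hypothetical per-partition lag, in seconds, keyed by partition number.
    per_partition_lag = {0: 3.2, 1: 4.8, 2: 2.9, 3: 3.7}

    lag_values = list(per_partition_lag.values())
    lag_max = max(lag_values)                    # worst partition
    lag_median = statistics.median(lag_values)   # typical partition
    lag_min = min(lag_values)                    # best partition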

Why is monitoring lag in seconds better than something more straightforward, like the number of records? With data volume constantly growing, a hard-coded record-count threshold on this alert would inevitably be exceeded. Monitoring lag in seconds from Kafka insertion time means we don’t care how many records there are, as long as all of them are indexed within a reasonable amount of time (5 minutes, the overall SLO). This allows the infrastructure to grow when we need it to without adjusting what we monitor.

Healthy Logstash lag varies from -1 to 4 seconds over a period of 18 hours

The Service Level Objective

A service level objective (SLO) is a specific part of the overall service level agreement (SLA). Our SLO is to get logs from Kafka to Elasticsearch in 2.5 minutes or less. Generally we beat that goal by a wide margin, with average lag between 0 and 5 seconds. The lag varies by 5 seconds because Logstash commits offsets every 5 seconds.

The overall target is 5 minutes from being written to a file on an EC2 instance to being searchable in Elasticsearch. Filebeat on the instance has 2.5 minutes to read the logs and write them to Kafka, which means Logstash has 2.5 minutes to get them from Kafka to Elasticsearch.

The Secor SLO is a little more relaxed, at 15 minutes from a log being written to a file to being uploaded to S3. Most of the things that process logs from S3 don’t need super up-to-date data, as they are generally once-per-hour or once-per-day reports, or Athena queries.

The Alert

Monitoring and alerting on the right thing is tricky. Getting it wrong could mean waking someone up in the middle of the night for no reason. Because we value the well-being of our on-call engineers (of which I am one), we don’t want to alert every time we exceed 5 minutes of lag. Sometimes a spike in data volume means we can’t index fast enough, but we’ll catch up within a reasonable amount of time. We only want to alert when there’s a real problem, so we don’t send an alert until we’ve exceeded 5 minutes of lag over a consecutive 10-minute period (see image below). At that point our users (the other engineers at the company) can’t do their jobs effectively. That being said, we don’t expect our on-call engineers to respond immediately (they have 30 minutes), but it’s best to be aware of issues as soon as possible.
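
For illustration, here’s roughly what that alert condition looks like as a sketch (this is not our actual Icinga check; the one-sample-per-minute interval and the function name are assumptions):

    SLO_SECONDS = 5 * 60   # 5 minutes of lag
    WINDOW_SAMPLES = 10    # assuming one max-lag sample per minute

    def should_alert(recent_max_lag):
        """recent_max_lag: the most recent max-lag readings in seconds, oldest first."""
        window = recent_max_lag[-WINDOW_SAMPLES:]
        # Alert only if we have a full window and every sample breached the SLO.
        return len(window) == WINDOW_SAMPLES and all(lag > SLO_SECONDS for lag in window)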

Here’s an example of a 5 minute spike in lag which lasted 10 minutes:

The Code

So here’s how it’s done. This is an abridged version; I may open source the actual Icinga/Nagios check plugin if there’s interest, but for now this should get you started. It’s written in Python, requires the kafka-python module, and is inspired by an example from the kafka-python GitHub repo.

First, you’ll need to enable LogAppendTime timestamps in Kafka. Modify the Kafka properties file (server.properties) on each broker to add this line, then restart Kafka.
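
The broker-level property (on Kafka 0.10 and later) is log.message.timestamp.type; there is also a per-topic message.timestamp.type override:

    # server.properties, on each broker
    log.message.timestamp.type=LogAppendTime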

Next we obtain the current offsets for the consumers from Kafka’s built-in command-line tooling. You can also get this info from Burrow.
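
For example (the group names and broker address here are placeholders, and the exact flags vary a bit between Kafka versions):

    kafka-consumer-groups.sh --bootstrap-server kafka01:9092 --describe --group logstash
    kafka-consumer-groups.sh --bootstrap-server kafka01:9092 --describe --group secor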

Both of the above commands produce tabular output, roughly like this (the columns and values shown here are illustrative and vary by Kafka version):
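
    TOPIC    PARTITION    CURRENT-OFFSET    LOG-END-OFFSET    LAG    CONSUMER-ID
    logs     0            123456789         123456795         6      logstash-0
    logs     1            123455900         123455921         21     logstash-1
    logs     2            123456102         123456102         0      logstash-2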

From the above commands we get the topic, partition and offset. Then we can use Python to get the message associated with the offset:
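
Here’s a minimal sketch of that step, assuming the kafka-python KafkaConsumer API and a placeholder broker address:

    from kafka import KafkaConsumer, TopicPartition

    BOOTSTRAP_SERVERS = 'kafka01:9092'   # placeholder broker address

    def fetch_message_at(topic, partition, offset):
        """Fetch the single message sitting at the given offset of one partition."""
        consumer = KafkaConsumer(
            bootstrap_servers=BOOTSTRAP_SERVERS,
            enable_auto_commit=False,    # we only peek; never commit anything
            consumer_timeout_ms=10000,   # give up if nothing arrives within 10s
        )
        tp = TopicPartition(topic, partition)
        consumer.assign([tp])
        consumer.seek(tp, offset)
        message = next(consumer)         # KafkaConsumer is an iterator
        consumer.close()
        return message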

Now that we have the message associated with the offset, we can get the timestamp (LogAppendTime) of the message:
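
With kafka-python the timestamp comes back on the ConsumerRecord in milliseconds. Continuing the sketch above (the topic, partition, and offset are made-up stand-ins for the values from the CLI output):

    from datetime import datetime

    message = fetch_message_at('logs', 0, 123456789)
    # timestamp_type 1 means LogAppendTime; 0 would mean the producer's CreateTime.
    msg_time = datetime.fromtimestamp(message.timestamp / 1000.0)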

Now all that’s left to do is calculate the lag. We use the datetime.timedelta.total_seconds() method instead of the timedelta.seconds attribute because, when the lag is negative, the timedelta normalizes to a negative number of days and a positive number of seconds (e.g. a lag of -9 seconds becomes days=-1, seconds=86391); total_seconds() collapses that back into a single signed value.
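
Continuing the sketch, the final calculation is just:

    # total_seconds() returns one signed float, avoiding the days/seconds split
    # described above.
    lag_in_seconds = (datetime.now() - msg_time).total_seconds()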

And that’s it. You now have lag_in_seconds and you can do with it what you want. I recommend visualizing it over time…

The Graphs

Graphs are cool. One of my favorite things about metrics is the graphs. There are three metrics in each of these images: max (green), median (yellow), and min (blue). With just three metrics we can get an overall picture of lag across partitions without having to visualize and mentally comprehend a potentially unbounded number of partitions. Of course, if you have thousands of partitions you may want to consider producing percentile metrics (e.g. p99) as well.

The Elasticsearch cluster goes down, and the lag spikes up at 45°:

Recovery from Elasticsearch cluster outage; lag comes down after many hours:

To my delight, the lag check found this situation where the lag spikes above 5 minutes every day at 5pm. It turns out this is due to new daily indices being created, which happens at 00:00 UTC (i.e. 5pm PDT). Elasticsearch takes a while to create the new indices and returns timeout errors for some requests in the meantime. I’m still working out how to properly fix this, but it probably involves changing the index naming scheme we’re using (an index per tag).

Conclusion

There are, of course, other solutions which exist for monitoring Kafka consumer lag. Burrow has a fully featured lag evaluation system, alerting capabilities, and overall is most likely sufficient for most people’s needs. At work we’re monitoring lag in seconds because that’s what we care about. You should always choose the solution which works best for your needs.

If you like this post or have questions about it, feel free to leave a comment below or on Twitter: @jeremydonahue. You can also sign up for email notifications in the sidebar if that’s your thing.

I’ll be posting soon about how we ship 3 billion logs per day with this stack.



9 thoughts on “Monitoring Kafka Consumer Lag in Seconds”

  • This is super pedantic, but you’re not measuring what you say you want. You’re measuring “difference of message timestamp against localtime”. You say you process > 30k messages per second, but don’t say if that’s on one topic or distributed across a bunch; given that traffic, it’s likely that this just so happens to be an accurate proxy for “difference of message timestamp against last written message timestamp”. It’s just a proxy though, and may surprise you. For example, if you’re not computing the difference on the broker itself, you may or may not run into fun issues with timezones, DST, and friends. Also, if a topic may be quiet for some time, your metrics will not be happy. Either issue could be solved by asking the broker for the most recently written message and computing the difference against its timestamp; you have the offset of the committed message, and the lag, and can compute the offset of the last written message if you care. It’s a little more work, but it’ll never surprise you.

    • Hi Gunnar, I think you’re correct that it would be better to use “difference of message timestamp against last written message timestamp”. Though in this case, as you deduced, the behavior of the traffic is such that the timestamp of the latest message in Kafka is always equal to “now”. We are using a single topic for these logs (which simplifies things significantly), and our instances constantly generate logs which are written to the topic. If the most recent message is not “now”, then there’s a problem. Granted, we have other monitoring for that condition so it would make sense to update this to use the timestamp of the last message in Kafka. I’ll definitely consider it, thanks for the comment.

  • Also, we’ve not been super happy about the Kafka command-line tools. We have some topics with a couple hundred consumer groups, and the command-line tools have failed to show us all the consumers. When we ask them explicitly about the status of any given consumer, they tell us, but when we ask them to describe all the consumers of a topic they’ve left some out.

    I don’t like surprises, and prefer to do my monitoring from within the consumers themselves to avoid this kind of thing. If you want accurate timestamp-based lag metrics, that does mean an extra trip to the broker every now and then to get the timestamp of the last written message, but you can just as easily get your proxy metric in the consumer.

    Monitoring Kafka has apparently given me more PTSD than I knew.

    • Monitoring Kafka has apparently given me more PTSD than I knew.

      Haha!

      I know what you mean about the command line tools. I think when it fails to show some consumers, it’s because they haven’t committed offsets to __consumer_offsets within some amount of time. Then when you request information about a specific group, it seeks further back looking for information about that group. I could be wrong about that, though. It’s hard to tell what’s going on, but it seems to come down to handleListGroups() in GroupCoordinator.scala. It’s unclear how far back in the __consumer_offsets topic it’s going by default. If anyone has insight into this, please let me know.

  • I am setting up a Kafka-Logstash-Elastic pipeline. My rate is 75 million messages per hour. I am really struggling with the rate. Can you please help me with the settings for Logstash and Elasticsearch? My message size is 1 kB.

    • Hi Prateek, I had to increase pipeline.batch.size significantly, in addition to pipeline.workers. Also keep in mind that throughput will be limited by the computation cost of your Logstash filters. If you have a lot of them, you’ll need enough processing power to keep up. See the Logstash Performance Troubleshooting Guide for details.

      • Thanks. Would you also suggest something for Elasticsearch? Which settings do I need to change for ES?

        • A few common mistakes that impact ES throughput:
          - Store ES data on dedicated data drives, not the root FS.
          - Set vm.swappiness = 1.
          - Use replicas=1 for big indices unless the data is super critical.
          - Use a different index for each day of data at this scale.
          - Make sure your garbage collection config is optimized, with either G1GC or CMS, with InitiatingOccupancyPercent=75.
