Deleting kafka topics from a consumer group

Kafka consumer groups let you keep track of the latest offsets consumed for a given topic/partition. We ran into an issue recently when we started monitoring the lag for a given consumer group using kafka-lag-exporter, though: if your consumer group has ever committed an offset for a given topic, it stays there as long as the consumer group exists.

We tried deleting it using the kafka-consumer-groups command line tool, but we got a message saying that the operation wasn’t supported by our broker (we are using MSK).

So what to do? Well, I first started by looking into poking around the __consumer_offsets topic, and then noped out of there when I saw that it stores data and keys in some binary format that you need to use a Java class to parse.

The next idea I had was to delete the consumer group, and recreate it, leaving out the offensive topic(s). But we subscribed to a lot of topics! Well, a little bash can go a long way.

#!/usr/bin/env bash
set -o errexit
set -o nounset

export kafka_server=your-kafka-server:9092
export consumer_group=your-consumer-group
export skiptopics=("old-unused-topic1" "old-unused-topic2")

containsElement () {
  local e match="$1"
  shift
  for e; do [[ "$e" == "$match" ]] && return 0; done
  return 1
}

current_offsets=$(docker run --rm --volume ~/kafka-ssl.properties:/config.properties --entrypoint bin/kafka-consumer-groups.sh solsson/kafka --bootstrap-server $kafka_server --group $consumer_group --describe | tail -n +3 | awk '{ print $2, $4}')

echo "Current offsets:"
echo "$current_offsets"

( set -o xtrace; docker run --rm --volume ~/kafka-ssl.properties:/config.properties --entrypoint bin/kafka-consumer-groups.sh solsson/kafka --bootstrap-server $kafka_server --group $consumer_group --delete )

while IFS= read -r line; do
  arr=($line)
  topic="${arr[0]}"
  if containsElement "$topic" "${skiptopics[@]}"; then
    continue
  fi
  offset="${arr[1]}"
  ( set -o xtrace; docker run --rm --volume ~/kafka-ssl.properties:/config.properties --entrypoint bin/kafka-consumer-groups.sh solsson/kafka --bootstrap-server $kafka_server --group $consumer_group --reset-offsets --topic $topic --to-offset $offset --execute )
done < <(printf '%s\n' "$current_offsets")

Comments

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.