Kafka consumer groups let you keep track of the latest offsets consumed for a given topic/partition. We recently ran into an issue when we started monitoring consumer group lag with kafka-lag-exporter, though: once a consumer group has committed an offset for a topic, that offset stays there for as long as the consumer group exists, even if the group no longer reads from the topic.
We tried deleting the stale offsets with the kafka-consumer-groups command line tool, but got a message saying that the operation wasn’t supported by our broker (we are using MSK).
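For reference, here is a sketch of what such an attempt can look like. This is an assumption rather than the exact command we ran: kafka-consumer-groups has had a `--delete-offsets` option since Kafka 2.4, and a broker that does not support the underlying request will reject it. The `delete_offsets_cmd` helper is hypothetical, and the server, group, and topic names are placeholders. It only prints the command so you can inspect it before running anything.

```shell
#!/usr/bin/env bash
set -o errexit
set -o nounset

# Placeholders, not real endpoints.
kafka_server=your-kafka-server:9092
consumer_group=your-consumer-group

# Hypothetical helper: print (do not run) the command that asks the broker
# to drop the group's committed offsets for a single topic.
delete_offsets_cmd() {
  local topic="$1"
  echo "bin/kafka-consumer-groups.sh --bootstrap-server $kafka_server" \
       "--group $consumer_group --delete-offsets --topic $topic"
}

delete_offsets_cmd old-unused-topic1
```

If your broker does support the operation, this is probably all you need and the rest of this post is moot.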
So what to do? Well, I started by poking around the __consumer_offsets topic, then noped out of there when I saw that it stores its keys and values in a binary format that you need a Java class to parse.
The next idea I had was to delete the consumer group and recreate it, leaving out the offending topic(s). But we subscribed to a lot of topics, so doing that by hand was not appealing. Well, a little bash can go a long way.
#!/usr/bin/env bash
set -o errexit
set -o nounset

export kafka_server=your-kafka-server:9092
export consumer_group=your-consumer-group
export skiptopics=("old-unused-topic1" "old-unused-topic2")

# Return 0 if the first argument equals any of the remaining arguments.
containsElement () {
  local e match="$1"
  shift
  for e; do [[ "$e" == "$match" ]] && return 0; done
  return 1
}

# Save "topic offset" pairs (columns 2 and 4 of --describe) before deleting anything.
current_offsets=$(docker run --rm \
  --volume ~/kafka-ssl.properties:/config.properties \
  --entrypoint bin/kafka-consumer-groups.sh solsson/kafka \
  --bootstrap-server "$kafka_server" --group "$consumer_group" --describe \
  | tail -n +3 | awk '{ print $2, $4}')

echo "Current offsets:"
echo "$current_offsets"

# Delete the whole group, stale topics included.
( set -o xtrace; docker run --rm \
  --volume ~/kafka-ssl.properties:/config.properties \
  --entrypoint bin/kafka-consumer-groups.sh solsson/kafka \
  --bootstrap-server "$kafka_server" --group "$consumer_group" --delete )

# Recreate the group by re-committing the saved offset for every topic we keep.
while IFS= read -r line; do
  arr=($line)
  topic="${arr[0]}"
  if containsElement "$topic" "${skiptopics[@]}"; then
    continue
  fi
  offset="${arr[1]}"
  ( set -o xtrace; docker run --rm \
    --volume ~/kafka-ssl.properties:/config.properties \
    --entrypoint bin/kafka-consumer-groups.sh solsson/kafka \
    --bootstrap-server "$kafka_server" --group "$consumer_group" \
    --reset-offsets --topic "$topic" --to-offset "$offset" --execute )
done < <(printf '%s\n' "$current_offsets")
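One caveat: the script keeps only the topic and offset columns, so a topic with several partitions gets reset once per row, and as far as I can tell every partition ends up at whichever offset was processed last. A minimal sketch of a per-partition variant follows. `plan_resets` is a hypothetical helper, the sample rows are made up for illustration, and it assumes the `--describe` column order GROUP TOPIC PARTITION CURRENT-OFFSET that the script above already relies on.

```shell
#!/usr/bin/env bash
set -o errexit
set -o nounset

# Hypothetical helper: read `--describe` rows on stdin and print one
# "topic:partition offset" pair per row, skipping topics passed as arguments.
plan_resets() {
  local skip=" $* "
  local group topic partition offset rest
  while read -r group topic partition offset rest; do
    # Keep only real data rows; this drops blank lines, the header,
    # and partitions without a committed offset (shown as "-").
    [[ "$partition" =~ ^[0-9]+$ && "$offset" =~ ^[0-9]+$ ]] || continue
    # Drop topics that should not be recreated.
    [[ "$skip" == *" $topic "* ]] && continue
    printf '%s:%s %s\n' "$topic" "$partition" "$offset"
  done
}

# Made-up sample output, for illustration only.
sample='
GROUP     TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
my-group  orders             0          42              50              8
my-group  orders             1          17              17              0
my-group  old-unused-topic1  0          5               9               4
'

plan_resets old-unused-topic1 <<< "$sample"
```

Each printed pair could then be passed to `--reset-offsets --topic "$topic_partition" --to-offset "$offset" --execute`, since the tool accepts a `topic:partition` form for `--topic` when resetting offsets. Try it with `--dry-run` instead of `--execute` first.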