[Webinaire de démonstration] Prêt(e) à dire au revoir à ZooKeeper ? Rencontrez KRaft ! | Inscrivez-vous dès maintenant

What Are Apache Kafka® Consumer Group IDs?

Écrit par

Consumer group IDs are a vital part of consumer configuration in Apache Kafka®. Setting the consumer Group ID determines what group a consumer belongs to, which has some major consequences. There are three areas in which Group IDs are particularly pertinent:

  • Detecting new data

  • Work sharing

  • Fault tolerance

If you'd like a quick overview of consumer group IDs check out the video below, or keep reading for more!

What is a Kafka consumer?

Kafka consumers read/consume data from Kafka producers, do the work of reading event streams. They read events, or messages, from logs called topics. Topics are further split into partitions, which are append-only logs that store the messages. This enables each topic to be hosted and replicated across a number of brokers.

As you can see in the diagram, a given consumer in a consumer group can read from multiple partitions, including multiple partitions housed in the same topic.

What is the function of a consumer group? 

One of the more foundational concepts of consumer groups is that each consumer, once assigned to a group, shares the workload. When each consumer has the same Group ID, they cannot read from the same partition:

Now, you don’t always need to reach for a single consumer group. Sometimes, you may have two consumers (each in different consumer groups, allowing for parallel processing) for two different services, like a customer address service and a customer delivery notification service, that would need to read from the same partition in the same topic. These two consumers in different groups reading from the same topic can pick up reading from different offsets, which they would not be able to do if Kafka used a queue rather than a persistent log. 

However, if you do want your consumers to read from the same topic, you need to consider a couple of things:  1) the number of consumers in your group and 2) the number of partitions. This consideration is important because the consumers will share the workload as equally as possible among themselves. Sometimes making this decision is easy—I’m writing a demo app in Kafka Streams with 1 consumer and 1 partition right now because I just need to show developers how to use a .process() function in Kafka Streams. But in most use cases, this decision takes careful thought. 

Considerations in favor of fewer consumers in relation to a high number of partitions include things like how high you want your consumer throughput to be since each partition is given one thread per consumer. You might also want room to increase parallelism later on.

Considerations against having fewer consumers to a higher number of partitions include things like higher unavailability in the case of unclean failure since leader election will take longer.

If you’re interested in reading more about consumer strategy in relation to partitions, Jun Rao has an excellent blog post on the subject. 

Using consumer Group IDs to detect new data

The broker stores the progress of each consumer group in a metadata topic called  consumer offsets. Offsets  specify the location of a given event within a partition, and as such, represent progress through the topic. Like bookmarks or sticky tabs in textbooks, offsets in consumer groups serve the same purpose as how bookmarks or sticky tabs function in books.

Checking for new data

You can use a particular Group ID’s offset to check whether there’s been new data written to the partition.  If there’s an event with a larger offset, that means there’s new data to read. If you want to know how to read the offset, here’s a command using the kafka-consumer-groups utility that will read your offsets:

kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group group1 --offsets

Note that you need to provide a valid Group ID to --group if you’re trying out this command. The output will resemble the following:

`GROUP   TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG     OWNER
Groupname topicname     0        2               2         1       ownername`

Or, if you want to learn more about how to do this with the Confluent CLI for a topic hosted in Confluent Cloud, you can check out this tutorial on reading from a specific offset and partition. 

There’s more on the kafka-consumer-groups utility in our documentation, and you can always run kafka-consumer-groups —help for a full list of the options.

For example, in Python it looks like the second line from the bottom in this configuration object:

<STARTCODEBLOCKINHTML>  config = {
# User-specific properties that you must set
'bootstrap.servers': '<BOOTSTRAP SERVERS>',
'sasl.username': '<CLUSTER API KEY>',
'sasl.password': '<CLUSTER API SECRET>',

# Fixed properties
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'PLAIN',
'group.id': 'kafka-python-client',
'auto.offset.reset': 'earliest'
}
<ENDCODEBLOCKINHTML>

Consumer Group IDs in work sharing

The Group ID determines which consumers belong to which group. You can assign Group IDs via configuration when you create the consumer client. 

For example, in Python it looks like the second line from the bottom in this configuration object:

<STARTCODEBLOCKINHTML>  config = {
# User-specific properties that you must set
'bootstrap.servers': '<BOOTSTRAP SERVERS>',
'sasl.username': '<CLUSTER API KEY>',
'sasl.password': '<CLUSTER API SECRET>',

# Fixed properties
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'PLAIN',
'group.id': 'kafka-python-client',
'auto.offset.reset': 'earliest'
}
<ENDCODEBLOCKINHTML>

If there are eight partitions, each of those four consumers will be assigned two partitions. What if there are nine partitions? That means the leftover partition will be assigned to the first consumer in the group so that one consumer reads from three partitions and the rest of the consumers read from two partitions. It’s the broker’s job to continually ensure that partitions are evenly distributed among the connected consumers.

Note: At the top, you'll see that although there are four consumers, three are idle. That's because only one consumer in the same group can read from a single partition.

This whole process is predicated on the presence of a Group ID to unify the consumers. It’s important to remember this while you’re setting up your consumers. 

If you’re connecting microservices, you want to make sure that each service has its own consumer group (and hence its own Group ID). Why is that? Let’s walk through an example.

Let’s say there’s a topic “payments,” and both the “orders” microservice and the “refunds” microservice will need to read from that topic. You wouldn’t want them to share the same offsets, because if they did, the progress through the “payments” topic would be shared by “orders” and “refunds,” which would mean potential missed orders or refunds. 

However, if you had a group of consumers handling “orders” by reading from partitions in the “payments” topic, then the current offset for each consumer in the group, stored in the broker, is vital to ensure continuous progress in case a consumer in the group crashes. At the same time, if consumers from another, separate group, like “refunds” are reading from the “payments” topic, they can continue their progress unaffected even if the consumers in the “orders” group are rebalancing. 

The role of consumer Group IDs in fault tolerance

As the last example revealed, Group IDs also play a vital role in fault tolerance. 

What happens when a consumer crashes? 

Each consumer sends “heartbeat requests” to the broker at a set interval. If a consumer does not respond in time, a rebalance is triggered. 

How does a Group ID play into rebalancing? Well, in either case, the broker’s record of the associated offset determines where the consumer will begin reading after a rejoin. As long as the Group ID remains the same, it can pick up exactly where it left off, without any risk of data loss. 

Rebalance process

When a new consumer joins a group, the first step is the broker’s recognition of the new consumer. In the second step, the broker adjusts the consumer group state from RUNNING to PREPARE_REBALANCE. Once the consumer group reacts to the change in state, the final state that the broker introduces is COMPLETING_REBALANCE. (Afterwards, the state is restored to RUNNING once more.)

Now, what pieces of information does the broker use to track all this? Well, in each type of consumer membership, there’s a member.id that ensures a unique identity for each group member. 

In a dynamic membership, at the point in a rebalance when a new consumer sends a JoinGroupRequest with a special UNKNOWN_MEMBER_ID, the broker receives that UNKNOWN_MEMBER_ID, sees that consumer as a new member of the group, and generates a new ID for it. 

During a client restart, this happens for all members of the group because when the members send their JoinGroupRequests, an UNKNOWN_MEMBER_ID is included. The member.id is not persisted, so a rebalance is triggered because the broker reads these members as new members. 

An additional problem: Using dynamic membership in an application with a large volume of local state 

KIP-345 (which is a Kafka Improvement Proposal, introducing the concept of static membership to the Kafka community) notes:

When the service state is heavy, a rebalance of one topic partition from instance A to B means a huge amount of data transfer.” — KIP-345

In this context, a “heavy” service state means that an app that has built up a large amount of local state—perhaps a large KTable. If the partition is reassigned, then that state must be saved and transferred to a new node, and if there’s a lot of data, the process will be slow. 

Now, this process can and should be avoided altogether if the partition assignment remains unchanged, or has the potential to remain unchanged, before and after the rebalance. But how to execute this task? This is where static membership comes into play. 

Static membership

What is static membership, and how does it solve the problem encountered by heavy-state applications with dynamic membership? 

A consumer with static membership is a consumer with a configured group.instance.id, in addition to its broker-managed member.id. The group.instance.id persists the identity of the consumer over a rebalance. 

So, what would happen if a client with a group of consumers with static membership needed to restart? In this case, since the broker recognizes the group.instance.id, it would not need to assign new membership, and a rebalance can be avoided. This makes heavy-state applications much more efficient, and it also makes rolling upgrades much faster. Since the consumers’ identity as part of the original groups is persisted by the group.instance.id, they no longer have to rebalance on removal and re-entry into the group. The only lag involved will be from the restarted node.

Implementation 

To implement static membership in your consumer group, set the group.instance.id value in your configuration. Here’s an example of what that might look like, using the node-rdkafka client:

javascript
const consumer = new Kafka.KafkaConsumer({
  'group.id': 'kafka-consumer',
  'group.instance.id': 'kafka-consumer-<unique-instance-id>',
  'partition.assignment.strategy': 'range',
  'metadata.broker.list': 'localhost:9092',
}, {});

group.instance.id vs. group.id

To recap, in the example above, the consumer is instantiated with both a group.id and a group.instance.id. What’s the difference? A group.id is what determines which group a consumer belongs to. You can read more about group IDs in our blog post Configuring Apache Kafka Group IDs.

Alternatively, a group.instance.id should be different for each member of the consumer group, because it encapsulates the member’s identity as a consumer instance. The broker maps the group.instance.id to each member.id to ensure each consumer’s unique identity. The member.id serves as extra validation in the case of static membership.

Where to go from here

In summary, when you set a consumer Group ID in the process of creating a consumer client, that Group ID assigns the consumer to its group, which has ramifications for work sharing, detecting new data, and data recovery. In addition, to avoid long rebalances when the service state is heavy, a piece of configuration for each group member called a `group.instance.id` is available. To learn more about this and other topics, check out these recommended resources:

  • Confluent Developer: Learn Apache Kafka through Confluent Developer tutorials, documentation, courses, blog posts, and examples. 

  • Confluent Community: If you have a question about Apache Kafka or you’d like to meet other Kafka developers, head over to Confluent Community and introduce yourself on our Community Slack or Forum. 

  • Get Started with Confluent Cloud: You can learn more about how to get started with Confluent by joining this live webinar, hosted by Tim Berglund.

‎ 

Apache®, Apache Kafka®, and Kafka® are registered trademarks of the Apache Software Foundation in the United States and/or other countries. No endorsement by the Apache Software Foundation is implied by using these marks. All other trademarks are the property of their respective owners.

Avez-vous aimé cet article de blog ? Partagez-le !