Streaming applications have gained immense popularity due to their ability to process real-time data efficiently. However, one crucial aspect that often goes unnoticed when developing such applications is that Java and non-Java producers differ in their default partitioning behavior. This disparity can result in data mismatches and inconsistencies, posing challenges for developers. This blog post delves into the causes of these inconsistencies and explores the available options to overcome them, enabling seamless integration between Java and non-Java producers.
In Apache Kafka®’s Java producer library, the default partitioning strategy is implemented by a class called DefaultPartitioner, which determines the topic partition to which a message should be assigned. The DefaultPartitioner hashes the record key with a 32-bit hash function called murmur2. The name is derived from the two fundamental operations of the hash's inner loop: multiplication and rotation. For a record with a key defined, the partition is the positive value of that hash modulo the number of partitions. For example, let’s say we have a topic with six (6) partitions and we need to determine to which partition a given message should be assigned:
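As a rough illustration of that computation, here is a minimal Python sketch of a murmur2 port and the resulting partition number. It assumes the Java client's behavior of clearing the sign bit of the hash before taking the modulo; the helper names `murmur2` and `java_default_partition` and the sample keys are made up for this example and are not part of any Kafka API.

```python
def murmur2(data: bytes) -> int:
    """Sketch of the 32-bit murmur2 variant used by Kafka's Java client.

    Returns the hash as an unsigned 32-bit integer.
    """
    mask = 0xFFFFFFFF
    m = 0x5BD1E995
    r = 24
    length = len(data)
    h = (0x9747B28C ^ length) & mask  # seed XOR input length

    # Mix the input four bytes at a time (little-endian).
    for i in range(0, length - length % 4, 4):
        k = int.from_bytes(data[i:i + 4], "little")
        k = (k * m) & mask
        k ^= k >> r
        k = (k * m) & mask
        h = (h * m) & mask
        h ^= k

    # Fold in the remaining one to three bytes, if any.
    tail = data[length - length % 4:]
    if tail:
        h ^= int.from_bytes(tail, "little")
        h = (h * m) & mask

    # Final mixing steps.
    h ^= h >> 13
    h = (h * m) & mask
    h ^= h >> 15
    return h


def java_default_partition(key: bytes, num_partitions: int) -> int:
    """Clear the sign bit, then take the modulo (assumed Java behavior)."""
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions


# Hypothetical keys, six partitions as in the example above.
for user_id in (b"user_1", b"user_2", b"user_3"):
    print(user_id.decode(), "->", java_default_partition(user_id, 6))
```

The key bytes must be exactly what the producer's serializer emits (for a string key, typically its UTF-8 encoding), otherwise the computed partition will not match.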
This partitioning strategy is deterministic: it ensures that messages with the same key are consistently assigned to the same partition, preserving the order of related messages within the partition. If the key is null or not provided, the producer instead spreads the messages across the available partitions (round-robin in older clients; since Apache Kafka 2.4 the default is a sticky strategy that fills a batch for one partition before moving on to another).
Kafka provides flexibility for implementing custom partitioners. By implementing the org.apache.kafka.clients.producer.Partitioner interface, developers can define their own partitioning logic, allowing them to use alternative hashing functions or consider other message attributes to determine the target partition.
However, everything would be perfect if there wasn’t an elephant in the room:
That is a very good question, my tusk-tastic friend!
Many non-Java clients rely on a C/C++ library called librdkafka. If you are not familiar with it, the library provides a high-performance, lightweight, and feature-rich implementation of the Apache Kafka wire protocol. It serves as a client library for interacting with Kafka clusters.
Ermmm, so, what is wrong with librdkafka?
Absolutely nothing!
That library is widely used in production environments and underpins numerous Kafka clients in different programming languages such as Python, Go, and .NET (C#). However, librdkafka's default partitioner, consistent_random, hashes keys with a different function: CRC32 (Cyclic Redundancy Check, 32-bit). Like murmur2, it generates a fixed-size 32-bit hash value for any given input. As one can already guess, different hashing functions will very likely yield different results.
Because of that, messages with the same key produced by Java and by many non-Java clients can end up in different partitions. That means, for example, that if a Python producer and a (Java-based) source connector are producing data to different topics in Kafka, then a merge (join) of these topics using the Kafka Streams API or ksqlDB would not work properly: joins expect records with the same key to sit in the same partition number on both sides, and here there would be a partition mismatch. If we consider the same table as previously, but this time hashing the keys using CRC32, we can see the mismatches:
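For comparison, the CRC32 side can be approximated in a few lines of Python. This is only a sketch: it assumes that librdkafka's consistent partitioner amounts to the standard CRC-32 of the key bytes modulo the partition count, and the helper name `crc32_partition` and the sample keys are hypothetical.

```python
import zlib


def crc32_partition(key: bytes, num_partitions: int) -> int:
    """Standard CRC-32 of the key, modulo the number of partitions."""
    # zlib.crc32 returns an unsigned 32-bit integer in Python 3.
    return zlib.crc32(key) % num_partitions


# Hypothetical keys, six partitions as in the example above.
for user_id in (b"user_1", b"user_2", b"user_3"):
    print(user_id.decode(), "->", crc32_partition(user_id, 6))
```

Running the same keys through a murmur2-based calculation and placing the two columns side by side is a quick way to see which keys would land on different partitions.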
Let’s now run a practical example where data is produced by two different producers, one using the murmur2 and the other the CRC32 hashing function:
The first producer writes data to the topic demo_user and uses the CRC32 hashing function (the names used in this data sample are purely fictional and do not represent any real individuals)
The second producer is a ksqlDB stream where data is written directly to the stream/topic demo_user_orders using INSERT SQL statements. ksqlDB is built on Kafka Streams, which is written in Java, so by default it uses the murmur2 hashing function
Both topics have a common key (user_id)
The details about this demo can be found on GitHub
As we can see there were mismatches with the partition assignment:
If we then try to merge both topics in ksqlDB by their keys (user_id) we will find several mismatches, but first, let’s see the queries used in this example:
Table DEMO_USER: Directly derived from the topic demo_user
Stream DEMO_USER_ORDERS: Fed directly through ksqlDB's REST API
Stream DEMO_USER_ORDERS_MERGED: Merges the demo_user_orders stream (murmur2) with the table demo_user (CRC32)
The merged stream looks like this:
Ideally, all producers, regardless of the client language, should use the same hashing function and partitioner. Luckily, librdkafka-based clients have a configuration property called partitioner. To match the Java producer's behavior, it should be set to murmur2_random. For example, when configuring the producer, the configuration properties should be:
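A minimal sketch of such a configuration, assuming the Python confluent-kafka client (which hands its configuration dictionary to librdkafka); the helper `build_producer_config` and the broker address are placeholders for this example only:

```python
def build_producer_config(bootstrap_servers: str) -> dict:
    """Build a producer configuration that mirrors Java's key hashing."""
    return {
        "bootstrap.servers": bootstrap_servers,
        # murmur2 hash for keyed messages, random partition for null keys.
        "partitioner": "murmur2_random",
    }


config = build_producer_config("localhost:9092")  # placeholder broker address
print(config)

# With the confluent-kafka package installed, the dictionary would be passed
# straight to the client:
#   from confluent_kafka import Producer
#   producer = Producer(config)
```

Only the partitioner entry matters for the point being made here; everything else in a real configuration (security, batching, acknowledgements) stays as it was.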
Be sure to double-check whether the client library/version you are using passes the partitioner parameter through to librdkafka; otherwise, librdkafka will fall back to its default value.
The hashing functions available in librdkafka are listed on GitHub. Below is an extract of it:
Another option is to repartition the topic using ksqlDB. Basically, write a persistent query that reads the original table (DEMO_USER, materialized from the topic demo_user), which is partitioned with CRC32, and writes it to another table/topic. For example:
Since that is a CREATE TABLE … AS SELECT statement, the messages will be repartitioned using the murmur2 hashing function, as ksqlDB is Java based.
If we now merge the stream DEMO_USER_ORDERS with the table DEMO_USER_REPARTITION, we will have an exact match on the partition assignment:
One important note: If using the Apache Flink stream processing framework (Java or SQL based), none of that would necessarily be a problem. The reason is that Flink typically repartitions the messages upon ingesting them.
After delving into the causes of inconsistencies between Java and many non-Java producers, we have gained insight into the default partitioning behavior of these applications. It is evident that this disparity can lead to data mismatches and challenges for developers, despite the immense popularity of streaming applications for processing real-time data efficiently. Fortunately, by exploring available options, we can overcome these issues and achieve seamless integration between Java and many non-Java producers, ensuring a more robust and reliable system overall.