[Webinar] Build Your GenAI Stack with Confluent and AWS | Register Now
Real-time analytics has become the need of the hour for modern internet companies. The ability to derive internal insights around business metrics, user growth and adoption as well as security incidents from all raw logs is crucial for day-to-day operations. Even more critical is enabling access to usage analytics for millions of customers, which is non-trivial to achieve. A good example of this is LinkedIn’s “Who viewed your profile,” which allows all 700 million+ users to slice and dice their page view data.
Another example is Uber’s Restaurant Manager, which enables restaurant owners across the globe to gain insights around menu preference, sales metrics, busy hours, and so on. All such user-facing analytical applications need an analytical store that can support thousands of complex queries per second at millisecond-response-time granularity while ingesting millions of events/second. Most transactional databases are unable to fulfill such stringent requirements. Enter Apache Pinot™.
Apache Pinot is a distributed analytics data store that is rapidly becoming the go-to solution for building real-time analytical applications at scale. Pinot stands out due to its ability to deliver low-latency performance for high-throughput analytical queries. The aforementioned use cases – LinkedIn’s “Who Viewed Your Profile” and Uber’s “Restaurant Manager” are a few examples of powerful analytical applications being powered by Apache Pinot in production.
As shown in the diagram above, Pinot can ingest data from a wide variety of data sources, including event streaming systems like Apache Kafka® and batch data systems like Hadoop Distributed File System (HDFS) or Amazon S3. This blog post details how Pinot integrates with Kafka to deliver fast analytics on streams of data. Fast refers to both data freshness as well as query latency. This post then elaborates on how input Kafka data is partitioned, replicated, and indexed within Pinot, and it describes how a distributed Pinot query processes all of this data. Finally, it talks about memory management of Pinot servers consuming data from Kafka and how it affects overall performance.
Below is a diagram of Pinot’s distributed architecture:
A Pinot table is composed of a set of Pinot segments that are distributed across Pinot servers. A Pinot segment is a unit of partitioning, replication, and query processing that represents a subset of the input data along with the specified indices. The Pinot controller uses Apache Helix to define how segments are assigned to different servers and how they’re replicated. The Pinot broker then uses this information to scatter queries to individual servers and gather them back together. Please refer to the Pinot documentation to learn more about the architecture.
This section describes how Pinot consumes data from Kafka topics and creates the corresponding data segments to make it available for real-time queries. For the purpose of this illustration, we will set up a real-time Pinot table ingesting data from a Kafka topic with four partitions in our Pinot cluster and four servers. To create such a table within Pinot, we need to specify two elements:
"REALTIME": { "tableName": "pinotTable", "tableType": "REALTIME", "segmentsConfig": { "schemaName": "pinotTable", ... } }, "tableIndexConfig": { ... "streamConfigs": { "streamType": "kafka", "stream.kafka.consumer.type": "lowlevel", "stream.kafka.topic.name": "<topic-name>", "stream.kafka.broker.list": "<broker-list>", ... } }, ... }
For more information, please refer to this stream ingestion tutorial.
Creating the schema and table config notifies the Pinot controller to start ingesting data for this table. At this point, the Pinot controller discovers the four partitions from Kafka and determines the starting offset from which to start consuming the data. This starting offset can be configured within the table config and defaults to “largest”—that is, the latest message in each partition. It then creates corresponding segment metadata in ZooKeeper for each of the four partitions and assigns the segments to the four Pinot servers. This in turn triggers the servers to start fetching data from the corresponding Kafka partition and create a local segment as shown below:
The segments mentioned above are known as “mutable” segments and are stored in memory in the CONSUMING state (please refer to the Helix segment state machine for more details). Each mutable segment organizes the incoming data into a columnar format and updates the required indices in real time (for instance, inverted or text indexes). The mutable segments are available for query processing immediately as they’re being built. Thus, the data freshness of Pinot matches that of Kafka given that the ingestion overhead is very low.
With any good distributed system, we want to ensure that the data is replicated. So let’s configure a replication factor of two for this real-time Pinot table. The Pinot controller ensures that each segment is replicated on two distinct servers. The replicas independently consume data from the corresponding Kafka partitions and create local copies of the segment as shown below:
At some point, the servers independently determine that it’s time to persist the corresponding in-memory segments and flush them to disk based on some criteria. Such on-disk segments are known as “immutable” segments. The criteria used for creating immutable segments can either be the amount of time elapsed since the segment was initially created or the number of rows consumed. Since different replicas may arrive at this independently (in the case of time-based completion), we need to ensure that all of them agree on the same end Kafka offset. This is done using a segment completion protocol between the replicas and the Pinot controller. The high-level idea is as follows:
Needless to say, this is a complex coordination process and involves lots of edge cases that need to be handled. For more information, please refer to this design document.
For a given Kafka partition, when the current mutable (consuming) segment is fully persisted, the Pinot controller creates the next mutable segment for this Kafka partition and assigns it to some set of Pinot servers. The starting offset of this new mutable segment is one more than the ending offset of the last committed segment.
The job of query processing is done by a stateless service known as the Pinot broker. A typical Pinot cluster has one or more such brokers deployed, and the client application can query any one of these. When the broker receives a query, it identifies which Pinot segments are needed to get the result, including both mutable and immutable segments. It then looks up the segment assignment or “routing” information via Helix and scatters the request to the corresponding servers. Each server locally executes this request, processing data from its local segments and sending the intermediate response to the broker. The broker then aggregates all the results and sends the final response back to the client.
By default, the Pinot broker selects all of the available segments for processing a given query and the individual Pinot servers prune segments based on the query filter criteria. Although this works for most cases, for high data volume Pinot tables with a lot of segments, the broker might end up contacting many Pinot servers. A large query span increases the probability of hitting a slow server and hence impacts query latency. To avoid such scenarios, you can prune segments at the broker layer by pre-partitioning.
You can pre-partition your Kafka data using something like ksqlDB by applying ''PARTITION BY'' on a high-cardinality column used frequently in the WHERE clause. You can then provide a hint to Pinot that the input data is already partitioned on this column using the segmentPartitionConfig field inside the Pinot table config. The broker uses this information to narrow down the Pinot segments that need to be queried. This is illustrated in the diagram below:
In the above diagram, data from topic1 is re-partitioned into topic2 based on the chosen column called ‘member_id’. In this case, we’ve created a Pinot table that ingests data from topic2. With this setup, let’s assume that we get a Pinot query like this:
select count(*) from pinot_table where member_id = 123 and <... other filter criteria ...>
Let’s also assume that all records with member_id = 123 belong to partition 1 of Kafka topic2. In this case, the Pinot broker only needs to query segment S1 and therefore results in a very low query latency. For more details on data partitioning in Pinot, please refer to the Pinot documentation.
In the original design of Pinot, the mutable segment was managed entirely in heap memory. This made implementation easy in the case of data structures, such as dictionaries and forward and inverted indices. Naturally, this led to large memory consumption on Pinot hosts and increased the probability of running into garbage collection (GC) issues. To alleviate this problem, we came up with the following approaches:
To learn more about the memory management and performance tuning of Pinot’s real-time tables, please refer to this document.
This blog post shared the internal workings of Pinot’s real-time data ingestion from Kafka. As described above, Pinot leverages Kafka to ensure minimal coordination between its replicas. The two different segment formats enable data to be queried immediately yet at the same time have compact, efficient, on-disk representation. We also discussed advances in off-heap memory management within Pinot’s mutable segment, which makes the real-time tables resource efficient and increases the stability of the cluster.
If you’re interested in learning more about Pinot, we invite you to become a member of our open source community by joining our Slack community, following us on Twitter, and subscribing to our mailing list.
We are proud to announce the release of Apache Kafka 3.9.0. This is a major release, the final one in the 3.x line. This will also be the final major release to feature the deprecated Apache ZooKeeper® mode. Starting in 4.0 and later, Kafka will always run without ZooKeeper.
In this third installment of a blog series examining Kafka Producer and Consumer Internals, we switch our attention to Kafka consumer clients, examining how consumers interact with brokers, coordinate their partitions, and send requests to read data from Kafka topics.