A commonly asked question is whether Apache Kafka® can replace a database. It’s not uncommon to see Kafka paired with a database for typical database needs, like long-term storage, fast lookup, transaction handling, complex queries, and more.
In this blog post, we’ll walk through how to pair Kafka with a database to support CRUD operations on a customer profile. Then, we’ll improve on that design by entirely removing the external database and just using Kafka, making the architecture simpler and saving on costs.
Can Kafka serve as a database? In theory, yes. A database is defined as an organized collection of data, generally stored and accessed electronically from a computer system. Kafka provides database-like infrastructure for storage, queries, and data processing, often with specific delivery and durability guarantees (aka transactions).
The real question: Should you use Kafka as a database? There are many types of databases available, like relational database management systems (RDBMS), NoSQL, and big data. Choosing to implement one of these or Kafka alone comes down to your needs and requirements. Ask yourself: How long do I need to store this data? What structure should the data have? Will I need complex queries or just key and value retrieval? Do I require ACID transactions, exactly-once semantics, or just at-least-once delivery guarantees?
Once you’ve answered these questions, you can decide between using Kafka alone or adding a relational database like MySQL or Postgres, a big data batch platform like Hadoop, a document store like MongoDB, a key-value store like RocksDB, a time series database like InfluxDB, an in-memory cache like Memcached, or something else. Every database has different characteristics.
For a thorough consideration of how “Kafka Core” (meaning Kafka brokers and its concepts, like distributed commit log, replication, partitions, guaranteed ordering, etc.) fits into the ACID (atomicity, consistency, isolation, durability) transaction properties of a database, review Martin Kleppmann’s Kafka Summit San Francisco 2019 keynote “Is Kafka a Database?”.
TL;DR: Kafka is a database and provides ACID properties. However, it works differently than other databases.
Can Kafka replace other databases? In general, no. Kafka is not a replacement for existing databases like MySQL, MongoDB, Elasticsearch, or Hadoop. But you may not need another data store in addition to Kafka. Each database has specific features, guarantees, and query options. For example, use MongoDB as a document store, Elasticsearch for text search, Oracle or MySQL for traditional relational use cases, or Hadoop for a big data lake to run map/reduce jobs for reports.
The question shouldn’t be “Kafka or database XYZ?” Databases and Kafka are complementary!
When asking yourself if you need another data store in addition to Kafka, first consider:
Kafka can store data forever in a durable and highly available manner along with ACID properties
Kafka possesses different options to query historical data
Kafka-native add-ons like ksqlDB or Tiered Storage make Kafka more powerful than ever for data processing and event-based long-term storage
Build stateful applications leveraging Kafka clients (microservices, business applications) without the need for another external database
In summary, Kafka has database-like properties such as durability, consistency, and infinite retention. However, it works differently than other databases, and it's not a great choice for a fast key/value lookup or ad hoc queries.
Let’s see what a sample microservice architecture looks like when an application is paired with a database, and then what it looks like when we replace the external database with a combination of Kafka and ksqlDB.
Let’s design an application to onboard new customers. It should create a new customer profile for each customer, and also support updating, reviewing, and deleting these customer profiles.
The architecture is simple. We can use WebApp to:
Onboard a customer
Update a customer profile
Delete a customer profile
Each action will trigger respective CRUD calls to a backend service called the API Manager, which then produces events to a Kafka topic:
Here are some use cases that this architecture should support:
Customer create: WebApp calls a createCustomer method on the API Manager, which produces a record on the customer topic with a message key (for example, CustomerID) and the customer profile as the value. The API Manager also inserts one record in the database (probably in the same transaction). This insert to the database is done to facilitate fast read/update/delete operations that may be triggered by the WebApp.
Customer review: WebApp calls the API Manager; however, since the API Manager requires a fast lookup for that CustomerID, it calls the database (not the Kafka topic) and gets the record back with low latency. (By default, a Kafka consumer doesn't support fast lookup.)
Customer update: Kafka is an immutable log store, so you cannot go to a record and update a message on a topic. To address this, API Manager produces an updated record on the Kafka topic with the same message key (CustomerID). This updates the customer profile, and also updates the same record in the database (probably in the same transaction).
Customer delete: Since Kafka is an immutable log store, you cannot delete a record on a topic. Instead, another record with the same key (CustomerID) and a null value is produced on the topic, and that record triggers the deletion of the customer from the database.
So do we really need an external DB?
Let's reconsider the following points to evaluate if we really need an external database:
Customer data is stored in Kafka and the external database.
The API Manager will read data from the database, but other microservices will read data from the Kafka topic (no clear source of truth).
An application may choose to read from a Kafka topic when it requires a series of events, and for a fast lookup/edit, it will choose the database (different actions are routed to different systems).
Customers have to plan for the resiliency of this database, which adds cost and complexity.
The constant need to serialize, deserialize, store to the database, and fetch from the database is neither performant nor efficient.
Kafka has similar properties to a database. However, Kafka stores data as a series of events, while other databases co-locate data based on keys and indexes to facilitate fast lookup.
In a database, keys and indexes are assigned at the table or document level, whereas in Kafka, keys are assigned at the message level. When comparing some characteristics of Kafka to any database, generally a database offers a fast lookup based on a key or index. In Kafka, however, fast lookup is supported only by message offset or message timestamp.
Now, this is where things get interesting. With a database, when you search based on a key/index, it's a fast index scan for that value, and you may quickly get one or more records based on that key/index. However, in Kafka, when you search for a key, that key may be present at multiple different offsets on a partition or on different partitions. So unless you know the exact offset where the key is present, the Kafka client has to scan the entire partition to find a message key.
Suppose CustomerID 100 is present at offsets 99, 97, and 95. Does this imply fast key-based lookup is not possible in this Kafka database?
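To make the cost of that scan concrete, here is a minimal sketch in plain Python. It does not use a Kafka client; the partition is modeled as a list whose index stands in for the offset, and the record contents and the function names (`build_partition`, `offsets_for_key`, `latest_by_scan`) are assumptions made for illustration.

```python
from typing import Any, List, Optional, Tuple

Record = Tuple[str, Any]  # (message key, value); the list index is the offset


def build_partition() -> List[Record]:
    """Build a made-up partition of 100 records (offsets 0..99)."""
    partition: List[Record] = [
        (f"other-{offset}", {"n": offset}) for offset in range(100)
    ]
    # CustomerID 100 appears three times, as in the example above.
    for version, offset in enumerate((95, 97, 99), start=1):
        partition[offset] = ("100", {"profile_version": version})
    return partition


def offsets_for_key(partition: List[Record], key: str) -> List[int]:
    """Return every offset holding the key; this has to visit all records."""
    return [offset for offset, (k, _) in enumerate(partition) if k == key]


def latest_by_scan(partition: List[Record], key: str) -> Optional[Tuple[int, Any]]:
    """Return (offset, value) of the newest record for the key, or None."""
    latest = None
    for offset, (k, value) in enumerate(partition):
        if k == key:
            latest = (offset, value)  # later offsets overwrite earlier ones
    return latest


partition = build_partition()
print(offsets_for_key(partition, "100"))
print(latest_by_scan(partition, "100"))
```

Both functions are linear in the size of the partition, which is why a topic alone is a poor fit for key-based reads.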
In the previous section, we saw that a simple read operation looking for a specific key isn’t very performant, since Kafka as a database doesn't support key-based lookup.
ksqlDB bridges this gap by providing you with an option to store the latest data locally on the computing node. This data stored on a local node is also called a “state store.” It's important to know that ksqlDB leverages RocksDB as a state store.
When processing stateful operations, ksqlDB always uses a state store to store and query data. We can use the same state store to do the fast key-based lookup.
To help illustrate, consider a stream-table join, which requires looking up the corresponding row in the table for each event in the stream.
In ksqlDB, tables (users) are materialized into RocksDB stores on the computing node to provide fast and memory-efficient key lookups. Each time a query processes an event from the source table topic, it will serialize the event into the format required and execute upserts into RocksDB. Next, when it receives an event from the source stream (logins), it looks for the user value in the materialized store instead of scanning the topic.
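The lookup pattern described above can be sketched in plain Python. This is only an analogy for what happens inside ksqlDB: a `dict` stands in for the RocksDB store, the function names and record fields are hypothetical, and inner-join behavior (dropping logins with no matching user) is an assumption of this sketch.

```python
from typing import Any, Dict, Iterable, Iterator, Optional, Tuple


def materialize(
    table_events: Iterable[Tuple[str, Optional[Dict[str, Any]]]],
    store: Dict[str, Dict[str, Any]],
) -> None:
    """Upsert each table event into the store; a None value deletes the key."""
    for key, value in table_events:
        if value is None:
            store.pop(key, None)
        else:
            store[key] = value


def join_logins(
    logins: Iterable[Dict[str, Any]],
    users_store: Dict[str, Dict[str, Any]],
) -> Iterator[Dict[str, Any]]:
    """Enrich each login with the user's name via a keyed lookup, not a topic scan."""
    for login in logins:
        user = users_store.get(login["user_id"])
        if user is not None:  # inner join: skip logins without a known user
            yield {**login, "user_name": user["name"]}


users_store: Dict[str, Dict[str, Any]] = {}
materialize(
    [("u1", {"name": "Ada"}), ("u2", {"name": "Bob"}), ("u1", {"name": "Ada L."})],
    users_store,
)
enriched = list(join_logins([{"user_id": "u1", "ts": 1}, {"user_id": "u3", "ts": 2}], users_store))
print(enriched)
```

Each login costs one keyed lookup regardless of how many events the users topic contains.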
The state store is also replicated by ksqlDB to a highly durable Kafka topic called the changelog. These topics are used to back up and restore the local state if a new node comes online (or an old one is physically relocated). A changelog topic is a log-compacted topic, internal to ksqlDB, that contains every change to the local state store; the key and value for events in the changelog topic are byte for byte the same as the most recent matching key and value in the RocksDB store. Leveraging this topic makes recovery simple: You can scan the changelog topic and put each Kafka message directly into the state store without any additional processing.
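The recovery step can be illustrated with a small simulation. The sketch below is not ksqlDB code: the changelog is a Python list of `(key, value)` byte pairs, `compact` is a simplified stand-in for log compaction (it keeps only the newest record per key), and `restore` replays the log into a `dict` that plays the role of the state store.

```python
from typing import Dict, List, Optional, Tuple

ChangelogRecord = Tuple[bytes, Optional[bytes]]  # a None value is a tombstone


def compact(changelog: List[ChangelogRecord]) -> List[ChangelogRecord]:
    """Simplified compaction: keep only the newest record for each key."""
    newest_offset: Dict[bytes, int] = {}
    for offset, (key, _) in enumerate(changelog):
        newest_offset[key] = offset
    return [
        record
        for offset, record in enumerate(changelog)
        if newest_offset[record[0]] == offset
    ]


def restore(changelog: List[ChangelogRecord]) -> Dict[bytes, bytes]:
    """Rebuild a state store by replaying the changelog in offset order."""
    store: Dict[bytes, bytes] = {}
    for key, value in changelog:
        if value is None:
            store.pop(key, None)  # tombstone removes the key
        else:
            store[key] = value  # bytes are copied as-is, no extra processing
    return store


changelog = [
    (b"100", b"v1"),
    (b"200", b"a"),
    (b"100", b"v2"),
    (b"200", None),
    (b"300", b"x"),
]
print(restore(changelog))
print(restore(compact(changelog)))
```

Replaying the full log and replaying the compacted log produce the same store, which is why compaction is safe for this kind of topic and shortens recovery.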
This concept is described in a lot more detail with animations in this blog: How Real-Time Materialized Views Work with ksqlDB, Animated.
For high availability of ksqlDB on Confluent Cloud, it’s recommended to use eight or more CSUs to enable fault tolerance, faster failure recovery, and standby replication of your ksqlDB cluster.
Here is the summary of the storage available with the number of CSUs:
You can also monitor the storage used by ksqlDB on Confluent Cloud using the Metrics API.
A materialized view, sometimes called a materialized cache, is an approach to precomputing the results of a query and storing them in the state store for fast read access. In contrast with a regular database query, which does all of its work at read time, a materialized view does nearly all of its work at write time. This is why materialized views can offer highly performant reads.
Let's create a materialized view for our sample dataset.
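As a rough stand-in for what such a view does, the following plain-Python sketch keeps the latest profile and an update count per CustomerID, doing the work when each event is written. The class name `CustomerView`, its fields, and the sample events are assumptions for illustration and are not ksqlDB syntax; `query_at_read_time` shows the equivalent computation done at read time for contrast.

```python
from typing import Any, Dict, List, Optional, Tuple

Event = Tuple[str, Dict[str, Any]]  # (CustomerID, profile)


class CustomerView:
    """Toy materialized view: the result is maintained incrementally on write."""

    def __init__(self) -> None:
        self._rows: Dict[str, Dict[str, Any]] = {}

    def on_event(self, customer_id: str, profile: Dict[str, Any]) -> None:
        """Write path: fold the new event into the stored row."""
        previous = self._rows.get(customer_id, {"updates": 0})
        self._rows[customer_id] = {
            "profile": profile,
            "updates": previous["updates"] + 1,
        }

    def get(self, customer_id: str) -> Optional[Dict[str, Any]]:
        """Read path: a single keyed lookup."""
        return self._rows.get(customer_id)


def query_at_read_time(events: List[Event], customer_id: str) -> Optional[Dict[str, Any]]:
    """Same answer as the view, but computed by scanning all events on read."""
    matching = [profile for cid, profile in events if cid == customer_id]
    if not matching:
        return None
    return {"profile": matching[-1], "updates": len(matching)}


events: List[Event] = [
    ("100", {"name": "Ada", "city": "London"}),
    ("200", {"name": "Bob", "city": "Paris"}),
    ("100", {"name": "Ada", "city": "Berlin"}),
]
view = CustomerView()
for customer_id, profile in events:
    view.on_event(customer_id, profile)
print(view.get("100"))
```

The view and the read-time query agree on the result; they differ only in when the work is done.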
Since Kafka has properties similar to a database as well as log compaction, state store, and materialized views, here is the revised architecture:
Assuming a materialized view has been made on the Customer topic, here is the revised flow of events when a CRUD operation takes place on a WebApp:
On create: The user goes to the WebApp and creates a new customer record. A REST call is then made to the API Manager. API Manager produces a record on the Customer topic with the key as CustomerID. Since it's a materialized view, this record is added to the internal state store of ksqlDB.
On read: The user goes to the WebApp and looks for an existing Customer by entering the CustomerID. A REST call is then made to the API Manager. The API Manager does a fast lookup for the CustomerID on the state store and gets a response back with low latency.
On update: The user goes to the WebApp and updates a customer. The API Manager produces an updated customer record on the topic with the same CustomerID, and the state store of ksqlDB is updated with the new record value.
On delete: The API Manager produces a record with key as CustomerID and value as null (a “Tombstone” record, which marks records to be deleted from topic and state store).
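The four operations above can be traced end to end with a small simulation. This is a sketch under stated assumptions, not the actual API Manager: the class `ApiManager` and its method names are hypothetical, a list stands in for the Customer topic, a `dict` stands in for the ksqlDB state store, and the store is updated inline even though in the real system ksqlDB consumes the topic asynchronously.

```python
from typing import Any, Dict, List, Optional, Tuple


class ApiManager:
    """Toy API Manager: every write is an appended event, every read hits the store."""

    def __init__(self) -> None:
        self.topic: List[Tuple[str, Optional[Dict[str, Any]]]] = []
        self.state_store: Dict[str, Dict[str, Any]] = {}

    def _produce(self, customer_id: str, profile: Optional[Dict[str, Any]]) -> None:
        self.topic.append((customer_id, profile))  # the log is never edited in place
        # Stand-in for the materialized view being updated from the topic.
        if profile is None:
            self.state_store.pop(customer_id, None)
        else:
            self.state_store[customer_id] = profile

    def create_customer(self, customer_id: str, profile: Dict[str, Any]) -> None:
        self._produce(customer_id, profile)

    def read_customer(self, customer_id: str) -> Optional[Dict[str, Any]]:
        return self.state_store.get(customer_id)

    def update_customer(self, customer_id: str, profile: Dict[str, Any]) -> None:
        self._produce(customer_id, profile)  # same key, new value

    def delete_customer(self, customer_id: str) -> None:
        self._produce(customer_id, None)  # tombstone record


api = ApiManager()
api.create_customer("100", {"name": "Ada", "city": "London"})
api.update_customer("100", {"name": "Ada", "city": "Berlin"})
print(api.read_customer("100"))
api.delete_customer("100")
print(api.read_customer("100"))
print(len(api.topic))
```

Note that the topic still holds all three events after the delete; only the state store reflects the current state.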
A single key lookup based on a message key is the fastest way to access data stored in the state store. Here is an example of making a single key lookup to a materialized view using Postman:
Note: In this example, a REST call was made over the internet to 1 CSU ksqlDB hosted on Confluent Cloud and a basic Confluent Cloud cluster, the topic had a million records, and performance may vary based on your setup.
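For readers who prefer code to Postman, here is a hedged sketch of how such a request might be assembled with the Python standard library. It targets the ksqlDB REST API's `/query` endpoint, which accepts a JSON body with `ksql` and `streamsProperties` fields. The endpoint URL, the table name `CUSTOMER_VIEW`, and the column name `CUSTOMERID` are placeholders, not values from the example above, and authentication (for instance, an API key on Confluent Cloud) is deliberately left out. The request is only built here, not sent.

```python
import json
import urllib.request


def build_pull_query_request(
    endpoint: str, table: str, key_column: str, key: str
) -> urllib.request.Request:
    """Build (but do not send) a pull query for a single key."""
    escaped_key = key.replace("'", "''")  # escape single quotes in the literal
    statement = f"SELECT * FROM {table} WHERE {key_column} = '{escaped_key}';"
    body = json.dumps({"ksql": statement, "streamsProperties": {}}).encode("utf-8")
    return urllib.request.Request(
        url=endpoint.rstrip("/") + "/query",
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/vnd.ksql.v1+json; charset=utf-8",
            "Accept": "application/vnd.ksql.v1+json",
        },
    )


# Placeholder endpoint and names; replace them with your own before sending.
request = build_pull_query_request(
    "https://ksqldb.example.invalid", "CUSTOMER_VIEW", "CUSTOMERID", "100"
)
print(request.full_url)
print(request.data.decode("utf-8"))
```

Sending the request would be a call to `urllib.request.urlopen(request)` once real credentials and a real endpoint are supplied.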
In ksqlDB versions older than 0.22, pull queries with a WHERE clause using a range expression were implemented as full table scans, that is, all the records from a table were retrieved from the state store and then filtered within ksqlDB. With the 0.22 release, range queries on the primary key are now optimized to retrieve the exact range of records from the underlying state store, which is both faster—because no additional filtering is needed—and also more efficient in terms of I/O.
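The difference between the two strategies can be sketched with an in-memory sorted list standing in for the state store (RocksDB keeps keys in sorted order). The function names and the integer keys are assumptions for illustration; this is not how ksqlDB is implemented internally.

```python
import bisect
from typing import List


def table_scan(keys: List[int], upper: int) -> List[int]:
    """Pre-optimization style: read every key, then filter."""
    return [key for key in keys if key < upper]


def range_scan(sorted_keys: List[int], upper: int) -> List[int]:
    """Optimized style: binary-search the boundary, then read only that range."""
    end = bisect.bisect_left(sorted_keys, upper)
    return sorted_keys[:end]


keys = list(range(1_000_000))  # monotonically increasing keys, already sorted
threshold = 10
print(table_scan(keys, threshold) == range_scan(keys, threshold))
```

Both calls return the same rows, but the table scan compares a million keys while the range scan touches only the ones it returns plus a logarithmic number for the boundary search.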
To illustrate the performance advantage of the range-scan optimization, we ran a benchmark on a table with 10 million rows. We generated 10 million records with monotonically increasing (profile) keys and issued the following query to retrieve a subset of all the records smaller than a certain threshold:
The figure below illustrates the advantage of the range-scan optimization compared to non-optimized implementation based on table scans.
More details can be found on this blog post on optimizations for range-scan expressions.
Here is a sample query to get a batch of records from the materialized cache. In this example, the key indicating when the record was created is truetickets_currenttimestamp. In the query, we want to retrieve all the records within a certain time window (range scan).
Note: Since these keys have different timestamps, by default they will end up in different partitions of a topic; however, you can write a custom partitioner and override the default implementation to choose a partition based on the CustomerID only (ignoring the timestamp).
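The idea behind such a partitioner can be sketched as a pure function. The sketch assumes keys of the form `<CustomerID>_<timestamp>`, which is an interpretation of the example key and not something the text confirms, and it uses CRC32 as a simple stable hash; Kafka's default partitioner in the Java client hashes the full key bytes with murmur2, so the partition numbers here will not match a real cluster. The function name `partition_for` is hypothetical.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Pick a partition from the CustomerID prefix only, ignoring the timestamp."""
    customer_id = key.rsplit("_", 1)[0]  # drop the trailing "_<timestamp>" part
    return zlib.crc32(customer_id.encode("utf-8")) % num_partitions


# Two records for the same customer, created at different (made-up) times.
first = partition_for("truetickets_1650000000", 6)
second = partition_for("truetickets_1650003600", 6)
print(first == second)
```

Because all keys for one customer land in the same partition, a range scan over that customer's time window can be served from a single partition's state.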
Let’s review the pros and cons of using a state store.
Pros:
Automated state management using state store (where the latest state resulting from a series of events can be captured from the state store)
Use only Kafka for all CRUD operations
A simpler architecture, with the data always in the same serialized format
KTable can be shared across multiple ksqlDB instances based on the key
Cheaper than hosting and maintaining an external database
In-memory database, so it's very fast
State store supports index and prefix/range scan
Cons:
State store by default runs on a single node (can be configured to have standby nodes) and hence the storage is not highly available. In the event that the hosting node/storage is lost, the state store has to be rebuilt from the changelog topic (alternatively, you can keep a cold state store ready as standby)
Cannot execute complex queries
Single point of failure (if shared between multiple microservices)
On Confluent Cloud, maximum storage capacity is limited
Every business requirement is unique, and the required architecture will vary to address that requirement. Always carefully evaluate your options before attempting to replace a database with Kafka, and when deciding whether to add a certain database based on some special architecture requirements.
To get started:
Produce data to Kafka (in Avro/JSON/Protobuf format)
Ensure produced data uses a valid message key
Use ksqlDB to create a materialized view on that topic
Use ksqlDB REST API to make calls to the state store
The following are recommended assets to learn more:
Martin Kleppmann’s 2019 keynote: Is Kafka a Database?
Kai Waehner’s 2021 talk: Can Apache Kafka Replace a Database?
Article: How Real-Time Materialized Views Work with ksqlDB, Animated
Article: Announcing ksqlDB 0.22.0
Documentation: RocksDB
Article: How to Tune RocksDB for Your Kafka Streams Application
Documentation: Sizing ksqlDB for Self-Managed Setup
Documentation: Configuring a Streams Application
Course: Inside ksqlDB: High Availability
Documentation: ksqlDB Capacity Planning
Documentation: ksqlDB Monitoring