[Webinar] Build Your GenAI Stack with Confluent and AWS | Register Now
Event streaming applications are a powerful way to react to events as they happen and to take advantage of data while it is fresh. However, they can be a challenge to develop, deploy, and maintain. ksqlDB, the database purpose built for data in motion, allows us to build and deploy these applications using familiar SQL syntax. An easy way to get up to speed on ksqlDB is with the ksqlDB 101 course on Confluent Developer. This course offers a mix of lectures and hands-on exercises that are both fun and informative.
We’ll start with an overview of what ksqlDB is and what we can build with it. We’ll then see how ksqlDB splits up its distributed compute layer and its distributed storage layer and how that enables us to build rich streaming applications backed by Apache Kafka®.
When using ksqlDB in Confluent Cloud, we can use the Confluent Cloud Console to build and manage our event streaming applications. The graphical editor makes it easy to iterate on our queries as well as to create new streams and tables. There are also graphical tools for monitoring the flow of our applications while they are running. We can also use the ksqlDB command line interface (CLI), which works with both Confluent Cloud and self-managed ksqlDB clusters. Anything that we can do with the UI can be done with the CLI—it just might not be as pretty. Another option for interacting with ksqlDB is the native Java client, which we can include in our custom Java applications. In addition, ksqlDB offers a REST API that can be called from any number of tools or from applications written in other programming languages.
To get data into our applications, we can create new streams and tables and tell ksqlDB to create the backing Kafka topics, or we can create streams or tables based on existing topics. Then we can insert data using SQL INSERT statements, or we can operate on data that is being produced to the underlying topics by other applications. Another option is to use ksqlDB’s Kafka Connect integration to pull data from or send data to any external system for which there is a connector.
Filtering in ksqlDB is as simple as using a SQL WHERE clause, as in SELECT * FROM stream WHERE field = 'value';. We can go further though and use this filter action to create a new stream with its own backing Kafka topic, like so: CREATE STREAM filtered AS SELECT * FROM unfiltered WHERE field = 'value';. Now every event in the unfiltered stream that matches our predicate will become part of our filtered stream.
In most cases, the data that we need to work with comes from more than one source. For example, it may be that we have a stream of events that contain some of the information we need but that other information resides in a lookup table. This table can be loaded into a ksqlDB table and used to enrich that stream of events using familiar SQL JOIN syntax. ksqlDB allows us to join multiple streams and tables to derive the exact data that we need for our stream processing.
Using ksqlDB, we can derive new streams of data from existing streams and tables, and we can also transform the data that is going into our new streams. We can drop fields, combine fields with CONCAT, rename them with AS, or change their data type with CAST. There are a host of other built-in functions that we can use to get exactly the data we need in the shape that works best for our applications.
Sometimes our data can get pretty complicated, but that’s not a problem for ksqlDB, which supports ARRAY, MAP, and STRUCT data types. If we need to transform a nested structure into a simpler form, we can use the arrow -> operator combined with AS to turn nested fields into top-level fields. Our new stream will contain the simplified form while the original stream remains unchanged. This flexibility allows us to meet new requirements without breaking existing downstream applications.
With ksqlDB, derived streams are not limited to the data format of their source streams. We may have a stream of JSON events, but what if a downstream application needs them to be in AVRO? No problem. Using the VALUE_FORMAT property, we can specify the format for any new stream or table. We can also define or modify schemas along the way.
We all know we’re not supposed to cross the streams, but sometimes we do need to merge them. If we have multiple streams of similar data, we can combine them into one stream for more efficient processing. Just as we can use CREATE STREAM ... AS SELECT to create a new stream from an existing one, we can also add to that stream using INSERT INTO ... SELECT. We can do this with each stream that we need to merge until we have the dataset that we need.
What SQL has joined let SQL put asunder. If we have a single stream that represents data that we need to differentiate, we can derive multiple streams based on the differentiator. For example, if we have a stream of sales data that we need to split into multiple streams based on region, we can do this with a series of queries using CREATE STREAM ... AS SELECT ... WHERE.
While its underlying storage is Kafka topics, ksqlDB represents the data in those topics as streams or tables. Streams are an unbounded series of events, mapping closely to the underlying topic. The events in a stream may or may not have a key. Tables, on the other hand, require a key and represent the most recent value for each key. As new events land in a table’s underlying topic, the value for its key is updated or a new entry is added if the key had not previously existed in the table. Some datasets are easier to work with as a table, such as products, while others, such as orders, make more sense as a stream. Both are useful abstractions that help us build powerful applications.
One way to derive a table in ksqlDB is to use a SQL aggregation, such as COUNT or SUM. There are several aggregation functions available and they will all result in a table. We can use CREATE TABLE AS to turn an aggregation query into a persistent table that will always contain the most recent results of the aggregation. The key to this table will be the field or fields in the GROUP BY clause.
Once we have a table that has been materialized from an aggregation, we can query it to get the current value for a given key, just like we would with a relational database. In ksqlDB, this is called a pull query, and it works like you’d expect. But since ksqlDB is an event streaming database, it also provides a query that returns a continuous stream of results. This is called a push query, and it is generated by adding the EMIT CHANGES clause, like so: SELECT * FROM stream EMIT CHANGES. This query will return new results as they arrive in the underlying stream.
ksqlDB might look a bit like magic, but there is some tried and true technology under the covers. We’ll explore that in this module, but for a deeper look, check out the course Inside ksqlDB, available on Confluent Developer.
Once we have our event streaming application up and running, we will need to be able to maintain it. For this, we can take advantage of ksql-migrations, a scriptable command line tool for managing ksqlDB streams and tables.
We’ve already seen how ksqlDB enables us to perform filtering, transformations, and aggregations on the events in a stream, but what about when our events contain arrays or maps and we need to do these same actions on each of their elements? For that, we have lambda functions. With lambda functions, we can filter the contents of an array or map field, we can transform each element, or we can run a reduce against all of the elements.
To learn more about what you can do with ksqlDB and to get hands-on practice with it:
Building a headless data architecture requires us to identify the work we’re already doing deep inside our data analytics plane, and shift it to the left. Learn the specifics in this blog.
A headless data architecture means no longer having to coordinate multiple copies of data, and being free to use whatever processing or query engine is most suitable for the job. This blog details how it works.