Hey, you! Yeah, you! The puzzled-looking Spring Boot developer, scouring the web for a guide on integrating your microservices with Apache Kafka® on Confluent Cloud with Stream Governance. Admit it, you’ve been Googling nonstop for the past hour and all you’ve found are examples using StringSerializer/StringDeserializer with not even the slightest mention of "schema registry-aware" serialization methods. And I bet the examples you found are implemented in Java. Wouldn’t it be nice to find at least one written in Kotlin (after all, Kotlin is a first-class citizen in the Spring world)? We welcome you. This is a safe space. Let’s get into it.
The Spring Framework and Apache Kafka have been fairly simpatico for a number of years, allowing Spring shops to ease the transition to event-driven microservices. This means your organization’s yearslong investment in building, deploying, testing, and monitoring Spring Boot applications doesn’t have to restart from scratch when moving into the world of data streams.
In this guide we’ll first look at producing events to Apache Kafka topics through the lens of Confluent Cloud, producing events serialized with Apache Avro, and registering schemas with Schema Registry. Then we’ll shift our focus to the consumer side, via the @KafkaListener annotation.
Why Kotlin? Why not? But seriously, Kotlin is popularly labeled as a JVM language that simplifies the development process for cross-platform projects. This reduces the time spent writing and maintaining code for different platforms while reaping the benefits and flexibility of native programming. Since its inception, Kotlin has gained popularity with developers for its concise syntax, null safety, and interoperability with Java codebases.
Kotlin is a great fit for server-side applications—as evidenced by its support in popular frameworks such as Spring, Ktor, Quarkus, and Micronaut, just to name a few. Several factors contribute to this adoption—per the Kotlin documentation. The factors that really stand out are the expressive nature of the Kotlin language and coroutines to allow server-side applications to take full advantage of modest hardware allocations. For shops using Kafka, Java interoperability allows you to continue using your favorite Java libraries—which may include Spring Framework libraries.
These features—and more—help to flatten the learning curve of Kotlin for Java developers. If sharpening your Kotlin skills sounds interesting, check out Kotlin Koans.
JDK 21 (we strongly suggest sdkman)
Your favorite IDE; for Kotlin, we suggest IntelliJ IDEA (community edition will suffice)
Clone the GitHub Confluent demo-scene repository. Our examples are in the spring-into-kafka-cc module.
You need a Confluent Cloud account to get started. Within the account, let’s create an environment that includes an Apache Kafka cluster and Stream Governance to host our Schema Registry. When it comes to provisioning infrastructure, Terraform is our “love language,” and the Confluent Terraform Provider allows you to quickly and deterministically create the assets you need to get started.
In the spring-into-kafka-cc directory, change to the terraform directory. Here we’ll find the Confluent Terraform Provider defined in main.tf:
For more on the Confluent Terraform Provider, see the Confluent Cloud docs.
From the terraform directory in your terminal, let’s initialize Terraform—downloading the providers needed in this project:
The Confluent Terraform Provider needs to know the Confluent Cloud Organization ID to which it will apply the changes. To get this value and export it to an environment variable which can be used by Terraform, use this command:
The command above uses the Confluent CLI to list the organizations to which we belong in JSON format. Then jq is used to query for the "current" organization, getting the id value from the JSON object. This id value is then exported to TF_VAR_org_id—as Terraform treats any environment variables prefixed with TF_VAR_ as input variables. This will satisfy the required value of the variable org_id specified in variables.tf:
Now we can create and apply a plan to build our Confluent Cloud infrastructure.
Once completed, use the Confluent Cloud Console to verify that the specified resources were created. There are also outputs from terraform apply which we’ll need in upcoming steps. Let’s export those to a properties file, again using a little "command-line-fu":
Here we use terraform output formatted as JSON, then parse the document with jq. For each entry, we create a key-value pair in a properties file under the tools subdirectory of the current user’s home directory. We’ll use this location later when configuring our Spring Boot application.
Our example uses Avro-serialized events, generating Java classes from Avro schema documents via a Gradle plugin. Those classes provide us with type-safety and compile-time checks on the data before attempting to send events to Kafka.
CustomerCommand events consist of a CustomerAction enum type and a Customer entity on which that action is applied. In the common/src/main/avro directory, you’ll find Avro schema files (with the .avsc extension) to define these objects.
The Customer is defined here:
Above, we see a Customer has a mailingAddress of type Address, defined here:
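As a rough mental model, the relationships these schemas describe can be mirrored in plain Kotlin. This is only a sketch: the real classes are Java sources generated from the .avsc files, and everything here other than email and mailingAddress (the enum values, the Address fields, and the CustomerCommand property names) is a hypothetical placeholder.

```kotlin
// Illustrative Kotlin mirror of the event shape. The real classes are Java
// sources generated from the Avro schemas, not these data classes.

// Hypothetical enum values; the actual symbols are defined in the schema.
enum class CustomerAction { ADD, UPDATE, DELETE }

// Hypothetical fields; the article does not list the Address attributes.
data class Address(val street: String, val city: String, val postalCode: String)

// Only email and mailingAddress are named in the article.
data class Customer(val email: String, val mailingAddress: Address)

// Hypothetical property names for the action and the entity it applies to.
data class CustomerCommand(val action: CustomerAction, val customer: Customer)

fun main() {
    val customer = Customer(
        email = "jane@example.com",
        mailingAddress = Address("1 Main St", "Springfield", "12345")
    )
    val command = CustomerCommand(CustomerAction.ADD, customer)
    println(command.customer.email)
}
```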
To generate Java classes from these schemas, we use the com.bakdata.avro Gradle plugin and define where the generated sourceSet is located in the project directory (in this case build/generated-main-avro-java):
This plugin adds several tasks for source generation, most notably generateAvroJava. This task is configured to execute anytime we run the compile task in our Gradle build:
When configuring Spring Boot applications, we tend to default to YAML over properties files. We find the structure of YAML much easier to navigate. But, ultimately, it’s personal preference. Also, IntelliJ IDEA (and other IDEs) provide Spring support—with features like autocomplete—to make editing less prone to human error.
Confluent Cloud requires a number of configuration properties for authentication and authorization to both Kafka and Schema Registry. In this demo, we have externalized those bits to a properties file on the classpath named cc.properties. Include the host name of your broker and schema registry, along with your credentials to each.
Spring Boot’s config data support provides a way to import external configuration as properties that can be referenced in the application.yml (or .properties) file. Remember the file we created using terraform output in a previous section? Now we will use spring.config.import to expose the key-value pairs of that file as properties, which we can reference in the application.yml file, as follows:
Note that spring.config.import can be directed anywhere on the addressable file system as well as external sources such as an HTTP endpoint. For more information, check out the Spring Cloud Config Client documentation.
The end game here is to produce Kafka events with an Avro-serialized value to a topic. With Spring (specifically spring-kafka), we can achieve this with very little of the boilerplate code you may find in other JVM-Kafka integrations.
From the application.yml file in the previous section, you’ll see our key and value serializers for our producer are already configured—as we’ll produce events with a String key and some Avro-serialized value. The data will be compressed using the zstd codec.
A typed KafkaProducer<K, V> is configured and encapsulated within a KafkaTemplate<K, V> instance in the spring-kafka library. KafkaTemplate<K, V> relies on a Spring-managed ProducerFactory<K, V> implementation. In its simplest form, we can use the @Autowired annotation to inject a KafkaTemplate<K, V> into the class tasked with sending events to Kafka.
Let’s have a quick aside about KafkaTemplate. The template pattern is a Gang of Four design pattern that defines the skeleton of an algorithm in a superclass. The behavior of the specific steps of the algorithm is implemented in a subclass. The template pattern can be found in many places throughout the Spring Framework, such as JdbcTemplate, JmsTemplate, and AmqpTemplate, to name a few.
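To make the pattern itself concrete, here is a minimal, generic sketch of it in Kotlin. It is not Spring’s implementation; EventSender and UppercaseConsoleSender are hypothetical names invented for illustration. The superclass fixes the order of the steps, and the subclass supplies each step.

```kotlin
// Generic illustration of the template method pattern; not Spring's code.
abstract class EventSender {
    // The template method: it fixes the order in which the steps run.
    fun send(payload: String): String {
        val serialized = serialize(payload)
        return deliver(serialized)
    }

    // The individual steps are left to subclasses.
    protected abstract fun serialize(payload: String): String
    protected abstract fun deliver(serialized: String): String
}

// Hypothetical subclass that fills in both steps.
class UppercaseConsoleSender : EventSender() {
    override fun serialize(payload: String): String = payload.uppercase()

    override fun deliver(serialized: String): String {
        println("delivering $serialized")
        return "delivered:$serialized"
    }
}

fun main() {
    val result = UppercaseConsoleSender().send("hello")
    println(result) // prints "delivered:HELLO"
}
```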
You may have found examples where ProducerFactory<K, V> and KafkaTemplate<K, V> are defined in a @Configuration-annotated class. For the purposes of our demo, the provided DefaultKafkaProducerFactory<K, V> created from our configuration file will suffice.
Now we can create a function to send events to Kafka:
This function takes a Customer and a CustomerAction and uses those to create a CustomerCommand. The email attribute of the Customer is used as the key of the ProducerRecord<K, V>. With the ProducerRecord created, use the kafkaTemplate.send() method to produce the record to the specified Kafka topic. The send() method returns a CompletableFuture, and in some cases you may not care about the outcome of send(). For our demo, using the whenComplete function as a callback provides some insight into the result of our call to kafkaTemplate.send().
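The callback style described above can be tried without a broker. The following is a minimal sketch, assuming a hypothetical FakeTemplate that stands in for KafkaTemplate and returns an already-completed CompletableFuture; the names FakeTemplate and sendAndLog, the String payload, and the log messages are all illustrative assumptions, not part of spring-kafka or the demo project.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical stand-in for KafkaTemplate so the sketch runs without a broker.
// Like kafkaTemplate.send(), it hands back a CompletableFuture.
class FakeTemplate {
    fun send(topic: String, key: String, value: String): CompletableFuture<String> =
        if (key.isBlank()) {
            CompletableFuture.failedFuture(IllegalArgumentException("key must not be blank"))
        } else {
            CompletableFuture.completedFuture("$topic:$key")
        }
}

// Sends a record and reports the outcome from the whenComplete callback.
// Exactly one of result/error is meaningful when the callback fires.
fun sendAndLog(
    template: FakeTemplate,
    topic: String,
    key: String,
    value: String,
    log: (String) -> Unit = { println(it) }
) {
    template.send(topic, key, value).whenComplete { result, error ->
        if (error != null) log("failed: ${error.message}") else log("sent: $result")
    }
}

fun main() {
    val template = FakeTemplate()
    sendAndLog(template, "customer-commands-avro", "jane@example.com", "payload")
    sendAndLog(template, "customer-commands-avro", "", "payload")
}
```

With a real KafkaTemplate the future usually completes later on another thread, so the callback is the place to react to success or failure rather than the line after send().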
It’s really that simple to send events to a Kafka topic from a Spring application. Additional producer configuration options can be added to the application.yml as producer.properties—batch.size, linger.ms, retries, acks—as needed for your use case.
If you thought the producer side of Spring Kafka encapsulated a lot of the boilerplate you’re accustomed to, check out the consumer side. The configuration section detailed a consumer configuration where we expect events with a String key and an Avro value. But we want to operate on typed data, so we add the consumer configuration property specific.avro.reader with a value of true such that the underlying ConsumerFactory<K, V> will attempt to deserialize the Avro bytes to the CustomerCommand class. Here we also provide a consumer group-id value, which translates to group.id in the ConsumerConfig properties. Finally, auto-offset-reset is set to earliest, mapping to auto.offset.reset in the Kafka consumer configuration.
Now we just need to annotate some function with a CustomerCommand input parameter to listen to our customer-commands-avro topic:
This function simply logs the input event, returning a Unit type. This is where we could add additional processing of the inbound event. For instance, we could have autowired a class with functions to handle the various CustomerAction enum values and use pattern matching to call the appropriate method:
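In Kotlin, that kind of dispatch is naturally written as a when expression over the enum. The sketch below is self-contained and makes several assumptions: the enum values ADD, UPDATE, and DELETE, the CustomerHandler class, and its method names are hypothetical stand-ins for the generated CustomerAction type and whatever bean you would autowire.

```kotlin
// Hypothetical enum values; check the Avro schema for the real symbols.
enum class CustomerAction { ADD, UPDATE, DELETE }

// Hypothetical handler standing in for an autowired Spring bean.
class CustomerHandler {
    fun add(email: String) = "added $email"
    fun update(email: String) = "updated $email"
    fun delete(email: String) = "deleted $email"
}

// `when` used as an expression over an enum must be exhaustive, so the
// compiler reports an error if a branch for some value is missing.
fun dispatch(handler: CustomerHandler, action: CustomerAction, email: String): String =
    when (action) {
        CustomerAction.ADD -> handler.add(email)
        CustomerAction.UPDATE -> handler.update(email)
        CustomerAction.DELETE -> handler.delete(email)
    }

fun main() {
    val handler = CustomerHandler()
    println(dispatch(handler, CustomerAction.UPDATE, "jane@example.com"))
}
```

Because the when is exhaustive, adding a new action to the enum later turns any forgotten branch into a compile-time error instead of a silent no-op at runtime.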
Spring Boot, Apache Kafka, Confluent Cloud, and Kotlin—a beautiful friendship. We hope you found this demo to be a useful starting point with Spring Boot and Apache Kafka. There’s so much more to discuss. In future posts, we’ll look into topics such as testing (unit and integration tests) and transactions. We’ll also have a look at Kafka bindings in Spring Cloud Stream.
Be sure to follow my social channels for updates on this series. And if you think of other Spring-related topics involving Apache Kafka you’d like me to cover, drop me a message or mention.