Designing microservices using an event-driven approach has several benefits, including improved scalability, easier maintenance, clear separation of concerns, system resilience, and cost savings. With Apache Kafka® as an event plane, services now have a durable, scalable, and reliable source of event data. From Kafka topics, a microservice can easily rebuild and restore the state of the data used to serve end users.
Microservice architects searching for a developer-friendly JVM framework may want to explore Micronaut, a framework that embraces event-driven architecture. This article briefly introduces you to Micronaut and its benefits. Then we’ll dive into the details of integrating your microservices with Apache Kafka on Confluent Cloud. Let’s get into it.
Micronaut is an open source JVM-based framework for building lightweight microservices. At its core, it is designed to avoid reflection to improve application startup times, resolving injected dependencies at compile time rather than at runtime. Per the documentation, Micronaut supports best-practice patterns for building JVM applications, such as dependency injection and inversion of control, aspect-oriented programming, sensible defaults, and auto-configuration.
We'll cover two use cases to illustrate Micronaut’s integration with Kafka. Both use cases apply a “listen to yourself” pattern, with a REST controller sending commands to alter data to Kafka. From there, a listener processes the Kafka events and updates the underlying data model. Query requests call a data source directly via JPA.
There are two examples because we want to highlight data serialization in Micronaut. We’ll begin by letting Micronaut use its sensible defaults to infer how the key and value should be serialized. Then, we’ll pivot to using the Stream Governance capabilities of Confluent Cloud to manage Apache Avro™ schemas for our structured data.
With the Micronaut framework, we can also build message and event-driven applications. And, yes, that includes Apache Kafka—as well as RabbitMQ®, JMS, and MQTT. But we’re here for Kafka, so let’s get into that. For starters, add the micronaut-kafka dependency to the application build. Here it is in Gradle:
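Assuming the Micronaut Gradle plugin manages dependency versions, the addition looks like this:

```kotlin
dependencies {
    // Micronaut's Kafka integration (@KafkaClient, @KafkaListener, etc.)
    implementation("io.micronaut.kafka:micronaut-kafka")
}
```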
Next, we should explore how to configure a Micronaut application to use Apache Kafka. We prefer using YAML, TOML, or Hocon—instead of a properties file—for the sake of legibility. To use YAML, you need to add snakeyaml as a runtime dependency to your build:
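In Gradle, that's a single runtime dependency:

```kotlin
dependencies {
    // Required at runtime for Micronaut to read application.yml
    runtimeOnly("org.yaml:snakeyaml")
}
```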
Now that we have that in place, we can start to configure our connection to Apache Kafka—in our case on Confluent Cloud. One of Micronaut's key features is its use of sensible defaults in application configuration. The examples will inject those values from environment variables to protect our Confluent Cloud connection parameters and credentials. Here’s the basis of our Confluent Cloud connection:
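A sketch of that configuration in application.yml; the environment variable names used here are placeholders for your cluster endpoint and Kafka API credentials:

```yaml
kafka:
  bootstrap:
    servers: ${KAFKA_BOOTSTRAP_SERVERS}
  security:
    protocol: SASL_SSL
  sasl:
    mechanism: PLAIN
    jaas:
      config: org.apache.kafka.common.security.plain.PlainLoginModule required username="${KAFKA_API_KEY}" password="${KAFKA_API_SECRET}";
```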
Our first use case will adhere to the Micronaut defaults as closely as possible. In this scenario, a REST controller sends events about product price changes to a Kafka topic. A listener to that topic updates the underlying data model, and subsequent queries use the controller's GET endpoints to retrieve the Product entities from the data store.
A primary concern in event streaming is data serialization. This is vital to any distributed data contract—event data needs a known structure, full stop. And there are multiple ways to achieve this. Given the nature of the listen-to-yourself pattern, the events published are intended to be consumed by our application. This won’t always be the case; we’ll elaborate on this later. For now, let’s use this pattern to highlight the default serialization methodology used by Micronaut.
When serializing data to Kafka, Micronaut takes an “educated attempt” at inferring the Serializer implementation to use given the data type of the key and value. Our initial pass at producing data for the product-price-changes topic is to serialize the key as a String, and the value is a ProductPriceChangedEvent record. Looking at the ProductPriceChangedEvent record, we see the @Serdeable annotation:
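The record looks something like this; the field names are assumptions for illustration, while the @Serdeable annotation comes from Micronaut Serialization:

```java
import io.micronaut.serde.annotation.Serdeable;

import java.math.BigDecimal;

@Serdeable  // enables compile-time JSON serialization/deserialization via Micronaut Serialization
public record ProductPriceChangedEvent(String productCode, BigDecimal price) {
}
```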
Applying the @Serdeable annotation indicates that this ProductPriceChangedEvent class is to be serialized and deserialized as JSON. Under the hood, the DefaultSerdeRegistry class makes these selections using bean introspections computed at compile time. This directly contrasts with frameworks like Spring, which make these decisions at runtime, a much less efficient process.
Sending events to Kafka starts by decorating an interface with the @KafkaClient annotation.
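Here's a sketch of that interface, with numbered markers for the discussion that follows:

```java
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;

@KafkaClient // (1) no value provided, so the default producer configuration applies
public interface ProductPriceChangeClient {

    @Topic("product-price-changes") // (2) the topic to which events are sent
    void send(@KafkaKey String productCode, ProductPriceChangedEvent event); // (3) key and value of the ProducerRecord
}
```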
The ProductPriceChangeClient interface is annotated as a @KafkaClient (1). This annotation provides AOP advice on how to configure a KafkaProducer instance. This example does not provide a value property and will use the default producer configuration. More on that in a bit.
To send events to a Kafka topic, we annotate a method of the interface with @Topic (2), providing the name of the topic to which events are sent. The parameters of this send() method (3) are the elements of the event to send to Kafka. If this method had only one parameter, it would be implied as the value of the resulting ProducerRecord. In this case, we have two parameters, one of them annotated with @KafkaKey (you guessed it, this will be the key of the ProducerRecord, with the second parameter as the value). Additional parameters annotated with @MessageHeader would add headers to the resulting ProducerRecord.
To consume these ProductPriceChangedEvent records from Kafka, let’s decorate a class (not an interface) as a @KafkaListener.
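A sketch of the listener, again with numbered markers; the repository method name is illustrative:

```java
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.Topic;

@KafkaListener(offsetReset = OffsetReset.EARLIEST) // (1) no value provided, so the default consumer configuration applies
public class ProductPriceChangeListener {

    private final ProductRepository productRepository;

    public ProductPriceChangeListener(ProductRepository productRepository) {
        this.productRepository = productRepository;
    }

    @Topic("product-price-changes") // (2) the topic to consume from
    public void handle(ProductPriceChangedEvent event) { // (3) single parameter, implied to be the record value
        // update the underlying data model (method name is illustrative)
        productRepository.updatePrice(event.productCode(), event.price());
    }
}
```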
The @KafkaListener (1) encapsulates the configuration and creation of a KafkaConsumer. Like @KafkaClient, the absence of a value parameter means we’ll use the default consumer configuration here. However, we can include other Kafka ConsumerConfig values here, such as the auto.offset.reset and group.id values exposed through the annotation.
The handle() method is annotated with the @Topic annotation (2), specifying the Kafka topic from which it consumes events. Here, the handle() method has one parameter (3), which is implied to be the value of the underlying ConsumerRecord. This simple example calls a method on the ProductRepository to update the price of a Product.
Let’s revisit the configuration terms we breezed through in the previous section—specifically the concept of default configuration for producers and consumers.
Starting with the producer configuration, we can provide a default configuration for any @KafkaClient-annotated interfaces to use when those classes do not specify a producer configuration. This is useful when all producers in your application use the same serializers (both key and value). It also serves as the fallback for any named producer configuration that overrides only some of these values. The example below configures the default producer to use StringSerializer for the key and ByteArraySerializer for the value of produced events.
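In application.yml, that looks roughly like this:

```yaml
kafka:
  producers:
    default:
      key:
        serializer: org.apache.kafka.common.serialization.StringSerializer
      value:
        serializer: org.apache.kafka.common.serialization.ByteArraySerializer
```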
The same premise applies to classes annotated with @KafkaListener, letting us configure the defaults for Kafka consumers. You could define an application-wide default for group.id and other KafkaConsumer configuration values here.
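For example, with an illustrative application-wide group.id:

```yaml
kafka:
  consumers:
    default:
      group:
        id: product-service   # illustrative application-wide default group.id
      auto:
        offset:
          reset: earliest
```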
In the listen-to-yourself pattern for microservices, the serialization strategy detailed in our first example may suffice. Perhaps the product-price-changes topic is “self-contained,” meaning there are access controls that restrict write and read access to this data in our microservice. Perhaps this isn’t part of our canonical data model.
When we work with data from the canonical model—data that has meaning across the organization—we need to ensure data quality and integrity. There is no better place to do this than straight from the source, as far “left” as possible in the lifecycle of our streams. This is where stream governance comes into play, specifically building, maintaining, and evolving the schema of the data in our canonical model. We would like the entire organization to sing from the same sheet of music.
Confluent Cloud provides stream governance centered around the Confluent Schema Registry, which we can use to manage and evolve data contracts. Confluent also provides implementations of Kafka’s Serializer and Deserializer interfaces for Kafka clients that are “Schema Registry-aware.” These support industry-standard serialization formats like Avro, Google Protobuf, and JSON Schema.
Let’s create Avro schemas for the concept of an order. Orders are updated via the controller class, which sends an order change event to a Kafka topic; when processed, this event updates the items in the order. Here are the schema definitions:
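A sketch of the OrderChangeEvent schema; the namespace, field names, and enum symbols here are illustrative, not the repository's exact definitions:

```json
{
  "type": "record",
  "name": "OrderChangeEvent",
  "namespace": "io.confluent.demo.orders",
  "fields": [
    { "name": "orderId", "type": "long" },
    {
      "name": "changeType",
      "type": {
        "type": "enum",
        "name": "OrderChangeEventType",
        "symbols": [ "ADD_ITEM", "REMOVE_ITEM", "UPDATE_QUANTITY" ]
      }
    },
    { "name": "items", "type": { "type": "array", "items": "io.confluent.demo.orders.OrderItem" } }
  ]
}
```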
Notice there are two named types nested in this schema: an enum called OrderChangeEventType (defined inline) and a record called OrderItem. Let’s define the OrderItem schema:
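Again, a sketch with illustrative fields:

```json
{
  "type": "record",
  "name": "OrderItem",
  "namespace": "io.confluent.demo.orders",
  "fields": [
    { "name": "productCode", "type": "string" },
    { "name": "quantity", "type": "int" },
    { "name": "price", "type": "double" }
  ]
}
```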
A common practice in the Java community with Avro is to generate Java bean classes, providing compile time safety and easier code to read. Since we’re using Gradle, we include the com.bakdata.avro Gradle plugin in our build:
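In the plugins block of the build script, that looks something like this (substitute the current plugin release for the placeholder version):

```kotlin
plugins {
    // Generates Java classes from the .avsc files; replace x.y.z with the current plugin version
    id("com.bakdata.avro") version "x.y.z"
}
```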
This gives us Gradle tasks to generate the Java bean classes by running this command:
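For example (the task name assumes the conventions of the underlying gradle-avro-plugin; run ./gradlew tasks to confirm it for your plugin version):

```bash
./gradlew generateAvroJava
```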
Now we have Java beans that extend the SpecificRecordBase class of the Apache Avro Java SDK.
Micronaut allows multiple producer definitions in the configuration file. Given that these order changes are serialized differently than in the previous use case, the producer must be configured with the appropriate serializers. Here is the additional producer configuration:
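A sketch of that block, with environment variable names standing in for the Schema Registry endpoint and credentials:

```yaml
kafka:
  producers:
    order-changes:
      key:
        serializer: org.apache.kafka.common.serialization.LongSerializer
      value:
        serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      schema:
        registry:
          url: ${SCHEMA_REGISTRY_URL}
      basic:
        auth:
          credentials:
            source: USER_INFO
          user:
            info: "${SCHEMA_REGISTRY_KEY}:${SCHEMA_REGISTRY_SECRET}"
```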
Here the order-changes producer overrides the default key and value serializers, using LongSerializer and KafkaAvroSerializer, respectively. Since KafkaAvroSerializer is Schema Registry-aware, we provide the configuration needed to connect to Confluent Schema Registry.
We apply this configuration to the interface annotated as the @KafkaClient for our order changes as follows:
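A sketch of the client: the @KafkaClient value ties the interface to the kafka.producers.order-changes block above, and the topic name matches the order-changes-avro topic used later in this article:

```java
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;

@KafkaClient("order-changes") // references the kafka.producers.order-changes configuration
public interface OrderChangeClient {

    @Topic("order-changes-avro")
    void send(@KafkaKey Long orderId, OrderChangeEvent event);
}
```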
Similarly, we define the consumer configuration to correspond to the producer configuration for order changes:
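This sketch assumes, per Micronaut's convention, that per-consumer configuration is keyed to match the listener's group id (with kafka.consumers.default as the fallback); the Schema Registry variables mirror the producer block above:

```yaml
kafka:
  consumers:
    order-changes:
      key:
        deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value:
        deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      specific:
        avro:
          reader: true    # deserialize into the generated SpecificRecord classes
      schema:
        registry:
          url: ${SCHEMA_REGISTRY_URL}
      basic:
        auth:
          credentials:
            source: USER_INFO
          user:
            info: "${SCHEMA_REGISTRY_KEY}:${SCHEMA_REGISTRY_SECRET}"
```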
With this configuration in place, let’s update the @KafkaListener-annotated classes to use them:
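A sketch of the updated listener; the repository type and method are illustrative stand-ins for the code in the repo:

```java
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.Topic;

@KafkaListener(groupId = "order-changes", offsetReset = OffsetReset.EARLIEST)
public class OrderChangeListener {

    private final OrderRepository orderRepository;

    public OrderChangeListener(OrderRepository orderRepository) {
        this.orderRepository = orderRepository;
    }

    @Topic("order-changes-avro")
    public void handle(@KafkaKey Long orderId, OrderChangeEvent event) {
        // apply the change to the order's items (repository method is illustrative)
        orderRepository.applyChange(orderId, event);
    }
}
```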
As you can see, the annotations provide easy overrides from the configuration. For instance, if we didn’t specify a group.id in the @KafkaListener annotation, the fallback is to the value from the application.yaml file.
Theoretically, this is all great. But let’s see it in action, with events and a data store. Here are some prerequisites to running the application:
Java 21 (we use SDKMAN! to manage multiple JDK installations)
Editor/IDE of choice
You can start by cloning the Confluent demo-scene repository from GitHub. The examples in this article are in the micronaut-cc directory.
As helpful as the Confluent Cloud console may be, Terraform is used here to provision Confluent Cloud environments, Kafka clusters, Stream Governance, and access controls.
With the repository cloned, open a terminal and navigate to the micronaut-cc directory, where you’ll find a terraform subdirectory. (If this is your first foray into the Confluent Cloud CLI, pay particular attention to the environment variable steps in the README file.) Change into that subdirectory and execute the following commands:
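A condensed sketch of those commands; the jq expression here simply takes the first organization returned:

```bash
confluent login                                                                 # (1) authenticate to Confluent Cloud via the CLI
export TF_VAR_org_id=$(confluent organization list -o json | jq -r '.[0].id')   # capture the organization id for Terraform
terraform init                                                                  # (2) pull in the required providers
terraform plan -out=tfplan                                                      # (3) build the execution plan
terraform apply tfplan                                                          # (4) apply the plan to Confluent Cloud
```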
The first command is vital: it authenticates us to Confluent Cloud via the CLI (1). From there, we need to preserve our Confluent Cloud organization ID for later Terraform steps to use. Looking at variables.tf, there is an org_id variable that needs to be defined. Terraform allows these values to be supplied as environment variables prefixed with TF_VAR_. The confluent organization list command returns the organization(s) of which the authenticated user is a member as a JSON document. That output is piped through jq to find the id of the current organization. (Note: If your user is a member of multiple organizations, this might not be deterministic. But for our demo purposes, this works.)
As the name implies, terraform init (2) initializes the Terraform environment by pulling in the needed providers used by our Terraform code. Next, we create a “plan” for Terraform to execute (3), comparing our Terraform code to the known state of the environment to determine the changes to be made. Finally, the “plan” is applied (4) to the environment.
After the Confluent Cloud environment changes are applied, we can use the terraform output command to extract the environment and credential information to a properties file. This protects us from accidentally committing these sensitive values to our Git repository.
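One way to sketch this, without assuming the exact output names defined in the repo's Terraform code, is to dump every output as a KEY=value pair:

```bash
# Write all Terraform outputs as upper-cased KEY=value pairs to a properties file
terraform output -json \
  | jq -r 'to_entries[] | "\(.key | ascii_upcase)=\(.value.value)"' \
  > ../ccloud.properties
```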
We can then inject these key-value pairs as environment variables used by our application at runtime.
This microservice uses MySQL as a data store—just to illustrate the processing of our events by consumer classes. With that in mind, you need to start a MySQL instance using Docker—here’s a command to get you there:
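Something like the following, where the container name, root password, and image tag are illustrative:

```bash
docker run --name micronaut-mysql \
  -e MYSQL_ROOT_PASSWORD=mysql-pw \
  -p 3306:3306 \
  -d mysql:8.0
```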
Once the database is started, create a schema named micronaut for use in our microservice. This can be done via the command line by opening a bash shell to the running container:
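For example, using the container name from the run command above:

```bash
docker exec -it micronaut-mysql bash
# then, inside the container (enter the root password when prompted):
mysql -u root -p -e "CREATE SCHEMA micronaut;"
```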
Going back to the application.yml file, we configure Micronaut to use this database:
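A sketch of the datasource configuration; the credentials should match whatever you used when starting the MySQL container:

```yaml
datasources:
  default:
    url: jdbc:mysql://localhost:3306/micronaut
    driverClassName: com.mysql.cj.jdbc.Driver
    username: root
    password: ${MYSQL_PASSWORD}   # the value set when starting the container
```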
Also, we want to leverage Micronaut's Flyway integration to run database migrations that create the appropriate tables for our JPA entity classes.
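A minimal sketch of enabling Flyway for the default datasource:

```yaml
flyway:
  datasources:
    default:
      enabled: true   # runs the SQL migrations found under src/main/resources/db/migration
```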
The following examples leverage IntelliJ IDEA. As such, the Run Configuration for our Micronaut service would look like this:
Notice the “Environment variables” value, which points to the same file we created in the terraform output command in the previous section. IntelliJ IDEA will parse these key-value pairs into environment variables, satisfying the environment variable replacement used in the application.yml file for values like the bootstrap servers, schema registry URL, and credentials for Confluent Cloud.
With the application running, let’s exercise the endpoints in the microservice. Once started, the application listens on localhost:8080 by default; that’s the root of our requests going forward. The controllers include a "random" PUT endpoint that generates events to post to Kafka. Here’s an example of posting a random OrderChangeEvent:
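For example, assuming a hypothetical /orders/random route (check the OrdersController in the repo for the exact path):

```bash
curl -i -X PUT http://localhost:8080/orders/random
```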
After executing this random endpoint several times, we can see the data in the Confluent Cloud console:
And the Data Contracts tab shows the Avro schema of our event data:
Why even bother with a framework if it doesn’t allow developers to create reliable, deterministic unit tests? Micronaut has you covered in this department, with bindings for the most popular test frameworks on the JVM: Spock, JUnit 5, and Kotest (for the Kotlin developers in the house).
Micronaut provides hooks to Testcontainers to test integrations with external systems—like databases, messaging systems, and the like. And yes, that includes Apache Kafka. With Testcontainers, your code actually exercises the client libraries and drivers against a containerized instance of that external system. And here’s how…
In our example, let’s create a test class for the OrdersController, which has a PUT endpoint to send an OrderChangeEvent to a Kafka topic. We want our test to validate that the event was actually produced. What better way than to consume the events and assert that the event is what we expected the code to produce?
Here’s the beginning of our test class:
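A sketch of that setup, with the numbered markers referenced below; the image tags, configuration file name, and container wiring are assumptions rather than the repository's verbatim code:

```java
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import io.micronaut.test.support.TestPropertyProvider;
import org.junit.jupiter.api.TestInstance;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@MicronautTest(propertySources = "classpath:application-test.yml")   // (1)
@Testcontainers                                                      // (2)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)                      // required when implementing TestPropertyProvider
class OrdersControllerTest implements TestPropertyProvider {         // (3)

    static final Network NETWORK = Network.newNetwork();             // (4)

    @Container
    static final KafkaContainer KAFKA =                              // (5)
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"))
                    .withNetwork(NETWORK)
                    .withNetworkAliases("kafka");

    @Container
    static final GenericContainer<?> SCHEMA_REGISTRY =               // (6)
            new GenericContainer<>(DockerImageName.parse("confluentinc/cp-schema-registry:7.6.0"))
                    .withNetwork(NETWORK)
                    .withExposedPorts(8081)
                    .withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
                    // (7) point the registry at the broker's internal listener on the shared network
                    .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://kafka:9092")
                    .dependsOn(KAFKA);

    static String schemaRegistryUrl;                                  // (8) set once the containers are running

    // ... getProperties(), test methods, and helpers follow
}
```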
For starters, our test class is annotated as a @MicronautTest (1), including a reference to a YAML configuration file on the classpath. Next, we use the @Testcontainers (2) annotation to control the lifecycle of any containers in this test instance. We implement the TestPropertyProvider (3) interface to inject the configuration of the containers into the application. (More on that in a bit.)
For our Testcontainers, we create a Network (4) so that the Kafka broker container (5) and Schema Registry container (6) can communicate. Since Confluent Schema Registry uses Kafka for storage, we configure the SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS with the value from the Kafka broker container (7). The schemaRegistryUrl (8) value will be set later as our test class initializes, and we’ll reuse that in several places.
Implementing the TestPropertyProvider interface means we have to implement getProperties(), and we do this with values from our running containers. The resulting Map<String, String> is injected at the startup of our Micronaut application:
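A sketch of that implementation (continuing the test class above and adding java.util.Map to its imports); the property keys mirror the producer and consumer configuration shown earlier:

```java
@Override
public Map<String, String> getProperties() {
    // The containers may not be started when Micronaut asks for properties, so start them explicitly.
    if (!KAFKA.isRunning()) {
        KAFKA.start();
    }
    if (!SCHEMA_REGISTRY.isRunning()) {
        SCHEMA_REGISTRY.start();
    }
    schemaRegistryUrl = "http://" + SCHEMA_REGISTRY.getHost() + ":" + SCHEMA_REGISTRY.getMappedPort(8081);
    return Map.of(
            "kafka.bootstrap.servers", KAFKA.getBootstrapServers(),
            "kafka.producers.order-changes.schema.registry.url", schemaRegistryUrl,
            "kafka.consumers.order-changes.schema.registry.url", schemaRegistryUrl
    );
}
```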
We use the REST-Assured API to test our controller's methods. Using a data faker, we generate a randomly populated OrderChangeEvent. Since the PUT endpoint accepts JSON data in the body, we must serialize the Avro object as JSON. (See the toJson() method.)
With a REST-Assured RequestSpecification, we create a request with the appropriate header(s) and JSON-encoded body. Then we use the appropriate full URI to the PUT endpoint with path variable substitution. First, we assert there was a 201 HTTP response. Then, using a KafkaConsumer instance, we poll the order-changes-avro topic, filtering the resulting ConsumerRecords until we get an event whose key matches the orderId value sent to the HTTP endpoint. The Awaitility SDK is very useful for these asynchronous test scenarios, allowing us to retry for a fixed duration or until the assertion is true.
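Putting that together, here is a condensed sketch of the test method. It continues the test class above, relies on the micronaut-test-rest-assured integration to inject the RequestSpecification, uses the usual REST-Assured, Awaitility, and JUnit static imports, and the route plus the randomOrderChangeEvent(), toJson(), and testConsumer() helpers are assumptions standing in for the repo's faker-backed code:

```java
@Test
void sendsOrderChangeEventToKafka(RequestSpecification spec) {
    long orderId = 42L;
    OrderChangeEvent event = randomOrderChangeEvent(orderId);   // hypothetical faker-backed helper

    spec.contentType(ContentType.JSON)
        .body(toJson(event))                                    // the PUT endpoint accepts the event as JSON
        .when()
        .put("/orders/{orderId}/changes", orderId)              // illustrative route with path variable substitution
        .then()
        .statusCode(201);

    try (KafkaConsumer<Long, OrderChangeEvent> consumer = testConsumer(schemaRegistryUrl)) { // hypothetical helper
        consumer.subscribe(List.of("order-changes-avro"));
        List<ConsumerRecord<Long, OrderChangeEvent>> seen = new ArrayList<>();
        // Keep polling and re-asserting until the matching event arrives or the timeout expires.
        await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
            consumer.poll(Duration.ofMillis(500)).forEach(seen::add);
            assertTrue(seen.stream().anyMatch(rec -> Objects.equals(rec.key(), orderId)));
        });
    }
}
```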
Event-driven microservices based on Apache Kafka have become an industry standard pattern. If you’re Micronaut-curious, we hope you find these code examples useful when integrating with Kafka. Here are some helpful resources to take you further in this journey:
The examples in this post are available in the micronaut-cc directory of our demo-scene repository. Feel free to leave feedback in GitHub or on social media channels.
Apache®, Apache Kafka®, Kafka®, and Apache Avro™ are trademarks of the Apache Software Foundation.
Micronaut® is a trademark of Micronaut Foundation.
RabbitMQ® is a trademark of Broadcom, Inc.