Today, every company is a data company. There are many data pipeline, integration, and ingestion tools on the market, but before you can feed your data analytics needs, the data must be collected. Often, collecting data and delivering it to the right place in the right format is even more challenging than analyzing it. In my Kafka Summit session, I shared my team’s experience of replacing a SIEM vendor’s data collection layer with Kafka Connect. Your data may be different from ours, but the data collection approach will likely look the same. This post expands on that session and explains in more technical detail how to collect data from many different remote hosts and services.
Once you have Kafka up and running, it’s time to feed it with data. When collecting data, there is one fundamental choice to make: Are we going to poll the data periodically (pull), or will the data be sent to us (push)? The answer is both! Let’s take a look at how, using three components: the NettySource connector, the PollableAPIClient source connector, and a library of Kafka Connect transformations.
First, here’s a quick recap of what the NettySource connector is and why you need it. The connector was developed to receive data from different network devices into Apache Kafka®. It receives data over both the TCP and UDP transport protocols and can be extended to support many different application layer protocols. Support for some of these protocols is already available, and for the others, the connector makes new implementations easy.
Let’s walk through different connector configurations from the simplest to the most advanced.
connector.class=com.mckesson.kafka.connect.nettysource.NettySourceConnector
topic=network_data
transport.protocol=TCP
bind.address=10.0.0.10
port=1111
This minimal configuration will allow you to receive network data as is. Minimal configuration options are required to run the connector:
Note that different defaults are applied depending on the selected transport protocol. If TCP transport is configured, the input stream is split into records on \n or \0 terminating bytes; for UDP, one record is created from each datagram packet.
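To make the TCP default concrete, here is a minimal sketch of delimiter-based framing in plain Java. It is not the connector's actual framer (that is a Netty handler); the class name DelimiterFramingSketch and its split method are hypothetical, and skipping empty frames is a choice made for this sketch rather than documented connector behavior.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: the real connector frames data inside a Netty handler.
final class DelimiterFramingSketch {

    // Splits a chunk of bytes into records on '\n' or '\0', skipping empty frames.
    // Bytes after the last delimiter are returned as a final record for simplicity;
    // a streaming decoder would keep them buffered until more data arrives.
    static List<String> split(byte[] input) {
        List<String> records = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < input.length; i++) {
            if (input[i] == '\n' || input[i] == 0) {
                if (i > start) {
                    records.add(new String(input, start, i - start, StandardCharsets.UTF_8));
                }
                start = i + 1;
            }
        }
        if (start < input.length) {
            records.add(new String(input, start, input.length - start, StandardCharsets.UTF_8));
        }
        return records;
    }
}
```

Under these assumptions, the bytes of "first\nsecond\0third\n" yield three records: first, second, and third.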
Tip: The easiest way to test that the connector is running and receiving data is to send it a message from a Linux shell:
$ echo -e "Test Message\n" | ncat localhost 1111
Kafka record headers can be used for different purposes, and the connector’s default implementation adds the following headers for each produced Kafka record:
Headers can be used to route, filter, and transform messages from different hosts.
For TCP transport, cryptographic protocols can be configured. To enable TLS, add the following configuration:
ssl.enabled = true
ssl.keystore.type = JKS
ssl.keystore.location = keystore.jks
ssl.keystore.password = <password>
ssl.key.alias = my_key
ssl.key.password = <password>
For more configuration options, check out the Kafka docs or connector documentation.
Tip: To validate the SSL/TLS configuration, use the openssl command:
$ echo -e "Test Message\n" | openssl s_client -connect 127.0.0.1:1111
In the real world, basic configuration is not enough. Fortunately, there are some extended configuration options available.
You may find that one instance of the connector is not sufficient to consume a large amount of data. To address this, you can configure multiple tasks. Multiple tasks may happen to run on the same Kafka Connect worker, so to avoid port conflicts, you can define multiple ports:
tasks.max=3
...
ports=1111,2222,3333
If multiple ports are configured, each task attempts to listen on the listed ports in the order in which they are declared. If a port is already in use, the task tries the next one in the list; if all declared ports are in use, the task fails.
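The port selection described above can be sketched in a few lines of Java. This is only an illustration of the "first free port wins" logic: it uses java.net.ServerSocket as a stand-in for the connector's Netty-based listener, and the class and method names are hypothetical.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.util.List;

// Hypothetical illustration of the fallback logic, not the connector's own code.
final class PortFallbackSketch {

    // Tries each candidate port in declaration order and returns the first socket that binds.
    // Throws if every declared port is already in use, which mirrors the task failing.
    static ServerSocket bindFirstFree(List<Integer> ports) throws IOException {
        IOException last = null;
        for (int port : ports) {
            try {
                return new ServerSocket(port);
            } catch (IOException e) {
                // Typically a BindException because the port is taken; try the next one
                last = e;
            }
        }
        throw new IOException("All declared ports are in use: " + ports, last);
    }
}
```

With ports=1111,2222,3333 and three tasks on the same worker, the first task would take 1111, the second would fall through to 2222, and the third to 3333.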
Once you’ve configured multiple tasks, the next reasonable step would be to place connectors behind a load balancer. To enable “active check” on the load balancer, we need to enable the health check listener on the connector side:
healthcheck.enabled=true
healthcheck.port=4444
Once you’ve enabled the health check, a simple TCP listener will start on the configured port (use healthcheck.ports if multiple ports are configured).
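For reference, an active TCP check amounts to little more than opening a connection to the health check port and seeing whether it succeeds. The sketch below shows that idea in plain Java; the class name and the isUp helper are hypothetical and are not part of the connector or of any load balancer.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical illustration of what a load balancer's TCP check does.
final class TcpHealthProbeSketch {

    // Returns true if a TCP connection to host:port can be opened within the timeout.
    static boolean isUp(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: treat the backend as down
            return false;
        }
    }
}
```

For example, isUp("10.0.1.1", 4444, 3000) would report whether the health check listener configured above accepts connections on that host.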
To implement a highly available data receiving service, you can run multiple Kafka Connect clusters that send data to the same Kafka cluster. This allows you to perform connector maintenance with zero downtime in receiving the data.
You can do this with any load balancing solution, for example, HAProxy for TCP balancing and Keepalived (IPVS) for UDP. Below are very simple configurations for multi-backend Kafka Connect clusters.
HAProxy:
frontend test_1111
    mode tcp
    bind *:1111
    timeout client 10m
    maxconn 100
    default_backend test_1111

backend test_1111
    balance roundrobin
    timeout server 5m
    option tcp-check
    tcp-check connect port 1111
    # KC backend #1
    server b1_s1 10.0.1.1:1111 check
    server b1_s2 10.0.1.2:1111 check
    # KC backend #2
    server b2_s1 10.0.2.1:1111 check
    server b2_s2 10.0.2.2:1111 check
Keepalived:
virtual_server 10.0.0.1 1111 {
    protocol UDP
    lb_algo rr
    lb_kind NAT
    !KC backend #1
    real_server 10.0.1.1 5001 {
        weight 1
        TCP_CHECK {
            connect_timeout 3
            connect_port 4444
        }
    }
    real_server 10.0.1.2 5001 {
        weight 1
        TCP_CHECK {
            connect_timeout 3
            connect_port 4444
        }
    }
    !KC backend #2
    real_server 10.0.2.1 5001 {
        weight 1
        TCP_CHECK {
            connect_timeout 3
            connect_port 4444
        }
    }
    real_server 10.0.2.2 5001 {
        weight 1
        TCP_CHECK {
            connect_timeout 3
            connect_port 4444
        }
    }
}
Internally, the connector uses an in-memory queue. Depending on the size of the messages that the connector receives, you may want to adjust the capacity to avoid large memory consumption. The following parameter can be used to change queue capacity:
connector.queue.capacity=1000
The connector also supports a custom queue implementation:
connector.queue.class=<custom implementation>
Custom queues must implement the BlockingQueue interface. They can also implement Configurable to receive custom queue configuration.
The connector is based on the Netty project. If you are familiar with the Netty event model, you can customize the connector accordingly. The TCP and UDP protocols have different default channel pipeline factories. The following channel handlers are available in the default implementations:
Handler name | Default implementation | Description |
framer | TCP: DelimeterOrMaxLengthFrameDecoder | Decodes the received ChannelBuffers into meaningful frame objects. |
decoder | StringDecoder | Decodes a received ChannelBuffer into a String |
recordHandler | StringRecordHandler | Produces a Kafka record from decoded data |
Two options are available for customizing the connector: configure or replace default pipeline factories.
Pre-configured handlers can be changed or removed with configuration options. The example below removes the decoder handler and replaces the framer with a custom implementation:
pipeline.factory.handlers=framer, decoder
pipeline.factory.handlers.framer.class=com.mckesson.kafka.connect.nettysource.SimpleDelimeterFrameDecoder
pipeline.factory.handlers.framer.maxFrameLength=8192
pipeline.factory.handlers.decoder.class=
If changing the handlers of the default pipeline factory is not enough, you can replace the whole factory with a custom implementation. This allows you to implement your own application protocols. The following example adds support for the HTTP protocol:
pipeline.factory.class=com.mckesson.kafka.connect.nettysource.HttpPipelineFactory
pipeline.factory.handlers=recordHandler
pipeline.factory.handlers.recordHandler.class=com.mckesson.kafka.connect.nettysource.HttpRequestContentRecordHandler
As you can see, the Netty Source Connector is a very powerful connector that allows you to receive different data from the network. For additional information, you can refer to the documentation.
The Netty Source Connector was made to receive data from a remote system, but the other way to collect data is to pull it from a remote system. This is where the PollableAPIClient Source Connector comes in. The purpose of this connector is to pull data from different remote APIs and services. What sets it apart is that it makes integration with new APIs quick and easy even for developers who have never worked with the Kafka Connect API before.
This connector is built on the concept of separating data ingestion from all the Kafka Connect API details. It allows you to plug in different data API implementations. Typically, the connector configuration looks like this:
connector.class=com.mckesson.kafka.connect.source.PollableAPIClientSourceConnector
topic=api_data
apiclient.class=com.mckesson.kafka.connect.source.PollableAPIClient
poll.interval = 30000
Here, apiclient.class is a class that implements the PollableAPIClient interface. Implementations for different public APIs can be found in the documentation. You can configure the frequency at which the remote API is polled in two ways: via poll.interval or poll.cron.
The Kafka Connect API provides you with the ability to “save” state between polls. Even after restarts, the connector will “pick up” that state. Sometimes you may need to (re)start data collection from a specific point. The initial.offset configuration option lets you set the starting point for your connector; the format of its value can vary between API client implementations. In some cases, you may need to reset the connector and force it to “forget” its state. The reset.offsets=true configuration option allows you to do so. You can find more configuration parameters in the connector documentation.
Now let’s talk about how to implement a custom client for the connector. There are two concepts around the Kafka Connect API to note:
To implement a custom client, you need to implement the PollableAPIClient interface. Let’s take a closer look:
public interface PollableAPIClient extends Configurable, Closeable {

    // Polls the data for one partition, starting from the given offset
    public List<SourceRecord> poll(String topic, Map<String, Object> partition, Map<String, Object> offset, int itemsToPoll, AtomicBoolean stop) throws APIClientException;

    // Defines how the data is partitioned
    public List<Map<String, Object>> partitions() throws APIClientException;

    // Defines the offset (starting point) for a partition
    public Map<String, Object> initialOffset(Map<String, Object> partition) throws APIClientException;

    public void close();
}
The first step is to define how data will be partitioned. Here is an example:
public List<Map<String, Object>> partitions() {
    List<Map<String, Object>> partitions = new ArrayList<>(2);
    for (String item : Arrays.asList("directoryAudits", "signIns")) {
        partitions.add(Collections.singletonMap(PARTITION_NAME_KEY, item));
    }
    return partitions;
}
The next step is to define the offset (starting point) for each partition:
public Map<String, Object> initialOffset(Map<String, Object> partition) {
    // Start collecting from the current time
    Map<String, Object> offset = new HashMap<>();
    offset.put(OFFSET_KEY_TIME, System.currentTimeMillis());
    return offset;
}
Finally, define the method that polls the data. Usually this consists of three steps:
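public List<SourceRecord> poll(String topic, Map<String, Object> partition, Map<String, Object> offset, int itemsToPoll, AtomicBoolean stop) throws APIClientException {
    // 1. Read the starting point from the offset (values are stored as Object, so convert explicitly)
    long startTs = ((Number) offset.get(OFFSET_KEY_TIME)).longValue();
    // 2. Read the data; readData fills records and returns the new end offset
    List<SourceRecord> records = ...
    long endOffset = readData(partition, startTs, records);
    // 3. Save the new offset for the next poll
    offset.put(OFFSET_KEY_TIME, endOffset);
    return records;
}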
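To see how the offset carries state from one poll to the next, here is a self-contained sketch that mimics the same read-offset, read-data, save-offset cycle without any Kafka dependencies. Everything in it is hypothetical: the in-memory EVENTS map stands in for a remote API, plain strings stand in for SourceRecord objects, and the key name "time" is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Kafka-free imitation of the poll cycle.
final class OffsetPollSketch {
    static final String OFFSET_KEY_TIME = "time";

    // Fake remote API: event timestamp -> payload
    static final TreeMap<Long, String> EVENTS = new TreeMap<>();
    static {
        EVENTS.put(100L, "a");
        EVENTS.put(200L, "b");
        EVENTS.put(300L, "c");
    }

    static List<String> poll(Map<String, Object> offset, int itemsToPoll) {
        // 1. Read the starting point from the offset
        long startTs = ((Number) offset.get(OFFSET_KEY_TIME)).longValue();
        // 2. Read up to itemsToPoll events that are strictly newer than the starting point
        List<String> records = new ArrayList<>();
        long endTs = startTs;
        for (Map.Entry<Long, String> event : EVENTS.tailMap(startTs, false).entrySet()) {
            if (records.size() >= itemsToPoll) {
                break;
            }
            records.add(event.getValue());
            endTs = event.getKey();
        }
        // 3. Save the timestamp of the last event read as the new offset
        offset.put(OFFSET_KEY_TIME, endTs);
        return records;
    }
}
```

Starting from an offset of 0 with itemsToPoll set to 2, the first call returns a and b and moves the offset to 200, the second returns c and moves it to 300, and the third returns nothing and leaves the offset unchanged.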
As you can see, building custom API clients is really easy. For more details and examples, please see the documentation.
Collecting data is just the first step; what matters more is bringing the data to the right place in the right format. This brings us to the transformations library, which we use to perform three tasks:
Let’s take a look at how Kafka Connect transformations can help.
To sort out the data, we need to “tag” data first. Kafka record headers are a good way to go. We can rely on existing headers or add additional ones. The first transformation is AddHeader, which allows you to add headers to a Kafka Connect record. It’s as easy as:
transforms=add_src_hdr
transforms.add_src_hdr.type=com.mckesson.kafka.connect.transform.AddHeader
transforms.add_src_hdr.name=data.source
transforms.add_src_hdr.value=mySource
This snippet adds a Kafka record header with the name data.source and the value mySource. Two features are used widely throughout the library: conditions and expressions. Let’s take a closer look at them.
Sometimes different transformations need to be applied to different types of data. Many transformations support conditions in order to apply or skip transformation. You can use the following configurations to define a condition:
if | Depending on if_mode, either a string or a valid regular expression |
if_mode | find: Find the subsequence; match: Match the entire string against the pattern; eq: Check the string for equality; contain: Find the substring without a regular expression; in: Check if the value is one of a static list of strings |
The example below will add a header if the record value contains a kafka substring:
transforms=add_src_hdr
transforms.add_src_hdr.type=com.mckesson.kafka.connect.transform.AddHeader
transforms.add_src_hdr.condition.if=kafka
transforms.add_src_hdr.condition.if_mode=find
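One way to read the if_mode values is in terms of java.util.regex and plain string operations. The sketch below is an interpretation of the table above, not the library's implementation; the class name is hypothetical, and the comma-separated format assumed for the in mode is a guess.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

// Hypothetical reading of the if_mode semantics.
final class ConditionSketch {

    // Evaluates the condition value (the "if" setting) against a record value for a given mode.
    static boolean test(String mode, String condition, String value) {
        switch (mode) {
            case "find":    // regular expression matches somewhere inside the value
                return Pattern.compile(condition).matcher(value).find();
            case "match":   // regular expression matches the entire value
                return Pattern.compile(condition).matcher(value).matches();
            case "eq":      // plain string equality
                return condition.equals(value);
            case "contain": // plain substring search, no regular expression
                return value.contains(condition);
            case "in":      // value is one of a static list (comma-separated here by assumption)
                return Arrays.asList(condition.split(",")).contains(value);
            default:
                throw new IllegalArgumentException("Unknown if_mode: " + mode);
        }
    }
}
```

Under this reading, the condition kafka with mode find is true for the value "apache kafka connect", while the same condition with mode match is false because the pattern does not cover the entire value.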
Note: Predicates were added to the Kafka Connect API in Kafka 2.6. We plan to implement conditions as predicates in the future.
Simple expressions can be used to address different parts of a Kafka record.
To be evaluated, an expression must be wrapped in ${…}, for example:
transforms.add_src_hdr.value=src:${HEADER.remoteAddr}
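As a rough illustration of how such a value could be resolved, the sketch below substitutes ${HEADER.name} placeholders from a map of header values. It is not the library's expression engine: the class name is hypothetical, only HEADER expressions are handled, and replacing a missing header with an empty string is an assumption.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical resolver for ${HEADER.<name>} placeholders only.
final class ExpressionSketch {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{HEADER\\.([^}]+)\\}");

    // Replaces every ${HEADER.<name>} in the template with the matching header value.
    static String resolve(String template, Map<String, String> headers) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            // Missing headers become an empty string (an assumption of this sketch)
            String value = headers.getOrDefault(matcher.group(1), "");
            // quoteReplacement keeps '$' and '\' in header values from being treated as group references
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }
}
```

With a remoteAddr header of 10.0.0.5, the value src:${HEADER.remoteAddr} would resolve to src:10.0.0.5.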
Three additional transformers are available:
RegexFilter | Filter messages with regular expressions |
RegexRouter | Route a message to a different topic based on regular expressions |
RegexRules | Apply regular-expression-based transformation rules |
The simple data masking transformation example below can give you an idea of how to use transformations:
transforms=data_mask
transforms.data_mask.type=com.mckesson.kafka.connect.transform.RegexRules
transforms.data_mask.applyTo=VALUE
transforms.data_mask.rules=cc16,ssnus

transforms.data_mask.rules.cc16.find=\b\d{4}(-?\s?)\d{4}(-?\s?)\d{4}(-?\s?)\d{4}\b
transforms.data_mask.rules.cc16.replacement=XXXX$1XXXX$2XXXX$3XXXX

transforms.data_mask.rules.ssnus.find=\b[0-9]{3}([\s-])[0-9]{2}([\s-])[0-9]{4}\b
transforms.data_mask.rules.ssnus.replacement=SSS$1SS$2SSSS
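To check what these two rules do to a record value, you can run the same patterns and replacements through java.util.regex. The sketch below assumes that RegexRules applies the rules in the declared order with standard Java regular expression semantics; the class name is hypothetical and the code is independent of the transformation itself.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-alone check of the cc16 and ssnus rules.
final class MaskingSketch {

    // Pattern -> replacement, kept in declaration order (cc16 first, then ssnus)
    static final Map<String, String> RULES = new LinkedHashMap<>();
    static {
        RULES.put("\\b\\d{4}(-?\\s?)\\d{4}(-?\\s?)\\d{4}(-?\\s?)\\d{4}\\b", "XXXX$1XXXX$2XXXX$3XXXX");
        RULES.put("\\b[0-9]{3}([\\s-])[0-9]{2}([\\s-])[0-9]{4}\\b", "SSS$1SS$2SSSS");
    }

    // Applies every rule in turn; captured separators ($1, $2, $3) are preserved in the output.
    static String mask(String value) {
        String result = value;
        for (Map.Entry<String, String> rule : RULES.entrySet()) {
            result = result.replaceAll(rule.getKey(), rule.getValue());
        }
        return result;
    }
}
```

For the input "card 1234-5678-9012-3456 ssn 123-45-6789", mask returns "card XXXX-XXXX-XXXX-XXXX ssn SSS-SS-SSSS". The digits are masked, and the original separators are kept because they are captured and re-inserted by the replacement.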
More transformation configurations and documentation can be found on GitHub.
Collecting data can be a difficult task, especially if your data comes in various forms from different places and you need to collect it at scale. Kafka Connect is a brilliant solution for bringing your data into Kafka for further processing and analytics. The connectors and approaches described in this blog post will help you collect data with less effort. You can easily adapt the connectors to your needs for both “push” and “pull” data collection. This is ultimately what makes for a rich data collecting ecosystem.
If you enjoyed this article and would like to learn more, watch my Kafka Summit talk: Feed Your SIEM Smart with Kafka Connect.