In data science projects, we distinguish between descriptive analytics and statistical models running in production. Overall, these can be seen as one process: you start by analyzing historical data to gain insights and find correlations, then develop and optimize your model, and finally transfer it to your running system. A key skill for every data scientist is not just the mathematics itself, but also getting the data into the analytics program.
In this blog post, we focus exactly on this crucial step: retrieving the data. In a second article, we’ll talk about running your model on real-time data.
Python, with its Jupyter Notebooks, is commonly used for descriptive analytics. However, the statistical software R also provides deep statistical libraries, and it is my personal first choice when analyzing data. In this tutorial, I’ll explain two ways to create data pipelines from Apache Kafka® into RStudio.
In one method, we use MongoDB as a layer in between, and then we use the R package mongolite to request the data. Using the other method, we consume the data directly using the rkafka package. We also highlight the advantages and drawbacks of each approach (with MongoDB and without MongoDB).
On GitHub, you can find all code for the MongoDB and rkafka pipelines.
Prerequisites: Docker, Docker Compose, and (optionally) MongoDB Compass
For configurations, we focus on simplicity so that our settings here can be used as a baseline for similar projects. For example, the Kafka topic is in JSON value format so that we do not need a schema registry. Our starting position is a simple Kafka producer producing data every two seconds of a truck driving from Hamburg to Munich in Germany. One example event looks like this:
{ "latitude": 53.56067, "longitude": 9.9908, "speed_kmh": 105.564, "timestamp": "2021-05-23T11:39:00.000Z" }
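As a minimal sketch of what such an event looks like once it reaches R, the snippet below parses the example above with the jsonlite package. The variable names are illustrative, and the timestamp format string is an assumption based on this single example event.

```r
library(jsonlite)

# One event as it appears in the topic (copied from the example above).
event_json <- '{ "latitude": 53.56067, "longitude": 9.9908, "speed_kmh": 105.564, "timestamp": "2021-05-23T11:39:00.000Z" }'

# fromJSON() turns a flat JSON object into a named list.
event <- fromJSON(event_json)

# A one-row data frame is the shape we want for later analysis.
event_df <- as.data.frame(event, stringsAsFactors = FALSE)

# Parse the ISO 8601 timestamp as UTC so it can be filtered and plotted.
# Assumption: all timestamps follow the pattern of the example event.
event_df$timestamp <- as.POSIXct(
  event_df$timestamp,
  format = "%Y-%m-%dT%H:%M:%OSZ",
  tz = "UTC"
)

str(event_df)
```

Because the value is plain JSON, no schema is needed on the R side either; the column types are inferred from the JSON values.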
With docker-compose up -d, we start all containers:
For the pipeline with MongoDB, we also start:
For the pipeline without MongoDB, we also start:
The Kafka Producer is written in Kotlin using Kafka Streams (version 2.7.0) and has the following properties:
KEY_SERIALIZER_CLASS_CONFIG: "org.apache.kafka.common.serialization.StringSerializer"
VALUE_SERIALIZER_CLASS_CONFIG: "org.apache.kafka.common.serialization.StringSerializer"
BOOTSTRAP_SERVERS_CONFIG: "broker:29092"
We define how to serialize our key and value and set the broker address. Notice that we use the external port of the broker instead of the internal one. The article Running Kafka in Docker Machine, by Marcelo Hossomi, explains the reason very well. I wish I had read it before making the same mistakes.
We can verify that data is correctly produced into the Kafka topic truck-topic by running the commands docker-compose exec broker bash and kafka-console-consumer --bootstrap-server broker:9092 --topic truck-topic.
When using the pipeline option with MongoDB, we need to configure a MongoDB Sink Connector and start it. Our connector has the following configurations:
{
  "name": "TestData",
  "config": {
    "name": "TestData",
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "topics": "truck-topic",
    "consumer.override.auto.offset.reset": "earliest",
    "connection.uri": "mongodb://user:password@mongo:27017/admin",
    "database": "TruckData",
    "collection": "truck_1"
  }
}
This takes the truck-topic from the beginning and stores it in the MongoDB database TruckData in the truck_1 collection. When working with the Avro format, you can find the additional configurations in the MongoDB documentation. We then start the connector via the command curl -X POST -H "Content-Type: application/json" --data @MongoDBConnector.json http://localhost:8083/connectors | jq, and verify that the connector is running using the command curl localhost:8083/connectors/TestData/status | jq.
Now, we start MongoDB Compass and create a new connection with username: user, password: password, authentication database: admin, or directly using URI: mongodb://user:password@localhost:27017/admin. We see the data in the TruckData database in the truck_1 collection.
Moreover, we can apply aggregations and export those for other programs. Here, we filter the data to have a timestamp greater than or equal to 2021-05-23. Unfortunately, it is not possible to export the query for R, but we’ll see later on how we can adapt it so that it is executable.
To start the descriptive analysis, we now need to request the data, which is stored in our Kafka topic truck-topic (as well as in MongoDB). We can start RStudio on localhost:8787 with user: user and password: password. Under the /home directory, we find our corresponding R files.
We create a connection to MongoDB and then request all of the data with connection$find(). We can also use the aggregation pipeline defined in MongoDB Compass. To do so, we replace all ' (single quotes) with " (double quotes) and then paste the query into connection$aggregate(' ').
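The following is a minimal sketch of these two requests using mongolite, not the exact code from the repository. The helper read_truck_data and the variable timestamp_pipeline are hypothetical names. The host name mongo is taken from the connector configuration and assumes that RStudio runs in the same Docker network. The filter also assumes that timestamps are stored as ISO 8601 strings, so that a string comparison orders them correctly.

```r
# Aggregation pipeline as exported from MongoDB Compass, with single quotes
# replaced by double quotes so that it is valid JSON inside an R string.
# Assumption: timestamps are stored as ISO 8601 strings, so a string
# comparison with "$gte" selects everything from 2021-05-23 onward.
timestamp_pipeline <- '[
  { "$match": { "timestamp": { "$gte": "2021-05-23" } } }
]'

# Hypothetical helper: returns the whole collection as a data frame, or the
# result of an aggregation pipeline if one is given.
read_truck_data <- function(url, pipeline = NULL) {
  # mongo() opens the connection to the given database and collection.
  connection <- mongolite::mongo(
    collection = "truck_1",
    db = "TruckData",
    url = url
  )
  if (is.null(pipeline)) {
    connection$find()
  } else {
    connection$aggregate(pipeline)
  }
}
```

Calling read_truck_data("mongodb://user:password@mongo:27017/admin") would return all documents, while passing timestamp_pipeline as the second argument would return only the filtered ones.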
We want to be able to read the data multiple times, so we first create a simple Consumer with some configurations:
kafkaServerURL: "broker"
kafkaServerPort: "9092"
connectionTimeOut: "10000"
kafkaProducerBufferSize: "100000"
clientId: "truck-topic"
We then iterate over the offset from partition 0, consuming the data from the beginning and creating a data frame out of it. To convert the JSON string, we use the jsonlite package.
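The loop described above could look roughly like the sketch below. It is an illustration under assumptions rather than the repository code: the helper names consume_truck_topic and events_to_data_frame, the limit of 100 events, and the read size of "10000" are hypothetical, and the stop condition assumes that rkafka returns an empty or missing value once no more data is available.

```r
library(jsonlite)

# Turn a character vector of JSON strings (one event each) into a data frame.
events_to_data_frame <- function(json_strings) {
  rows <- lapply(json_strings, function(s) {
    as.data.frame(fromJSON(s), stringsAsFactors = FALSE)
  })
  do.call(rbind, rows)
}

# Hypothetical helper: read up to max_events messages from partition 0,
# starting at offset 0. Requires the rkafka package and a reachable broker.
consume_truck_topic <- function(max_events = 100L) {
  # Arguments: server URL, port, connection timeout, buffer size, client id.
  consumer <- rkafka::rkafka.createSimpleConsumer(
    "broker", "9092", "10000", "100000", "truck-topic"
  )
  on.exit(rkafka::rkafka.closeSimpleConsumer(consumer))

  messages <- character(0)
  for (offset in seq_len(max_events) - 1L) {
    # Arguments: consumer, topic, partition, offset, message read size.
    rkafka::rkafka.receiveFromSimpleConsumer(
      consumer, "truck-topic", "0", as.character(offset), "10000"
    )
    msg <- rkafka::rkafka.readFromSimpleConsumer(consumer)
    # Assumption: an empty or missing result means there is no more data.
    if (length(msg) != 1 || is.na(msg) || !nzchar(msg)) break
    messages <- c(messages, msg)
  }
  events_to_data_frame(messages)
}
```

Keeping the JSON conversion in its own function means it can be checked without a running broker, for example by passing it a few hand-written event strings.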
There are pros and cons for both styles of pipeline. The most straightforward way is to use rkafka and consume the data from the topic itself. By iterating over the offset, we can define the maximum size of our data frame and specify that we want to consume from the beginning (offset = 0). However, we cannot directly query the data, for example for a specific time interval or the total number of events. Moreover, we created the pipeline locally without configuring any authentication, and I could not find anything in the package documentation on how to configure it.
Using MongoDB as a layer in between results in a more complicated setup. We need to start Kafka Connect, add the connector as a plugin, configure it, and start it. However, we can directly see in MongoDB Compass how large the data set is, we can do aggregations on the data using the aggregation pipeline, and we can convert those when requesting the data in RStudio.
In the end, the best approach depends on the circumstances of the project, as well as personal preferences. We now have the data as a data frame in RStudio, and we can start our actual work: analyzing it.
In this tutorial, we implemented two methods of transferring historical data from Apache Kafka into R. In the future, we will take a look at using our defined models on real-time data. If you have problems when executing the tutorial or have any questions, feel free to reach out in the Community Forum or Slack.