In part one of this series, we walked through how to use Streamlit, Apache Kafka®, and Apache Flink® to create a live, data-driven user interface for a market data application that lets a user select a stock (e.g., SPY), and we discussed the structure of the app at a high level. First, stock bid price data arrives over an Alpaca websocket. It is then produced to a Kafka topic in Confluent Cloud, where it is also processed with Flink SQL.
Now comes the tricky part: running the Kafka consumer and producer in the same application.
We will use asyncio to help manage concurrent work in our Streamlit application. One of asyncio's functions is to allow developers to run multiple coroutines concurrently. A coroutine is a type of subroutine that can be entered, suspended, and resumed at many different points. A subroutine is a function or a block of code that you can call.
This is exactly what the app needs: one coroutine implements a Kafka producer, and the other implements a Kafka consumer.
The producer needs to produce data to Confluent Cloud at the same time that the consumer reads from it. Before my co-worker, Gilles Philippart, introduced asyncio to my application, I encountered an issue: when I ran a consumer, the producer wouldn't work. This is the expected behavior of a single-threaded application in which one blocking loop never hands control to anything else.
How does asyncio help run multiple tasks concurrently?
By virtue of its event loop, or scheduler, asyncio can run these coroutines concurrently on a single thread. For example, say a developer writes a piece of Python code like the following:
Because of the async declaration, the code block is a coroutine.
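As a minimal illustration of what the async keyword does (the greet function here is hypothetical and is not taken from the app), calling an async def function does not run its body. It returns a coroutine object, which only executes once the event loop drives it:

```python
import asyncio


async def greet(name: str) -> str:
    # `async def` makes this a coroutine function.
    # `await` marks a point where the coroutine can be suspended and resumed.
    await asyncio.sleep(0)
    return f"hello, {name}"


coro = greet("SPY")                       # nothing has run yet
is_coroutine = asyncio.iscoroutine(coro)  # True: we only hold a coroutine object
result = asyncio.run(coro)                # the event loop runs it to completion
print(is_coroutine, result)
```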
I wrote two top-level coroutines. One, named on_select, lives in kafkaproducer.py. It contains subscribe_quotes, which calls back to fn, which in turn calls back to quote_data_handler, which produces data:
The other, named display_quotes, lives in app.py and starts the consumer:
Both of these coroutines are scheduled together using the asyncio.gather() function:
As the event loop runs, it schedules the coroutines to operate concurrently. Note the order of the coroutines here: their results are returned in the same order as the coroutines are listed in the asyncio.gather() call, although they are not guaranteed to execute or finish in that order.
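The ordering guarantee can be checked with a small sketch. The worker coroutine and its delays are made up for illustration. They stand in for the real producer and consumer coroutines, which are not reproduced here:

```python
import asyncio

finished = []  # records the order in which the coroutines actually complete


async def worker(name: str, delay: float) -> str:
    await asyncio.sleep(delay)  # simulate work of different durations
    finished.append(name)
    return name


async def main() -> list:
    # "producer" is listed first but takes longer than "consumer".
    return await asyncio.gather(
        worker("producer", 0.05),
        worker("consumer", 0.01),
    )


results = asyncio.run(main())
print("results: ", results)    # follows the order of the gather() arguments
print("finished:", finished)   # follows the actual completion order
```

Here the faster coroutine completes first, yet the results list still matches the argument order.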
There's another place in the code where asyncio is used, right after polling the consumer:
asyncio.sleep() is not the same as time.sleep(). While time.sleep() blocks the whole thread, and with it the event loop and every coroutine, an awaited asyncio.sleep() pauses only the current coroutine while the rest of the application runs.
You can try out sleep() for yourself using the Python REPL:
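One way to see the difference, as a sketch rather than the snippet from the original post: run two naps concurrently, once with asyncio.sleep() and once with time.sleep(), and time each run. The helper names polite_nap, blocking_nap, and timed are invented for this example.

```python
import asyncio
import time


async def polite_nap(seconds: float) -> None:
    await asyncio.sleep(seconds)  # suspends only this coroutine


async def blocking_nap(seconds: float) -> None:
    time.sleep(seconds)  # blocks the thread, and the event loop with it


async def timed(nap, seconds: float = 0.1) -> float:
    # Run two naps "concurrently" and measure the total wall-clock time.
    start = time.perf_counter()
    await asyncio.gather(nap(seconds), nap(seconds))
    return time.perf_counter() - start


async_elapsed = asyncio.run(timed(polite_nap))
blocking_elapsed = asyncio.run(timed(blocking_nap))
print(f"asyncio.sleep: {async_elapsed:.2f}s, time.sleep: {blocking_elapsed:.2f}s")
```

The two asyncio.sleep() naps overlap, so that run should take about as long as one nap. The two time.sleep() naps run back to back, so that run should take about twice as long.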
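Why is this needed after polling the consumer? consumer.poll() takes a timeout, which is the maximum time the call blocks while waiting for a record. Here it was set to 0.1 (the confluent-kafka Python client measures this timeout in seconds, not milliseconds). The call is synchronous, so nothing else on the event loop can run while it waits. In an asyncio program, multiple coroutines are executed by the event loop, and a coroutine only gives up control when it awaits something. Awaiting asyncio.sleep() pauses the consumer coroutine and allows other coroutines, such as the producer, to run, facilitating cooperative multitasking.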
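The pattern can be sketched without a Kafka cluster. FakeConsumer below is a hypothetical stand-in for a real consumer (its poll() returns immediately from an in-memory queue), and the produce and consume coroutines are simplified placeholders for the app's own:

```python
import asyncio
from collections import deque


class FakeConsumer:
    """Hypothetical stand-in for a Kafka consumer; not a real client class."""

    def __init__(self, records):
        self._records = deque(records)

    def poll(self, timeout: float):
        # A real poll() may block for up to `timeout`; this one returns at once.
        return self._records.popleft() if self._records else None


events = []  # interleaved log of what ran, in order


async def consume(consumer: FakeConsumer, rounds: int) -> None:
    for _ in range(rounds):
        record = consumer.poll(0.1)
        if record is not None:
            events.append(f"consumed {record}")
        await asyncio.sleep(0.01)  # hand control back to the event loop


async def produce(count: int) -> None:
    for i in range(count):
        events.append(f"produced {i}")
        await asyncio.sleep(0.01)  # the producer yields between sends too


async def main() -> None:
    await asyncio.gather(produce(3), consume(FakeConsumer(["a", "b", "c"]), 3))


asyncio.run(main())
print(events)
```

Without the await in consume, its loop would run all of its rounds before the producer got another turn.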
There was another issue while running Flink SQL. The original query was written like this:
When you use Flink SQL in Confluent Cloud to create a table, the corresponding topic is created with six partitions by default. Normally, you'd want to take advantage of multiple partitions, but this simplified demo only required one.
You can create the Flink table with one partition like so (distributing into a single bucket ensures the Kafka topic has a single partition), and then just subscribe to the topic. You can use the same syntax to create a topic with more partitions for production use cases.
Streamlit has a large array of frontend components in its library. To use them, you must import Streamlit after installing it:
Then call its methods to create the components. Here's what I wrote to title my page:
To create the stock visualization, I used st.altair_chart, which displays a chart built with Altair, a declarative visualization library for Python. The basis of the Altair chart is a pandas DataFrame, a tabular data structure from the pandas library with rows and columns. Here's the code that declares the chart before it is passed to st.altair_chart:
There are a few different things going on here. The .mark_line() method ensures that the end result will be a line chart. The .encode method sets the x and y axes. Here, the range of the price axis is set by the variables domain_start and domain_end, which are calculated as the smallest and largest prices in the price history. Next, .transform_window introduces a sliding window (note: the window used in Flink SQL is tumbling, but visually, this one is sliding). It sorts and ranks the rows in descending order by window end. Last, .transform_filter makes sure there are only 20 items in the window at a time.
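The logic behind those chart transforms can be approximated in plain Python. This is only a sketch of the idea, not the Altair code from the app. The price_history rows and their field names (window_end, price) are made-up sample data, and treating domain_start as the minimum and domain_end as the maximum is an assumption:

```python
from operator import itemgetter


def latest_window(rows: list, size: int = 20) -> list:
    # Rank rows by window end, newest first, and keep only the top `size`.
    ranked = sorted(rows, key=itemgetter("window_end"), reverse=True)
    return ranked[:size]


def price_domain(rows: list) -> tuple:
    # Smallest and largest price, used as the bounds of the y axis.
    prices = [row["price"] for row in rows]
    return min(prices), max(prices)


# Made-up sample data: 30 five-second windows with steadily rising prices.
price_history = [
    {"window_end": i * 5, "price": 500.0 + i} for i in range(30)
]

visible = latest_window(price_history)
domain_start, domain_end = price_domain(price_history)
print(len(visible), visible[0]["window_end"], domain_start, domain_end)
```

As new rows arrive, the oldest rows fall out of the 20-item window, which is what makes the chart appear to slide.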
Once all of this was running locally, I was ready to deploy my project to the outside world. You can view it here! Deploying this app was not a complex process, but there were a couple of gotchas. All I had to do was point Streamlit to my GitHub repository. However, if I changed the name of the repository or changed its visibility, the deployment no longer worked.
At first, I stored my keys in an environment variable. For deployment, I moved them: locally, they are kept in a .streamlit/secrets.toml file, and in deployment, they're stored in a settings/secrets file that I set up during the deployment process.
Upon loading the webpage and selecting the stock name, the code for the app runs. This means that the websocket gets a new connection. I used a free Alpaca API account, so my app wouldn't work with more than one connection at a time. That's why I created the website as a simulation rather than a live application. I captured some data from the behavior of SPY in a Kafka topic, then assigned the consumer to start at a specific offset.
Writing this demo was both a challenging and rewarding experience because it pushed my understanding of Flink SQL, Python, and data visualization. In reflecting on some of the trickiest parts, a few stand out. The first was my misunderstanding of Flink tables: I thought they merely described how to view data stored in a Kafka topic, when in fact they unlocked the basic functionality of the application. The second was using the asyncio library. It was a challenge to figure out which pieces of code needed to run concurrently. I'm really glad I worked it out, though, because I'm certain I'll continue to use the library in future Python apps that use Kafka.
There's no better way to learn technology than to get your hands dirty and build something. Streamlit is available to you whenever you want to quickly build and deploy a Confluent Cloud-powered data visualization application. You can also experiment with asyncio and concurrency along the way. If you want to learn data streaming by building more projects yourself, here are some recommended resources:
GitHub README: see this project in its context and run it for yourself!
The website where this simulation is deployed
The Confluent Developer Demos page: build more data streaming projects like this one
Vega-Altair: data visualization with Python
Tutorials: learn how to use Flink SQL for your use case
Confluent Cloud signup