
How to Process GitHub Data with Kafka Streams


Have you ever wondered how to track events in a large codebase? I gave it a shot using Apache Kafka®! Read on to learn how to use GitHub data as a source, process it using a Kafka Streams topology, and send it to a Kafka topic.

Why use GitHub data? 

GitHub’s data sources (its REST and GraphQL APIs) are not only developer-friendly but also a goldmine of interesting statistics on the health of developer communities. Companies like OpenSauced, LinearB, and Tidelift measure the impact of developers and their projects using statistics gleaned from GitHub’s APIs. Because repositories change constantly, the results of such an analysis shift both day to day and over longer periods.

Apache Kafka is a large and active open source project with nearly a million lines of code. It also happens to be an event streaming platform. So why not use Apache Kafka to, well, monitor itself? And learn a bit about Kafka Streams along the way?  

How would we get started? Confluent has a GitHub source connector which approximates real-time event streaming by making up to 5,000 requests to the API per hour. We can use that to get GitHub events (such as pull requests) into a Kafka topic. But then what? That’s when the power of Kafka Streams and its Processor API comes in! Kafka Streams is the native stream processing library that runs as a client, separate from the Kafka broker. (To process live data, we could also use Apache Flink®—more on using Flink in a future demo!) We’ll use it to analyze the data and obtain an up-to-date ratio of open pull requests to closed pull requests. This could tell us a couple of things about the Apache Kafka project: how many PRs are being opened by contributors, and maintainers’ ability to close them. 

The structure of the project

The structure of the project will look like this: 

  1. Implementing the GitHub source connector 

  2. Obtaining the pull request events from that connector, and 

  3. Processing those events and putting them into a new topic, “state”. 

Adding a source connector

Data pipelines have both sources, like the GitHub connector, and sinks, like the state topic. A source is like a kitchen faucet: it’s where you get the water, or data, from. A sink, on the other hand, is like, well, a kitchen sink! It holds the data flowing from the source. If you want to learn more about data pipelines, sources, and sinks, you can view this video by Wade Waldron. It’s a module from the Building Flink Apps in Java course, but much of what is said applies to Kafka and data pipelines in general.

Our source is the Confluent GitHub source connector. If you’d like to set one up for yourself, follow the instructions in this demo. At a high level, note that we will be pulling from the github-pull_requests resource in the GitHub source connector. This resource grants us a lot of information about GitHub pull requests, including their status: open or closed. 

Code structure

What is a topology?

Now, we can connect Kafka Streams to the events flowing in from our source connector and manipulate these events using streams to get the information we want. The final form of the information will determine the shape of our topology.

Confluent’s Kafka Streams docs define a stream processing topology:

“A processor topology or simply topology defines the computational logic of the data processing that needs to be performed by a stream processing application.”

Basically, a topology is a graph of what happens to your data. Ours has three nodes connected by streams of data: a GitHub source processor, a stream processor performing the analysis, and a sink processor sending the analyzed data to a new topic.

This topology is defined on line 68 of GitHubPrRatio.java. The code on line 84 holds the pertinent methods stringing the topology together:

       builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), jsonNodeSerde))
               .peek((key, value) -> LOG.info("Incoming key[{}] value[{}]", key, value))
               .mapValues(valueMapper)
               .process(new MyProcessorSupplier())
               .peek((key, value) -> LOG.info("Outgoing value key[{}] value[{}]", key, value))
               .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), prStateCounterSerde));

The .stream method here takes in the input topic from the GitHub source connector and deserializes each value as JSON. The .peek method isn’t something we’d leave in for a production-level app, but it allows us to see the incoming keys and values when we’re running the demo. Next, .mapValues maps each JSON value to a GitHubPRInfo object. Then the records are processed, another .peek lets us see the outgoing keys and values, and finally .to sends them to our output topic.
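To make the mapping step concrete, here is a minimal, dependency-free sketch of what a value mapper of this kind might do. It is not the demo's valueMapper: the class name, the flat payload layout, and the value of the NO_STATE sentinel are all assumptions, and a plain Map stands in for the JSON node so the example runs without any libraries.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the demo's valueMapper. The real code works on a
// JSON node; a plain Map is used here to stay dependency-free.
final class PrStateMapper {
    // Assumed sentinel value, mirroring the NO_STATE check in the process method.
    static final String NO_STATE = "NO_STATE";

    // Pull the "state" field out of a pull request payload, falling back to
    // NO_STATE when the field is missing or is not a string.
    static String stateOf(Map<String, Object> payload) {
        Object state = payload.get("state");
        return (state instanceof String) ? (String) state : NO_STATE;
    }

    public static void main(String[] args) {
        Map<String, Object> pr = new HashMap<>();
        pr.put("number", 1);
        pr.put("state", "open");
        System.out.println(stateOf(pr));                            // prints "open"
        System.out.println(stateOf(new HashMap<String, Object>())); // prints "NO_STATE"
    }
}
```

Mapping to a small typed value early keeps the downstream processor free of JSON handling; it only has to look at the state.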

In defining this topology, we’ll have a MyProcessorSupplier class providing the Processor instance in the .process method above. It will pick out the open and closed pull requests and keep a count of each, so that we can finally output a ratio of open to closed pull requests.

On line 102 in GitHubPrRatio.java, this MyProcessorSupplier class provides a processor to the main class which ingests the github-pull_requests stream in order to pull out the open and closed states.

static class MyProcessorSupplier implements ProcessorSupplier<String, GitHubPRInfo, String, GitHubPRStateCounter> {

On line 130 inside the MyProcessorSupplier class, we've got a state store established, which will hold the current state of the open/closed pull request ratio. The init method on line 112 schedules a punctuation that fires once per second of stream time, printing the value in the store.

The process method on line 123 takes in the events, increments the open or closed count, and stashes the updated counter in the state store.

               public void process(final Record<String, GitHubPRInfo> record) {
                   GitHubPRInfo prInfo = record.value();
                   if (!prInfo.state().equals(NO_STATE)) {
                       GitHubPRStateCounter stateCounter = kvStore.get(STORE_KEY);
                       if (stateCounter == null) {
                           stateCounter = new GitHubPRStateCounter();
                       }
                       if (prInfo.state().equalsIgnoreCase("open")) {
                           stateCounter.setOpen(stateCounter.getOpen() + 1);
                       } else {
                           stateCounter.setClosed(stateCounter.getClosed() + 1);
                       }
                       kvStore.put(STORE_KEY, stateCounter);
                   }
               }
           };
       }
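If you'd like to play with the counting logic without a running Kafka cluster, the sketch below models it in plain Java. A HashMap stands in for the key-value state store, and the class name, the int-array layout, and the ratio method are illustrative assumptions and are not taken from the demo. Returning NaN when nothing has been closed yet is one possible choice for avoiding a division by zero.

```java
import java.util.HashMap;
import java.util.Map;

// Dependency-free model of the counting step. A HashMap stands in for the
// Kafka Streams key-value store; all names here are illustrative.
final class PrCounterSketch {
    static final String STORE_KEY = "pr";
    // Value layout (assumed): index 0 = open count, index 1 = closed count.
    private final Map<String, int[]> store = new HashMap<>();

    // Mirrors the process logic: skip events without a state, otherwise
    // read the counter, bump the matching field, and write it back.
    void process(String state) {
        if (state == null || state.isEmpty()) {
            return;
        }
        int[] counts = store.get(STORE_KEY);
        if (counts == null) {
            counts = new int[2];
        }
        if (state.equalsIgnoreCase("open")) {
            counts[0]++;
        } else {
            counts[1]++;
        }
        store.put(STORE_KEY, counts);
    }

    int open() {
        int[] counts = store.get(STORE_KEY);
        return counts == null ? 0 : counts[0];
    }

    int closed() {
        int[] counts = store.get(STORE_KEY);
        return counts == null ? 0 : counts[1];
    }

    // Open-to-closed ratio; NaN while no pull request has been closed.
    double ratio() {
        return closed() == 0 ? Double.NaN : (double) open() / closed();
    }

    public static void main(String[] args) {
        PrCounterSketch counter = new PrCounterSketch();
        for (String state : new String[] {"open", "open", "closed", "OPEN"}) {
            counter.process(state);
        }
        // prints "3 open, 1 closed, ratio 3.0"
        System.out.println(counter.open() + " open, " + counter.closed()
                + " closed, ratio " + counter.ratio());
    }
}
```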

The Processor API: A closer look

There are two APIs for interacting with Kafka Streams: the Streams DSL (Domain Specific Language), which is the more abstracted of the two, and the lower-level Processor API on which the DSL is built. Here we use the Processor API and implement a state store ourselves, because keeping a running count of pull requests involves state: the result of processing each record depends on the records processed before it.

Note: Not all Kafka Streams topologies need to involve state. For example, if all the topology is doing is filtering messages, then each record is processed independently of the records that came before it.

The Streams DSL automatically creates state stores for you in operations like aggregate(), but the Processor API requires the developer to create the state store:

       public Set<StoreBuilder<?>> stores() {
           return Collections.singleton(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(STORE_NAME), Serdes.String(), prStateCounterSerde));
       }
   }

Once that’s done, we can access the state store when we initialize the processor using init.

Note: The Processor API is closer to the metal, and like most lower-level APIs, provides the developer more control while necessitating the management of details like state stores.

The init method, defined here in the demo on line 112, is called once when Kafka Streams initializes the processor. It’s the place to set up anything the processor needs before records arrive. In this case, we fetch the state store from the context, then use context.schedule to forward the latest key-value pair to the downstream processors once per second of stream time. Because the punctuation type is STREAM_TIME, it fires only as incoming records advance stream time, not on a wall-clock timer. Note that we’d take out the line that prints to the console for a production app, but since the purpose of this application is to show how Kafka Streams works, we leave it in.

               public void init(final ProcessorContext<String, GitHubPRStateCounter> context) {
                   this.kvStore = context.getStateStore(STORE_NAME);
                   context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
                       GitHubPRStateCounter entry = kvStore.get(STORE_KEY);
                       System.out.printf("Store value %s%n", entry);
                       context.forward(new Record<>("pr", entry, timestamp));
                   });
               }
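Stream time can be a surprising detail, so here is a toy model of it in plain Java. This is a deliberately simplified sketch and not how Kafka Streams schedules punctuators internally: the class, its fields, and the exact firing rule are assumptions made for illustration. The point it shows is that stream time moves only when a record arrives, so a one-second punctuation does nothing while the input is idle.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of stream-time punctuation. A simplification for illustration,
// not the scheduling logic Kafka Streams uses internally.
final class StreamTimeSketch {
    private final long intervalMs;
    private long streamTime = -1; // highest record timestamp seen so far
    private long nextFire = -1;   // -1 means "fire on the first record"
    final List<Long> fired = new ArrayList<>();

    StreamTimeSketch(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    // Stream time advances only when a record arrives and never moves
    // backwards, so out-of-order records cannot trigger a punctuation.
    void onRecord(long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        if (nextFire < 0 || streamTime >= nextFire) {
            fired.add(streamTime);
            nextFire = streamTime + intervalMs;
        }
    }

    public static void main(String[] args) {
        StreamTimeSketch sketch = new StreamTimeSketch(1000);
        for (long ts : new long[] {0, 400, 1200, 1300, 5000}) {
            sketch.onRecord(ts);
        }
        // The jump from 1300 to 5000 produces one firing, not four.
        System.out.println(sketch.fired); // prints "[0, 1200, 5000]"
    }
}
```

If you need output on a fixed real-time cadence regardless of traffic, PunctuationType.WALL_CLOCK_TIME is the alternative to look at.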

Our process method on line 123 contains the logic used to count the open and closed PRs and create the ratio.

Activating the processor in the main method

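To activate the processor, we declare a new instance of KafkaStreams in the main method. We pass in the topology we’ve built, then run streams.start(). A shutdown hook closes the streams instance cleanly, and the latch keeps the main thread alive until that happens.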

       final KafkaStreams streams = new KafkaStreams(gitHubPrRatio.topology(props), props);
       final CountDownLatch latch = new CountDownLatch(1);
       Runtime.getRuntime().addShutdownHook(new Thread(() -> {
           streams.close(Duration.ofSeconds(5));
           latch.countDown();
       }));
       try {
           streams.start();
           latch.await();
       } catch (final Throwable e) {
           e.printStackTrace();
           System.exit(1);
       }

Let’s talk about serialization

Kafka stores bytes. This makes it highly performant and has the side effect of enabling it to take in data in many different formats. But it also means you need a serializer/deserializer (a Serde, in Kafka Streams terms) whenever you read from or write to Kafka. In the code we’ve featured so far, you’ll have noticed a custom Serde: `prStateCounterSerde`. It’s created on line 59:

  static final Serde<GitHubPRStateCounter> prStateCounterSerde = StreamsSerde.serdeFor(GitHubPRStateCounter.class);

This uses a custom Serde, defined in serde/StreamsSerde.java, for the pull request state counter. Do take a look at the file to learn how it’s been implemented. For more on serialization, view this video from Confluent Developer’s Kafka Streams course. 
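To see what a serializer/deserializer pair boils down to, here is a small round trip in plain Java. The counter class and its 8-byte encoding are hypothetical and chosen only to keep the example short; the demo's StreamsSerde may well use a different format, so treat this as an illustration of the idea and not of that file.

```java
import java.nio.ByteBuffer;

// Hypothetical counter type and byte encoding, used only to illustrate what
// a serializer/deserializer pair does. Not the demo's StreamsSerde.
final class CounterBytesSketch {
    final int open;
    final int closed;

    CounterBytesSketch(int open, int closed) {
        this.open = open;
        this.closed = closed;
    }

    // Serialize: object -> bytes (two 4-byte big-endian ints).
    byte[] toBytes() {
        return ByteBuffer.allocate(8).putInt(open).putInt(closed).array();
    }

    // Deserialize: bytes -> object, reading fields in the order they were written.
    static CounterBytesSketch fromBytes(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        int open = buffer.getInt();
        int closed = buffer.getInt();
        return new CounterBytesSketch(open, closed);
    }

    public static void main(String[] args) {
        byte[] wire = new CounterBytesSketch(7, 3).toBytes();
        CounterBytesSketch back = CounterBytesSketch.fromBytes(wire);
        System.out.println(wire.length + " bytes");                         // prints "8 bytes"
        System.out.println(back.open + " open, " + back.closed + " closed"); // prints "7 open, 3 closed"
    }
}
```

Both sides have to agree on the layout, which is why the producer and consumer of a topic need matching Serdes.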

Adding a sink

That concludes the Streams portion of the project, but I’d love to hear the ways you’d push it further! One way to extend it is by adding a sink. Just as a Kafka Streams topology can have a sink, so can your entire Kafka application, via a sink connector that automatically exports results. For example, I’ve fed the state topic into Elasticsearch, which provides a cool graphical representation of the data via its visualize library.

Where to go next 

I mentioned earlier that you can also process real-time data with Flink. I’ve got another sample repository in the works! In the meantime, here are some other resources to check out: 
