
Building a Full-Stack Application With Kafka and Node.js


A well-known debate: tabs or spaces? Sure, we could set up a Google Form to collect this data, but where’s the fun in that? Let’s settle the debate, Kafka-style. We’ll use the new confluent-kafka-javascript client (not in general availability yet) to build an app that produces the current state of the vote counts to a Kafka topic and consumes from that same topic to surface them to a JavaScript frontend. 

Why are we using this client in particular? It comes from Confluent and is intended for use with Apache Kafka® and Confluent Platform. It’s compatible with Confluent’s cloud offering as well. It builds on concepts from the two most popular Kafka JavaScript client libraries: KafkaJS and node-rdkafka. The functionality is based on node-rdkafka; however, because the KafkaJS methods are so developer-friendly, the library also provides a way to interface with it via methods similar to those in KafkaJS. There are two APIs: the first implements the functionality based on node-rdkafka; the second is a promisified API with methods akin to those in KafkaJS. By choosing this client, we get broad functionality and a smooth developer experience via the dev-friendly methods.

If you’re migrating from either of these two APIs, you can find the migration guide here.

Producing the events

On the producing side, we use the API that mirrors node-rdkafka, which is the one you would migrate to from node-rdkafka. All of the code in this section is written in producer.js. You’ll get an opportunity to see how we can use the promisified API when we create the code for consuming the events.

First, you need to get set up in Confluent Cloud. There are detailed steps in the README of the GitHub repository, which holds the code in its entirety.

Here’s the basic shape of one of our Kafka messages:

{
  "question-1": {
    "Tabs": 19,
    "Spaces": 0,
    "lastClicked": false
  },
  "question-2": {
    "merge": 6,
    "rebase": 148,
    "lastClicked": false
  },
  "question-3": {
    "debugger": 5,
    "print": 2,
    "lastClicked": false
  }
}

It’s truncated (there are seven questions total) but it gives you the idea. Each question object holds the number of votes for each option, as well as a “lastClicked” boolean to keep track of the interface state.

Now, we’re not using stream processing this time around (we’ll leave that as an exercise for you, dear reader). Instead, we keep the state in the backend: each time before changing it, we consume the last message in the topic to get the latest state. We want to be able to conditionally start from the first offset (or perhaps, in the future, from a different one) and to re-consume the last committed message if we have to restart, so here we use manual commits.

   consumer.on("ready", function (arg) {
     // Assuming partition 0, adjust as necessary
     const topicPartitions = [{ topic: "total_count", partition: 0 }];
     consumer.committed(topicPartitions, 5000, (err, topicPartitions) => {
       if (err) {
         console.error("Error fetching committed offsets:", err);
         return;
       }
       const { offset } = topicPartitions[0]; // [1]
       if (offset === Kafka.CODES.RD_KAFKA_OFFSET_INVALID) { // [2]
         consumer.assign([{ topic: "total_count", partition: 0, offset: 1 }]);
         consumer.consume();
       } else {
         // Adjust the offset if needed. Here we start from the committed offset directly
         consumer.assign([{ topic: "total_count", partition: 0, offset }]);
         consumer.consume();
       }
     });
   });

Here you can see that by using the API that exposes methods similar to those in node-rdkafka, we can obtain the offset [1] via the consumer.committed method. Then, we [2] check to see if it’s an invalid offset. If so, we restart at the beginning. Otherwise, we assign the consumer its topic, partition, and offset and run consumer.consume() to start the consumption process.
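
The offset decision described above can be pulled out into a small pure function, which makes it easy to check without a broker. This is a minimal sketch, not code from the repository: resolveStartOffset and buildAssignment are hypothetical names, and OFFSET_INVALID hard-codes -1001 (the value librdkafka uses for RD_KAFKA_OFFSET_INVALID) as an assumption instead of reading it from the client. The fallback defaults to 0 here, whereas the handler above assigns offset 1 in that case.

```javascript
// Assumption: "no committed offset" is reported as -1001, the value of
// librdkafka's RD_KAFKA_OFFSET_INVALID. Hard-coded so the sketch runs standalone.
const OFFSET_INVALID = -1001;

// Hypothetical helper: pick the offset the consumer should start from.
// Falls back when nothing usable has been committed yet.
function resolveStartOffset(committedOffset, fallbackOffset = 0) {
  const usable =
    Number.isInteger(committedOffset) &&
    committedOffset !== OFFSET_INVALID &&
    committedOffset >= 0;
  return usable ? committedOffset : fallbackOffset;
}

// Hypothetical helper: build the array that would be passed to consumer.assign().
function buildAssignment(topic, partition, committedOffset, fallbackOffset = 0) {
  const offset = resolveStartOffset(committedOffset, fallbackOffset);
  return [{ topic, partition, offset }];
}

console.log(buildAssignment("total_count", 0, OFFSET_INVALID)[0].offset); // 0
console.log(buildAssignment("total_count", 0, 42)[0].offset); // 42
```

Keeping the decision separate from the callback means the "ready" handler only has to call consumer.assign() with the result and then consumer.consume().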

   consumer.on("data", function (message) {
     decoded = JSON.parse(message.value.toString()); // [1]
     // Manually commit offset after processing

     // Here, you might adjust to commit the current message's offset - 1 if you want to re-consume the last message upon restart
     const commitOffset = message.offset;

     consumer.commit({ // [2]
       topic: message.topic,
       partition: message.partition,
       offset: commitOffset,
     });
   });
   consumer.on("offset.commit", function () {
     console.log("committed");
   });

Here you can see that we’re [2] manually committing as well as updating a decoded variable [1]. 

It’s that variable that we will update and produce to our topic in our producer code:

   async function sendMessage(data) {
     let question_id_string = `${data.data.question_id}`; // [1]
     let vote = data.data.vote; // [1]
     decoded[question_id_string][vote] = decoded[question_id_string][vote] + 1; // [2]
     Object.entries(decoded).forEach( // [3]
       (voteObj) => (voteObj[1].lastClicked = false),
     );

     decoded[question_id_string]["lastClicked"] = true; // [4]

     producer.produce("total_count", 0, Buffer.from(JSON.stringify(decoded))); // [5]
   }

   // Register the listeners once, outside sendMessage, so each call doesn't add another copy.
   producer.on("event.error", (err) => {
     console.error(err);
   });

   producer.on("disconnected", function (arg) {
     console.log("producer disconnected. " + JSON.stringify(arg));
   });

First [1], we retrieve the question ID and the vote sent from the user interface, then [2] we update the count. Next [3], we clear the lastClicked state before [4] updating it for our specific vote. Last [5], we produce the message containing the updated state to the topic.
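
The update steps above can also be written as a pure function that returns a new state object instead of mutating a shared variable. This is a sketch for illustration only: applyVote is a hypothetical name that does not appear in the repository, and it assumes the question ID and the vote option already exist in the state (it throws on an unknown question).

```javascript
// Hypothetical pure version of the state update:
// increments one option and moves the lastClicked flag to the voted question.
function applyVote(state, questionId, vote) {
  const next = {};
  for (const [id, counts] of Object.entries(state)) {
    // Copy each question so the input is left untouched, and clear
    // lastClicked everywhere except on the question that was just voted on.
    next[id] = { ...counts, lastClicked: id === questionId };
  }
  next[questionId][vote] += 1;
  return next;
}

const before = {
  "question-1": { Tabs: 19, Spaces: 0, lastClicked: false },
  "question-2": { merge: 6, rebase: 148, lastClicked: true },
};
const after = applyVote(before, "question-1", "Spaces");

console.log(after["question-1"]); // { Tabs: 19, Spaces: 1, lastClicked: true }
console.log(after["question-2"].lastClicked); // false
console.log(before["question-1"].Spaces); // 0 (the input was not mutated)
```

The result could then be serialized with JSON.stringify and handed to the producer exactly as in step [5].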

   app.post("/send-to-kafka-topic", async function (req, res, next) { // [1]
     res.header("Access-Control-Allow-Origin", "*");
     res.header("Access-Control-Allow-Methods", "PUT, GET, POST, DELETE, OPTIONS");
     res.header("Access-Control-Allow-Headers", "Content-Type");
     const data = req.body; // [2] req.body is a plain object, so no await is needed
     res.json(data);
     producer.poll(100);
     sendMessage(data).catch(console.error);
     // res.json() has already sent the response, so next() is not called here.
   });

[1] This logic is called in an Express route, which passes information from the user interface to the producer via the [2] request body.
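
Because the route hands the request body straight to sendMessage, an unknown vote option would turn a count into NaN (undefined + 1), and an unknown question ID would throw. A small guard in front of the update avoids both. The sketch below is an assumption about how such a check might look, not part of the original project; validateVote is a hypothetical name, and Object.hasOwn requires a reasonably recent Node.js version.

```javascript
// Hypothetical guard: returns an error string, or null when the payload is usable.
function validateVote(state, body) {
  const data = body && body.data;
  if (!data || typeof data.question_id !== "string" || typeof data.vote !== "string") {
    return "payload must look like { data: { question_id, vote } }";
  }
  // Object.hasOwn ignores inherited keys such as "constructor".
  if (!Object.hasOwn(state, data.question_id)) {
    return `unknown question: ${data.question_id}`;
  }
  const counts = state[data.question_id];
  // lastClicked lives next to the counters but is not a vote option.
  if (data.vote === "lastClicked" || !Object.hasOwn(counts, data.vote)) {
    return `unknown option: ${data.vote}`;
  }
  return null;
}

const state = { "question-1": { Tabs: 19, Spaces: 0, lastClicked: false } };
console.log(validateVote(state, { data: { question_id: "question-1", vote: "Tabs" } })); // null
console.log(validateVote(state, { data: { question_id: "question-9", vote: "Tabs" } })); // unknown question: question-9
```

In the route, a non-null result could be answered with a 400 status instead of calling sendMessage.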

Consuming the events

For the consumer side, we will use the promisified API. Important: this is not the same API the example uses in the above section.

If you’re migrating from node-rdkafka, you import like so:

const { Kafka } = require("@confluentinc/kafka-javascript")

However, if you’re using the promisified API, like in the following code snippets we’re about to highlight, you import Kafka in the following manner:

const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;

This code, along with its context, lies in the file consumer.js.

   async function consumerStart() {
     await consumer.connect(); // [1]
     await consumer.subscribe({ topic: "total_count" }); // [2]
     await consumer.run({ // [3]
       eachMessage: async ({ topic, partition, message }) => {
         total_count = JSON.parse(message.value.toString());
         const messageFiltered = Object.entries(total_count).filter(
           (vote) => vote[1].lastClicked === true, // [4]
         );
         question_id = messageFiltered[0][0]; // [5]
         count = messageFiltered[0][1]; // [5]
         io.sockets.emit("event", { // [6]
           message: { question_id, count },
         });
       },
     });
   }

We [1] connect the consumer, [2] subscribe to the topic, [3] run the consumer and handle each message coming in from the topic, [4] find the question that was last clicked, and [5] extract its question ID and vote counts. [6] These pieces of information are emitted to our websocket.
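
Steps [4] and [5] assume that exactly one question has lastClicked set. If a message arrives where none does (for example, an initial state with every flag false), indexing into the empty filter result would throw. The following sketch shows one way to make that step defensive; extractLastClicked is a hypothetical helper, not a function from the repository.

```javascript
// Hypothetical helper: find the question whose lastClicked flag is set.
// Returns null instead of throwing when no question is flagged.
function extractLastClicked(totalCount) {
  const entry = Object.entries(totalCount).find(
    ([, counts]) => counts.lastClicked === true,
  );
  if (!entry) return null;
  const [question_id, count] = entry;
  return { question_id, count };
}

const sample = {
  "question-1": { Tabs: 19, Spaces: 0, lastClicked: false },
  "question-2": { merge: 6, rebase: 148, lastClicked: true },
};
console.log(extractLastClicked(sample).question_id); // question-2
console.log(extractLastClicked({})); // null
```

Inside eachMessage, a null result would simply mean there is nothing to emit for that message.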

  // Set up signals for a graceful shutdown.
  const disconnect = () => {
    process.off('SIGINT', disconnect);
    process.off('SIGTERM', disconnect);
    consumer.commitOffsets() // [2]
      .finally(() =>
        consumer.disconnect() // [3]
      )
      .finally(() =>
        console.log("Disconnected successfully")
      );
  }
  process.on('SIGINT', disconnect); // [1]
  process.on('SIGTERM', disconnect); // [1]

The code above demonstrates a very important piece of logic for consumer management. We [1] call the disconnect function when the process receives SIGINT or SIGTERM (for example, when you press Ctrl+C in the terminal). The code within the disconnect function ensures that the consumer [2] commits its offsets before [3] disconnecting. That way the consumer can properly pick up from where it left off.
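
To see the commit-then-disconnect ordering without a running broker, the same pattern can be exercised against stand-in objects. This is a sketch under stated assumptions: installGracefulShutdown, fakeConsumer, and fakeProcess are hypothetical names, the fake consumer only imitates the two promise-returning methods used above, and unlike the original snippet this version logs success only when disconnecting actually succeeded.

```javascript
const { EventEmitter } = require("node:events");

// Hypothetical helper: wires SIGINT/SIGTERM to a commit-then-disconnect sequence.
// `consumer` needs commitOffsets() and disconnect(), both returning promises.
// `proc` defaults to the real process but can be any EventEmitter.
function installGracefulShutdown(consumer, proc = process) {
  const shutdown = () => {
    // Unregister first so a second signal does not start a second shutdown.
    proc.off("SIGINT", shutdown);
    proc.off("SIGTERM", shutdown);
    return consumer
      .commitOffsets()
      .catch((err) => console.error("commit failed:", err.message))
      .then(() => consumer.disconnect())
      .then(() => console.log("Disconnected successfully"))
      .catch((err) => console.error("disconnect failed:", err.message));
  };
  proc.on("SIGINT", shutdown);
  proc.on("SIGTERM", shutdown);
  return shutdown;
}

// Stand-ins so the sketch runs without Kafka.
const calls = [];
const fakeConsumer = {
  commitOffsets: () => { calls.push("commitOffsets"); return Promise.resolve(); },
  disconnect: () => { calls.push("disconnect"); return Promise.resolve(); },
};
const fakeProcess = new EventEmitter();

installGracefulShutdown(fakeConsumer, fakeProcess);
fakeProcess.emit("SIGINT"); // commitOffsets runs first, disconnect follows
```

With a real consumer, the call would be installGracefulShutdown(consumer) and the signals would come from the operating system.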

Showing the events in the user interface

How do we get these events to surface to the user interface in a readable manner? For the frontend, we use mostly vanilla JavaScript and CSS. That way, if someone wants to use this project as an example but they’re using a frontend framework in JavaScript, they can replicate the logic easily no matter what framework they’re using.

We use a bit of jQuery to post the vote information to our route which we saw in producer.js:

   $.post(
     `https://lb.lets-settle-this.com:3000/send-to-kafka-topic`,
     {
       data: {
         vote: vote, // [1]
         question_id: VoteBtns[i].parentElement.id, // [2]
         ts: timestamp, // [3]
       },
     },
     function (data) {
       console.log("data being produced by click", data);
     },
   );

On a user’s click, this sends the [1] vote, the [2] question ID (gleaned from the interface’s element ID), and the [3] timestamp to the send-to-kafka-topic route for the Kafka producer to send to the topic. The timestamp is present should we ever want to revisit this project and, say, perform some windowing on the data stream.

   socket.on("event", function (message) { // [1]
     progress_bar_id = `${message.message.question_id}`; // [2]

     let progressBar = document.getElementById( // [3]
       `${progress_bar_id}-progress`,
     );

     let labelForBar = document.getElementById(`${progress_bar_id}-label`); // [4]
     let max = Object.values(message.message.count)[0] + Object.values(message.message.count)[1];
     // If length is one, ignore: it is the result of a retraction in potential FlinkSQL developments in later versions
     if (Object.keys(message.message.count).length == 1) {
       // don't do anything
     } else if ( // [5]
       Object.values(message.message.count)[0] >
       Object.values(message.message.count)[1]
     ) {
       value = Object.values(message.message.count)[0];
       labelForBar.innerHTML = `${Object.keys(message.message.count)[0]} wins  with ${Object.values(message.message.count)[0]} votes out of ${max}`; // [a]
     } else if ( // [6]
       Object.values(message.message.count)[0] ===
       Object.values(message.message.count)[1]
     ) {
       value = Object.values(message.message.count)[0];
       labelForBar.innerHTML = `It's a tie! There were ${max} total votes`; // [a]
     } else { // [7]
       value = Object.values(message.message.count)[1];
       labelForBar.innerHTML = `${Object.keys(message.message.count)[1]} wins  with ${Object.values(message.message.count)[1]} votes out of ${max}`; // [a]
     }
     progressBar.setAttribute("value", value); // [a]
     progressBar.setAttribute("max", max); // [a]
     if (clicked === true) {
       showProgressBar(progress_bar_id);
       clicked = false;
     }
   });

Here in the frontend, we write some logic that triggers when the [1] socket receives an event. [2] Then we create a variable for the question_id, and [3] retrieve the DOM element corresponding to that id. [4] We retrieve the label for that bar as well, which we'll [a] update in the following chain of logic. Then, we check whether the [5] left-hand option has more votes, [6] there's been a tie, or, in the remaining case [7], the right-hand option has more votes. [a] We then update the DOM accordingly.
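
The three-way comparison can be separated from the DOM updates, which keeps the handler short and makes the logic testable outside a browser. This is a sketch rather than the project's code: summarizeCounts is a hypothetical helper, and it assumes the count object carries exactly two vote options plus the lastClicked flag, as in the message shape shown earlier.

```javascript
// Hypothetical helper: turn the emitted count object into what the progress bar needs.
// Returns null when there are not exactly two options to compare.
function summarizeCounts(count) {
  const options = Object.entries(count).filter(([key]) => key !== "lastClicked");
  if (options.length !== 2) return null;
  const [[leftName, left], [rightName, right]] = options;
  const max = left + right;
  if (left === right) {
    return { value: left, max, label: `It's a tie! There were ${max} total votes` };
  }
  const [winner, value] = left > right ? [leftName, left] : [rightName, right];
  return { value, max, label: `${winner} wins with ${value} votes out of ${max}` };
}

console.log(summarizeCounts({ merge: 6, rebase: 148, lastClicked: true }).label);
// rebase wins with 148 votes out of 154
console.log(summarizeCounts({ Tabs: 3, Spaces: 3, lastClicked: true }).label);
// It's a tie! There were 6 total votes
```

In the socket handler, the returned value, max, and label would be applied to the progress bar and its label, and a null result would be skipped.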

Side note: you might wonder where it’s all deployed. We deployed the frontend on Amplify, and the backend (the consumer.js and producer.js files) on EC2. They’re connected via a load balancer.

Where can you go from here? There are lots of opportunities to grow this project. For example, you could implement a visualization: X number of devs who voted for tabs also voted for spaces. Or you could fingerprint the user to prevent them from cheating (don’t worry, we haven’t done this, we trust you not to vote multiple times). A third possibility is to create a mechanism for when a user wants to change their vote.

If you’re interested in learning more about Kafka or confluent-kafka-javascript, we recommend these resources:
