Amazon OpenSearch is a popular fully managed analytics engine that makes it easier for customers to do interactive log analytics, real-time application monitoring, and semantic and keyword searches. It can also be used as a vector engine that helps organizations build and augment GenAI applications without managing infrastructure (we’ll talk about this in future blogs). Additionally, the service provides a reliable, scalable infrastructure designed to handle massive data volumes.
Confluent provides a fully managed end-to-end data streaming platform. Until recently, our customers did not have an easy way to stream data from Confluent Cloud to OpenSearch. They had to either refactor the HTTP Sink connector or deploy a Lambda function that reads from Confluent and writes to OpenSearch.
Although both methods work, they require deep expertise in how OpenSearch ingestion works. To address this, we worked closely with the Amazon OpenSearch team and our internal Connect team to enhance the integration experience and simplify data sinking from Confluent to OpenSearch.
Today, we are pleased to announce an easy way to sink data from Confluent to OpenSearch using a new fully managed OpenSearch Sink Connector. Besides Amazon OpenSearch, this connector supports other OpenSearch deployment options like self-managed or other third-party managed OpenSearch services.
This blog post delves into how the OpenSearch Sink Connector works, the configuration process, as well as the benefits and considerations it offers. But before we dive into the technical details, let’s take a step back and share why this is important to our customers.
Integrating Confluent with OpenSearch enables many use cases. Here is a non-exhaustive list of what combining the two platforms can unlock:
Fraud detection and security analytics: This is one of the top use cases for customers using Confluent today. Confluent is used to ingest and aggregate security logs, network traffic data, and other security-related events from various sources. Customers can then index this data for real-time analysis and correlation. This enables you to detect security threats, identify suspicious patterns, and respond to security incidents promptly.
GenAI with retrieval-augmented generation (RAG): Organizations looking to use their internal knowledge base (e.g., customer service calls and application logs) to power GenAI applications can use this integration to vectorize this data while in motion and sink it to OpenSearch vector storage. Doing so helps ensure that their LLMs are always fed the latest information, which keeps model responses as accurate as possible.
Real-time log analytics: Confluent can be used to aggregate log and event data from multiple sources such as applications, servers, databases, and devices. By integrating Confluent with OpenSearch, customers can index this data in real time for search, analysis, and visualization. This use case is valuable for monitoring system health, troubleshooting issues, and detecting anomalies.
Social media monitoring: Confluent is already used by hundreds of customers to collect social media streams from platforms like X (formerly Twitter), Facebook, and Instagram. By indexing this data to OpenSearch, customers can do real-time sentiment analysis, trend detection, and brand monitoring. This use case is valuable for marketing campaigns, reputation management, and customer feedback analysis.
The OpenSearch Sink Connector is a fully managed service provided by Confluent that enables you to stream data from Confluent Cloud to an OpenSearch index. This index could be on Amazon OpenSearch, a self-managed OpenSearch instance, or any third-party OpenSearch service. The connector supports Avro, JSON Schema, JSON (schemaless), and Protobuf data formats.
The connector reads data from multiple Confluent Cloud topics and uses HTTP POST to sink the data into one or more OpenSearch indexes. Currently, the connector allows up to five topic-to-index combinations per instance. This feature makes it cost-effective to send data from multiple topics to multiple indexes simultaneously. If you require more than five topic-to-index combinations, you can deploy additional connector instances to accommodate your needs.
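To make the HTTP POST step concrete, here is a minimal Python sketch of what indexing a single record over HTTP with Basic authentication could look like. It assumes the standard OpenSearch index-document endpoint (POST <index>/_doc); this post does not say which path the connector actually calls, and the endpoint, credentials, and record below are hypothetical. The request is only built, not sent.

```python
import base64
import json
import urllib.request


def build_index_request(endpoint, index, document, username, password):
    """Build (but do not send) an HTTP POST that indexes one JSON document."""
    url = f"{endpoint.rstrip('/')}/{index}/_doc"
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url,
        data=json.dumps(document).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
        method="POST",
    )


# Hypothetical endpoint, credentials, and record, for illustration only.
request = build_index_request(
    "https://search-example.us-east-1.es.amazonaws.com",
    "products-index",
    {"product_id": "p-100", "name": "espresso machine"},
    "admin",
    "change-me",
)
print(request.get_method(), request.full_url)
```

Passing the request to urllib.request.urlopen would send it; a real pipeline would also need retries and error handling, which the managed connector takes care of for you.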
The following are the options that could be used to map topics to indexes. We’re using Amazon OpenSearch for illustrative purposes, but the same could be applied to any OpenSearch deployment configured with Basic Authentication or without any authentication:
1:1 Topic to index mapping: This is the most common mapping, where data from each input topic in Confluent Cloud is written to a single corresponding OpenSearch index.
M:1 Topic to index mapping (consolidate): With this option, data from multiple (up to five) Confluent Cloud topics is consolidated into a single OpenSearch index.
1:M Topic to index mapping (fan-out): Although less common, this option allows the connector to sink data from a single topic into multiple (up to five) OpenSearch indexes.
N:M Topic to index mapping: You can also combine the above options, using a mix of 1:1, M:1, and 1:M mappings together within the limit of five topic-to-index combinations per connector instance.
The connector's flexibility in mapping topics to indexes allows you to optimize your data ingestion strategy based on your specific requirements and workloads.
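The mapping options above can be reasoned about as a list of (topic, index) pairs. The following Python sketch checks such a list against the limit of five combinations and groups topics by target index. The function and constant names are ours and purely illustrative; this is not the connector's configuration format.

```python
from collections import defaultdict

MAX_COMBINATIONS = 5  # per-connector-instance limit stated in this post


def validate_mappings(mappings):
    """Check a list of (topic, index) pairs and group topics by index."""
    if len(set(mappings)) != len(mappings):
        raise ValueError("duplicate topic-to-index combination")
    if len(mappings) > MAX_COMBINATIONS:
        raise ValueError(
            f"{len(mappings)} combinations exceed the limit of {MAX_COMBINATIONS}; "
            "deploy an additional connector instance"
        )
    topics_by_index = defaultdict(list)
    for topic, index in mappings:
        topics_by_index[index].append(topic)
    return dict(topics_by_index)


# One 1:1 mapping plus one M:1 mapping, three combinations in total.
plan = validate_mappings([
    ("products", "products-index"),
    ("chat_discussion", "chat-index"),
    ("chat_summary", "chat-index"),
])
print(plan)
```

An index that ends up with more than one topic is an M:1 mapping, and a topic that appears under more than one index is a 1:M mapping.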
The OpenSearch Sink Connector scales through connector tasks, which can be added for higher-throughput workloads. That said, a single task can handle any of the mappings described above. You can add tasks using the console, CLI, API, or Terraform.
The connector's pricing is based on two factors: an hourly charge for each running task and the volume of data processed. By allowing you to configure multiple topic-to-index combinations within a single task, the connector offers cost savings, as you don't need to run additional tasks and incur the additional cost.
Determining the optimal number of tasks depends on your specific use case and requirements. We recommend testing the connector with different task configurations to identify the setup that best suits your needs, balancing scalability and cost-effectiveness.
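To compare task configurations on cost, the two pricing factors can be combined in a small calculation. This is a sketch only: the $0.10 per task-hour figure is the example rate used elsewhere in this post, not a published price, and the data processing rate is left as a parameter because no value is given here.

```python
def hourly_cost(tasks, task_rate_per_hour, gb_processed=0.0, rate_per_gb=0.0):
    """Hourly task charge plus a volume-based data processing charge (USD)."""
    return tasks * task_rate_per_hour + gb_processed * rate_per_gb


EXAMPLE_TASK_RATE = 0.10  # USD per task-hour; illustrative value only

one_task = hourly_cost(1, EXAMPLE_TASK_RATE)
five_tasks = hourly_cost(5, EXAMPLE_TASK_RATE)
print(f"1 task: ${one_task:.2f}/hour, 5 tasks: ${five_tasks:.2f}/hour")
# prints: 1 task: $0.10/hour, 5 tasks: $0.50/hour
```

Data processing charges depend on volume rather than task count, so they are the same in both cases for a given workload.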
The following are a few things to consider before using the connector:
Amazon OpenSearch Serverless support: Currently, the connector does not support Amazon OpenSearch Serverless as a destination. This is because OpenSearch Serverless only accepts IAM or SAML identities for authentication, which the connector does not currently support. However, we are working on adding support for assuming IAM roles to our fully managed connectors this year. Once this feature is implemented, the connector will be able to assume a cross-account IAM role with permissions to access an OpenSearch Serverless collection, enabling full support for OpenSearch Serverless as a destination.
OpenSearch Sink Connector with private networking
For OpenSearch Sink Connectors deployed with PrivateLink networking, there is currently a limitation in connecting to private OpenSearch clusters. This is because PrivateLink only allows one-way connectivity from the customer VPC to the service provider. To address this limitation, Confluent introduced AWS Egress Access Points, which enable outbound AWS PrivateLink connections to supported AWS services and other endpoint services powered by AWS PrivateLink, such as AWS S3, SaaS services, or custom PrivateLink services. However, this approach does not work with Amazon OpenSearch, as it uses service-managed PrivateLink instead of customer-managed PrivateLink. We are collaborating with AWS to add support for customer-managed PrivateLink for OpenSearch, allowing OpenSearch Sink Connectors deployed with PrivateLink networking to connect to private Amazon OpenSearch domains seamlessly.
OpenSearch Sink Connectors deployed with AWS Transit Gateway (TGW) or VPC peering do not share this limitation. If the Confluent Cloud cluster uses either of these networking options, you can use the connector with a private OpenSearch domain as long as the Confluent Cloud VPC can reach the VPC containing the OpenSearch cluster, either via a direct peering connection or through the TGW.
Batch inserts: Currently, the connector does not support batch inserts and can only insert records one at a time. We are working to add batch insert functionality to cater to higher throughput use cases.
Now that we've covered how the connector works, let’s put it to the test. In this setup, there are three topics that we want to sink into two different OpenSearch indexes. First, the ‘products’ topic holds information about different products, and we’ll sink data in this topic to the ‘products-index’. Second, we have the ‘chat_discussion’ and ‘chat_summary’ topics that hold information about customer chat interactions. We will sink data from these two topics to the ‘chat-index’.
Confluent Cloud cluster
An OpenSearch cluster with Basic authentication. Here we use Amazon OpenSearch Service. If you are using Amazon OpenSearch Service, make sure the domain's access policy allows connections from Confluent Cloud.
You can use the access policy to limit access to Confluent Cloud public egress IP addresses.
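As a rough illustration, an IP-restricted access policy for an Amazon OpenSearch Service domain could be generated as follows. The domain ARN, account ID, and IP addresses are placeholders (the IPs come from a documentation-only range), and the policy shape is a common pattern rather than the exact policy used in this setup. Substitute the public egress IPs listed for your own Confluent Cloud cluster.

```python
import json


def build_domain_access_policy(domain_arn, allowed_ips):
    """Return an IP-restricted resource policy for an OpenSearch Service domain."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": "*"},
                "Action": "es:ESHttp*",
                "Resource": f"{domain_arn}/*",
                # Only requests from these source IPs are allowed.
                "Condition": {"IpAddress": {"aws:SourceIp": list(allowed_ips)}},
            }
        ],
    }


# Placeholder ARN and documentation-range IPs; replace with real values.
policy = build_domain_access_policy(
    "arn:aws:es:us-east-1:123456789012:domain/example-domain",
    ["203.0.113.10/32", "203.0.113.11/32"],
)
print(json.dumps(policy, indent=2))
```

The printed JSON can be pasted into the domain's access policy editor. Basic authentication on the domain still applies on top of the IP restriction.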
You can deploy the connector using Console, CLI, or Terraform. Here we will use the Confluent console to deploy the connector.
In the Connector page, choose the Confluent Cloud environment and cluster that you want to tie the connector to.
Next, choose the three topics you want to sink.
For Kafka credentials, we chose one of the three available options. In production, it is best practice to use service accounts with connectors. If the credentials are tied to a user account and that account is deleted (for example, when the user leaves), all API keys created with it are deleted as well, which breaks the connector.
In the OpenSearch Authentication page, enter the following:
OpenSearch Instance endpoint: The Amazon OpenSearch Domain endpoint.
Endpoint Authentication Type: Choose Basic.
Enter your OpenSearch username and password.
In the Connector configuration page, choose JSON as the input format. The connector also supports Avro, JSON Schema, and Protobuf.
Set the number of indexes to 3 and configure them as follows:
Index 1 configuration
Index → products-index
Topic → products
Index 2 configuration
Index → chat-index
Topic → chat_discussion
Index 3 configuration
Index → chat-index
Topic → chat_summary
Leave the number of tasks as 1. In production, make sure to test the connector for your workload.
Review the Connector config and click Continue to create the connector.
Verifying the connector
To verify that the connector is working correctly and that data from the topics is being written to the corresponding OpenSearch indexes, follow these steps:
In the OpenSearch Dashboards navigation pane, click on "Dev Tools" under the "Management" section.
In the Dev Tools console, run a search against products-index, for example GET products-index/_search with a match_all query and size set to 300.
This command will perform a search across all documents in the index and return up to 300 results.
Voila, now you will see the data from the products topic displayed in the right pane.
To verify the data from the other topics, run a similar command, but this time against chat-index (GET chat-index/_search).
You can further explore the Dev Tools console to perform more advanced queries, filter the data, or analyze the indexed documents as per your requirements.
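The same checks can be scripted outside Dev Tools. The sketch below only builds the search path and request body for a match_all query capped at 300 hits, one per index from this walkthrough. The helper name is ours, and actually sending the request (with your domain endpoint and credentials) is left out.

```python
import json


def build_match_all_search(index, size=300):
    """Return the (path, body) pair for a match_all search on one index."""
    if size < 0:
        raise ValueError("size must be non-negative")
    return f"/{index}/_search", {"query": {"match_all": {}}, "size": size}


for index in ("products-index", "chat-index"):
    path, body = build_match_all_search(index)
    print("GET", path)
    print(json.dumps(body))
```

Replacing match_all with a more specific query, such as a match on a single field, is a natural next step once you have confirmed that documents are arriving.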
The following are just a few benefits of using this connector:
Reduces operational burden: The connector is fully managed, which not only minimizes operational burden but also allows you to take advantage of the scalability and cost-effectiveness of serverless computing.
No-code integration: A no-code real-time data pipeline eliminates the need for writing or maintaining any code to move data.
Simple transformations before sending to OpenSearch: The connector leverages the Single Message Transformations (SMTs) feature of Kafka Connect, allowing you to apply stateless transformations to events before sending them to OpenSearch. This capability enables you to perform various data transformations without writing any code, for example, filtering messages by field to reduce storage costs in OpenSearch, or masking PII.
Versatile OpenSearch support: The connector supports Amazon OpenSearch Service, self-managed OpenSearch, and third-party managed OpenSearch services, making it a versatile option.
Cost-effective data ingestion: The connector offers a cost-effective solution for ingesting data from multiple Kafka topics into multiple OpenSearch indexes. With a single connector instance, you can sink data from up to five topics into up to five indexes simultaneously. It also allows you to easily and instantly change the number of tasks using the API, CLI, console, or Terraform, providing flexibility to adjust resources based on your needs. Furthermore, the connector allows you to have fewer tasks than the number of topics, potentially reducing the overall cost. For example, if you ran five tasks (one for each topic) at $0.10/hour per task, the total hourly cost would be $0.50, plus any applicable data processing charges, whereas a single task handling all five topics would cost $0.10/hour plus the same data processing charges. Testing is recommended to find the optimal configuration.
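To illustrate what "stateless transformations" means in the list above, here is a conceptual Python sketch of filtering records by a field and masking a PII field, one record at a time. It is not the Kafka Connect SMT API (SMTs are configured on the connector, not written like this), and the field names and values are made up for the example.

```python
def mask_fields(record, fields, mask="****"):
    """Return a copy of the record with the named fields replaced by a mask."""
    return {key: (mask if key in fields else value) for key, value in record.items()}


def keep_record(record, field, allowed_values):
    """Decide whether a record should be forwarded, based on one field."""
    return record.get(field) in allowed_values


def transform(records):
    """Filter, then mask, each record independently (no state across records)."""
    for record in records:
        if keep_record(record, "level", {"ERROR", "WARN"}):
            yield mask_fields(record, {"email"})


events = [
    {"level": "INFO", "email": "a@example.com", "msg": "login ok"},
    {"level": "ERROR", "email": "b@example.com", "msg": "payment failed"},
]
result = list(transform(events))
print(result)
```

Because each record is handled on its own, the output for one event never depends on any other event, which is what keeps this kind of transformation cheap to run inside a connector.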
We will continue to work on the connector to make it easier for customers to integrate both platforms. We plan to expand the OpenSearch authentication mechanisms supported by the connector. Adding support for assuming AWS IAM roles as an authentication mechanism for OpenSearch would be useful for Amazon OpenSearch Serverless customers and for any customer using AWS IAM as the main authentication and authorization mechanism for OpenSearch.
Additionally, we will add the option for customers to base document IDs on Kafka keys or on a combination of topic+partition+offset. With deterministic document IDs, the connector can provide exactly-once delivery semantics. This is because a write to OpenSearch with an explicit document ID is idempotent: even if an input event is processed more than once by the connector, it ends up in a single document in OpenSearch, as each later write overwrites the previous one with the same document ID.
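The effect of deterministic document IDs can be shown with a small simulation in which a Python dict stands in for an OpenSearch index. The ID format (topic, partition, and offset joined with "+") and all names here are assumptions for illustration; the planned connector option may format IDs differently.

```python
def document_id(topic, partition, offset):
    """Derive a deterministic ID from a record's Kafka coordinates."""
    return f"{topic}+{partition}+{offset}"


def write(index, record):
    """Write a record under its derived ID; the same ID overwrites in place."""
    doc_id = document_id(record["topic"], record["partition"], record["offset"])
    index[doc_id] = record["value"]
    return doc_id


index = {}  # stand-in for an OpenSearch index, keyed by document ID
record = {
    "topic": "products",
    "partition": 0,
    "offset": 42,
    "value": {"name": "espresso machine"},
}

write(index, record)
write(index, record)  # simulated redelivery of the same record
print(len(index))
```

Without an explicit ID, each delivery would typically create a new document with a generated ID, so a redelivered record would show up twice.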
The new Confluent Cloud OpenSearch Sink Connector provides a seamless and fully managed way to stream data from Confluent Cloud to Amazon OpenSearch domains or any other OpenSearch deployment. The connector significantly reduces development cycles needed to build your own custom connector, as well as the operational burden of managing the integration.
By simplifying the integration between Confluent and OpenSearch, this connector unlocks a wide range of powerful use cases—from real-time log analytics, security monitoring, and social media analysis, to emerging areas like GenAI with RAG. With its flexibility, scalability, and cost-effectiveness, the OpenSearch Sink Connector represents a significant step forward in streamlining real-time data streaming to OpenSearch.
Not yet a Confluent customer? Try Confluent Cloud in AWS Marketplace. New sign-ups receive $400 in free credits to spend during their first 30 days. Your credits will be immediately visible on your Confluent account after subscribing to Confluent through the AWS Marketplace.