
How to Source Data From Amazon DynamoDB to Confluent

Written by Ahmed Zamzam and Joseph Morais

Amazon DynamoDB is a fully managed, serverless, key-value NoSQL database service that is highly available and scalable. It is designed to deliver single-digit millisecond query performance at any scale. It offers a fast and flexible way to store and retrieve data, making it a popular choice for hundreds of thousands of applications and customers around the world. As with any database, it is important to have a way to track changes to the data over time. This is where change data capture (CDC) with Confluent comes in. 

Confluent’s cloud-native, complete data streaming platform offers 10x the elasticity, storage, and resiliency of Apache Kafka®. It helps you move data from wherever it’s created to where it’s needed so different teams can create, share, and reuse data. In addition, it delivers GBps+ of throughput, which is essential for processing large amounts of streaming data in real time. This high throughput capability enables Confluent to handle data from various sources, process it quickly, and deliver it to the desired destination without delay. With Confluent, organizations can leverage streaming data to gain real-time insights, make informed decisions, and drive business outcomes.

Why CDC for DynamoDB data?

CDC is the process of capturing changes to data from a database and publishing them to an event stream, making them available for other systems to consume. By combining CDC with Confluent, you can create a powerful architecture that enables real-time data processing and analysis. This pattern can be used to power many use cases. The following are some examples:

  • Real-time analytics on the data stored in DynamoDB: This can be useful for applications that require immediate insights into the data, such as financial trading platforms, fraud detection systems, and e-commerce applications.

  • Integration with other systems: Confluent provides 120+ prebuilt connectors to other systems (25+ of them are available as fully managed sink connectors with Confluent on AWS). This lets you connect DynamoDB data to other data sources and sinks, making it easy to create complex real-time data pipelines.

  • Disaster recovery: DynamoDB global tables provide a fully managed, multi-Region, multi-active database that delivers fast local read and write performance for massively scaled, global applications. Global tables replicate your DynamoDB tables automatically across your choice of AWS Regions. However, global tables do not support replication across multiple AWS accounts. In such scenarios, CDC with Confluent and DynamoDB can be used to capture changes and replicate them in real time to another account, allowing organizations to ensure disaster recovery across multiple AWS accounts.

  • Hydrating data lakes: CDC with Confluent provides a real-time, scalable solution for processing and analyzing DynamoDB data. By ingesting changes to DynamoDB tables into Confluent in real time, organizations can transform the data and write it to a data lake for analytics, reporting, and other applications. Confluent’s Amazon S3 Sink Connector supports popular data formats such as Parquet, enabling efficient storage and management of large volumes of data. Because data becomes available for analytics quickly, organizations can make faster and better data-driven decisions.

  • Multi-cloud strategy: this has become a popular approach for many organizations, allowing different business units to operate on different cloud platforms. However, this does not mean that data must be isolated and inaccessible across clouds. One of Confluent's core value propositions is its availability everywhere, including the three major cloud providers. This enables seamless integration and data sharing between different clouds, making it easier for organizations to manage and share their data across different environments. CDC with Confluent can make data from DynamoDB in one business unit available in real time to another application living in a different cloud provider.

  • Real-time data enrichment: CDC allows you to enrich data sourced from DynamoDB with data from almost any other data source, either on premises or in other native cloud services. Confluent’s ksqlDB lets you modify, enrich, or enhance your DynamoDB data through a simple SQL-like interface. Our connectors integrate with relational databases, object stores, messaging systems, and dozens of legacy systems, so you can combine their data with your DynamoDB data in one place.

In this blog post, we will discuss three different methods to source data from DynamoDB to Confluent, their respective pros and cons, and guidelines on when to use one versus the other.

Challenges

DynamoDB provides two options for capturing changes to its tables: DynamoDB Streams and Kinesis Data Streams. These options enable the capture of a time-ordered sequence of item-level modifications in any DynamoDB table. However, both options come with certain limitations.

Kinesis Data Streams provides at-least-once delivery guarantees, meaning that the stream may contain duplicate records. To handle duplicates, consumers should either deduplicate records or be built to process them idempotently. DynamoDB Streams, on the other hand, guarantees exactly-once delivery but retains data for only 24 hours, which may not be enough for use cases that need longer retention. This limitation can be mitigated by ingesting events from DynamoDB Streams into a platform that offers longer retention periods, such as Confluent.
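The deduplication mentioned above could be handled by a consumer along the lines of the following minimal sketch. It assumes each record carries a stable unique identifier that is identical across duplicate deliveries (stored here under a hypothetical `event_id` key), and it remembers only a bounded window of recent IDs, so a duplicate arriving after `max_ids` newer records would not be caught.

```python
from collections import OrderedDict


class Deduplicator:
    """Drops records whose ID was already seen within a bounded window.

    Assumption: every record carries a stable, unique identifier that is
    identical across duplicate deliveries.
    """

    def __init__(self, max_ids=10_000):
        self.max_ids = max_ids
        self._seen = OrderedDict()

    def is_new(self, record_id):
        if record_id in self._seen:
            # Refresh the entry so frequently repeated IDs stay in the window
            self._seen.move_to_end(record_id)
            return False
        self._seen[record_id] = True
        if len(self._seen) > self.max_ids:
            # Evict the least recently seen ID to bound memory use
            self._seen.popitem(last=False)
        return True


def deduplicate(records, dedup, key="event_id"):
    # 'event_id' is a hypothetical field name used for illustration
    return [r for r in records if dedup.is_new(r[key])]


if __name__ == "__main__":
    batch = [{"event_id": "a"}, {"event_id": "b"}, {"event_id": "a"}]
    print(deduplicate(batch, Deduplicator()))
```

Because the window is held in memory, it is lost when the consumer restarts; writing to the destination idempotently (for example, upserting by the item's primary key) can be the more robust of the two options.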

Furthermore, neither option offers a seamless way for applications running on Azure or GCP to use the data, and neither has as many integrations and connectors (for native AWS services and third-party services) as Confluent.

The availability of infinite storage in Confluent Cloud allows customers to store historical data without being constrained by retention limits. This is a key factor in Confluent's widespread adoption among thousands of organizations for their mission-critical use cases, many of whom also use DynamoDB. Consequently, these organizations are seeking ways to implement CDC with Confluent to efficiently ingest and process DynamoDB data in real time.

1. Using DynamoDB Streams and AWS Lambda

This method uses DynamoDB Streams to capture changes from the source DynamoDB table, and a Lambda function to replicate these changes to Confluent. AWS Lambda is a serverless compute service that lets you run code without provisioning or managing infrastructure.

To understand how this method works, we need to know how DynamoDB and DynamoDB Streams work. DynamoDB stores data in partitions. A table's primary key is either a partition key or a composite key consisting of a partition key and a sort key, and DynamoDB uses the partition key value to determine the partition in which an item is stored. DynamoDB Streams are split into shards (the unit of parallelism), and DynamoDB creates at least one shard per partition in the source table. For instance, if the table has three partitions and DynamoDB Streams is enabled on it, at least three shards will be created. AWS fully manages both shards and partitions, and the service adjusts their numbers dynamically to accommodate your workload.

After the data is written to DynamoDB Streams, the Lambda Service reads records from the stream in batches and synchronously invokes a Lambda function with an event that contains the stream records. The function then processes the records from the batch and publishes the updates to Confluent.

The following Python code snippet demonstrates how to read events from a DynamoDB stream, process the events, and publish the results to Confluent using a Lambda function:

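```python
import json

from confluent_kafka import Producer

INPUT_KAFKA_TOPIC = "online.transactions"

# Set up the Confluent Cloud producer configuration
conf = {
    'bootstrap.servers': '<your_bootstrap_servers>',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': '<your_sasl_username>',
    'sasl.password': '<your_sasl_password>',
    # Function instances that run concurrently (for example, one per stream
    # shard) need distinct transactional IDs, or they will fence each other
    'transactional.id': '<transactional_id>'
}

# Create the producer and initialize transactions once per execution
# environment, outside the handler, so warm invocations can reuse them
producer = Producer(conf)
producer.init_transactions()


def process(record):
    # Placeholder transformation: return the item's new image, which is
    # present for INSERT and MODIFY events when the stream includes new images
    return record['dynamodb'].get('NewImage')


def lambda_handler(event, context):
    producer.begin_transaction()
    try:
        # Process DynamoDB stream events
        for record in event['Records']:
            # Do some processing on the new image
            processed_record = process(record)

            # Publish the processed image to Confluent Cloud
            producer.produce(INPUT_KAFKA_TOPIC,
                             value=json.dumps(processed_record).encode('utf-8'))

        # Flush any outstanding messages before the Lambda function ends,
        # then commit the transaction to make the batch visible
        producer.flush()
        producer.commit_transaction()
    except Exception:
        # Abort so that records from this batch stay invisible to
        # read_committed consumers, then re-raise so Lambda retries the batch
        producer.abort_transaction()
        raise
```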

Kafka producers are typically long-lived, and sending a message to a Kafka topic is an asynchronous process. Since Lambda functions are ephemeral, ensuring that any messages submitted to the producer are fully flushed before the Lambda function ends is important. This can be achieved by calling the producer.flush() method.

To optimize performance and reduce startup latency, it's recommended to initialize the Kafka producer outside of the Lambda function handler. This allows any objects created during initialization to remain in memory and be reused across multiple Lambda invocations, potentially improving performance and reducing resource usage.
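The `process(record)` step in the function receives each stream record in the format Lambda delivers for DynamoDB Streams: an `eventName` (`INSERT`, `MODIFY`, or `REMOVE`) and a `dynamodb` object holding the item's `Keys`, a `SequenceNumber`, and, depending on the stream view type, `NewImage` and `OldImage`, with attribute values in DynamoDB's typed JSON (for example `{"N": "42"}`). The sketch below flattens such a record into plain values before publishing. The helper names are hypothetical and the converter covers only common attribute types; inside Lambda, `boto3.dynamodb.types.TypeDeserializer` handles every type, although it returns `Decimal` for numbers, which `json.dumps` does not serialize without extra handling.

```python
def from_dynamodb(value):
    """Convert one DynamoDB-typed attribute value, e.g. {"S": "abc"}, to Python.

    Handles S, N, BOOL, NULL, M, L, SS, and NS; binary types are not covered.
    """
    type_tag, raw = next(iter(value.items()))
    if type_tag in ("S", "BOOL"):
        return raw
    if type_tag == "NULL":
        return None
    if type_tag == "N":
        # Numbers arrive as strings; floats may lose precision for large decimals
        return int(raw) if raw.lstrip("-").isdigit() else float(raw)
    if type_tag == "M":
        return {k: from_dynamodb(v) for k, v in raw.items()}
    if type_tag == "L":
        return [from_dynamodb(v) for v in raw]
    if type_tag == "SS":
        return list(raw)
    if type_tag == "NS":
        return [from_dynamodb({"N": n}) for n in raw]
    raise ValueError(f"Unsupported DynamoDB attribute type: {type_tag}")


def _image(ddb, name):
    # NewImage is absent for REMOVE events, OldImage for INSERT events
    image = ddb.get(name)
    return None if image is None else {k: from_dynamodb(v) for k, v in image.items()}


def to_change_event(record):
    """Flatten one DynamoDB Streams record into a JSON-serializable dict."""
    ddb = record["dynamodb"]
    return {
        "event_name": record["eventName"],
        "sequence_number": ddb["SequenceNumber"],
        "keys": {k: from_dynamodb(v) for k, v in ddb["Keys"].items()},
        "new_image": _image(ddb, "NewImage"),
        "old_image": _image(ddb, "OldImage"),
    }
```

With this helper, `process(record)` could simply return `to_change_event(record)`.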

Pros

The following are a few reasons to choose this architecture:

  • Fully managed services: All components used in this method are fully managed and serverless services, which not only minimizes operational burden but also allows you to take advantage of the scalability and cost-effectiveness of serverless computing.

  • End-to-end exactly-once semantics: The Lambda service invokes the function in batches. If an error occurs while processing a batch, the entire batch is retried, potentially causing duplicates in Confluent. However, the function can use Confluent's support for Kafka transactions to achieve end-to-end exactly-once semantics. In this mode, if an error occurs while processing the batch or the function fails for any reason, the transaction is not committed. Consequently, any events from the failed batch that were written to Confluent before the failure won't be visible to downstream consumers that set isolation.level to read_committed in their configuration.

  • Item-level ordering guarantees: Changes to the number of shards in DynamoDB Streams do not affect the order in which modifications to an individual item are delivered. Additionally, Lambda automatically consumes older records before newer ones without the need for any custom code.

  • Cost efficient: DynamoDB Streams does not charge for GetRecords API calls invoked by AWS Lambda as part of DynamoDB triggers, so reading DynamoDB Streams through Lambda is free.
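On the consuming side, the exactly-once guarantee from the first bullet only holds for consumers that read committed data. The following is a minimal sketch with the confluent-kafka Python client; the connection placeholders mirror the producer configuration, and the group ID in the usage comment is hypothetical. librdkafka, which confluent-kafka wraps, already defaults `isolation.level` to `read_committed` (the Java client defaults to `read_uncommitted`), so setting it explicitly mainly documents the intent.

```python
def consumer_config(bootstrap_servers, username, password, group_id):
    """Build a Confluent Cloud consumer configuration for transactional topics."""
    return {
        "bootstrap.servers": bootstrap_servers,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.username": username,
        "sasl.password": password,
        "group.id": group_id,
        "auto.offset.reset": "earliest",
        # Skip records from aborted transactions, such as a failed Lambda batch
        "isolation.level": "read_committed",
    }


def consume(conf, topic="online.transactions"):
    """Print committed records from `topic` until interrupted."""
    # Imported here so consumer_config() can be used without the client installed
    from confluent_kafka import Consumer

    consumer = Consumer(conf)
    consumer.subscribe([topic])
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                print(f"Consumer error: {msg.error()}")
                continue
            print(msg.value().decode("utf-8"))
    finally:
        consumer.close()


# Example usage (the group ID "dynamodb-cdc-readers" is hypothetical):
# consume(consumer_config("<your_bootstrap_servers>", "<your_sasl_username>",
#                         "<your_sasl_password>", "dynamodb-cdc-readers"))
```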

Considerations

The following are a few considerations to take into account when working with this architecture:

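  • You need to build and maintain code, which could add complexity.

  • Error handling should be implemented in the function code. Otherwise, you might end up with stalled shards: by default, Lambda retries a failing batch until it succeeds or its records expire, and pauses processing of the affected shard in the meantime.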
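A complementary error-handling technique is Lambda's partial batch response, sketched below as a non-transactional alternative to the function shown earlier. The sketch assumes `ReportBatchItemFailures` is enabled on the event source mapping and that `publish` is a hypothetical function that synchronously writes one record to Confluent (for example, `produce` followed by `flush`). It stops at the first failure and reports that record's sequence number, so Lambda checkpoints there and retries from that record onward, preserving order without re-sending records that already succeeded.

```python
def handle_batch(event, publish):
    """Publish DynamoDB stream records in order, reporting the first failure.

    Returns a Lambda partial batch response: an empty list means the whole
    batch succeeded; otherwise Lambda retries from the reported record.
    `publish` is a hypothetical synchronous function supplied by the caller.
    """
    for record in event["Records"]:
        sequence_number = record["dynamodb"]["SequenceNumber"]
        try:
            publish(record)
        except Exception as exc:
            print(f"Failed to publish record {sequence_number}: {exc}")
            # Stop here so later records are not published ahead of this one
            return {"batchItemFailures": [{"itemIdentifier": sequence_number}]}
    return {"batchItemFailures": []}


# In a Lambda function this could be wired up as:
# def lambda_handler(event, context):
#     return handle_batch(event, publish_to_confluent)  # hypothetical publisher
```

A record that always fails is still retried until it expires, so pairing this with a maximum retry count and an on-failure destination on the event source mapping keeps one bad record from blocking its shard.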

2. Using Kinesis Data Streams connector

In this method, data changes in the source DynamoDB table are captured using Kinesis Data Streams. The fully managed Amazon Kinesis Source Connector for Confluent Cloud is then used to pull the changes from Amazon Kinesis and persist them in Confluent.

The following sample JSON contains the configuration properties for the connector:

```json
{
    "name": "confluent-kinesis-source",
    "connector.class": "KinesisSource",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "<my-kafka-api-key>",
    "kafka.api.secret": "<my-kafka-api-secret>",
    "kafka.topic": "kinesis-testing",
    "aws.access.key.id": "<my-aws-access-key>",
    "aws.secret.key.id": "<my-aws-access-key-secret>",
    "kinesis.stream": "my-kinesis-stream",
    "kinesis.region": "<my-aws-region>",
    "kinesis.position": "AT_TIMESTAMP",
    "kinesis.shard.timestamp.ms": "1590692978237",
    "tasks.max": "1"
}
```
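Because the connector assigns each shard to a single task and fails when a stream has more shards than tasks (a limitation covered in the considerations below), `tasks.max` has to keep up with the stream's shard count. The following sketch, with hypothetical helper names, looks up the open shard count using boto3's `describe_stream_summary` and sets `tasks.max` to match. It counts only open shards, and it would have to be rerun, with the connector configuration updated, whenever the stream is resharded.

```python
def kinesis_source_config(base_config, shard_count):
    """Return a copy of the connector config with one task per shard."""
    if shard_count < 1:
        raise ValueError("the stream must have at least one shard")
    config = dict(base_config)
    # Connector configuration values are strings
    config["tasks.max"] = str(shard_count)
    return config


def open_shard_count(stream_name, region):
    """Look up the number of open shards in a Kinesis data stream."""
    # Requires boto3 and AWS credentials, so it is imported only when called
    import boto3

    kinesis = boto3.client("kinesis", region_name=region)
    summary = kinesis.describe_stream_summary(StreamName=stream_name)
    return summary["StreamDescriptionSummary"]["OpenShardCount"]


# Example usage (needs AWS access; base_config is the JSON configuration as a dict):
# shards = open_shard_count("my-kinesis-stream", "<my-aws-region>")
# config = kinesis_source_config(base_config, shards)
```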

As an alternative to the connector, Kinesis Data Streams can trigger a Lambda function, which enables data transformation before changes are written to Confluent. However, this approach requires writing and maintaining code.

Pros

The following are a few reasons to choose this architecture:

  • All components used in this method are fully managed and serverless services, which not only minimizes operational burden but also allows you to take advantage of the scalability and cost-effectiveness of serverless computing.

  • No-code real-time data pipeline eliminates the need for writing or maintaining any code.

Considerations

The following are a few considerations to take into account when working with this architecture:

  • In the Kinesis Source connector, every shard is assigned to a single task. If the stream has more shards than the connector has tasks, the connector fails and throws an exception. This makes the connector impractical to use with Kinesis on-demand mode, which requires no capacity planning and automatically scales the number of shards to meet throughput requirements: because the shard count can change at any time, keeping the number of tasks matched to the number of shards is difficult.

  • This architecture does not support end-to-end exactly-once semantics. Due to the at-least-once delivery semantics of Kinesis, duplicate records may already exist in Kinesis, and if the connector is restarted, there may be further duplicates in Confluent.

  • In addition to the regular costs of using DynamoDB and Kinesis Data Streams, DynamoDB charges additional fees for using CDC with Kinesis Data Streams. These charges are based on the number of change data capture units consumed, where each write of up to 1 KB consumes one unit. Once the charges for Confluent's connector and cluster are also taken into account, this solution is more expensive than the other approaches discussed in this post.

  • The architecture incorporates two different data streaming platforms, which can make it harder to understand. Nevertheless, many organizations run both platforms in their environments. In such scenarios, some teams within the organization can use Kinesis (or the equivalent first-party service from their cloud service provider), while Confluent functions as the central nervous system that interconnects these environments.
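To estimate the change data capture units described in the cost consideration, the following sketch assumes that each write consumes one unit per started 1 KB of item size, with a minimum of one (so a 2.5 KB item consumes three units), in the same way write capacity units are rounded up. The write rate and item size in the example are made up, and the per-unit price should be taken from current DynamoDB pricing.

```python
import math

KB = 1024


def cdc_units_per_write(item_size_bytes):
    """Change data capture units for one write.

    Assumption: one unit per started 1 KB of item size, minimum one unit.
    """
    return max(1, math.ceil(item_size_bytes / KB))


def monthly_cdc_units(writes_per_second, item_size_bytes, days=30):
    """Estimate CDC units for a steady write rate over `days` days."""
    seconds = days * 24 * 60 * 60
    return writes_per_second * seconds * cdc_units_per_write(item_size_bytes)


# Example: 500 writes per second of 2.5 KB items over 30 days
# cdc_units_per_write(2560)     -> 3
# monthly_cdc_units(500, 2560)  -> 3888000000
```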

3. Using the open source connector

In this method, data changes in the source DynamoDB table are captured using DynamoDB Streams. A self-managed open source Kafka connector is then used to read this data and replicate the changes to Confluent.

The most straightforward way to host a self-managed connector in AWS is to use a combination of Amazon Elastic Kubernetes Service (EKS) on Fargate and Confluent for Kubernetes (CFK), as covered in this blog post. (Confluent also recently launched Custom Connectors, which lets you bring your own connector to Confluent Cloud and could be used in lieu of EKS and CFK.) This approach gives you the benefits of low-operations Kubernetes and the ease of deployment and management provided by CFK. Instead of deploying the S3 source connector as covered in that post, you will deploy an open source DynamoDB connector from the following GitHub repository.

Here is an example configuration for the connector:

```json
{
    "connector.class": "DynamoDBSourceConnector",

    "aws.region": "eu-west-1",
    "aws.access.key.id": "",
    "aws.secret.key": "",

    "dynamodb.table.env.tag.key": "environment",
    "dynamodb.table.env.tag.value": "dev",
    "dynamodb.table.ingestion.tag.key": "datalake-ingest",
    "dynamodb.table.whitelist": "",
    "dynamodb.service.endpoint": "",

    "kcl.table.billing.mode": "PROVISIONED",

    "resource.tagging.service.endpoint": "",

    "kafka.topic.prefix": "dynamodb-",
    "tasks.max": "1",

    "init.sync.delay.period": 60,
    "connect.dynamodb.rediscovery.period": "60000"
}
```

Here are the required permissions for the IAM role you will associate with the EKS pods running the connector:

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:DescribeTable",
                "dynamodb:DescribeStream",
                "dynamodb:ListTagsOfResource",
                "dynamodb:DescribeLimits",
                "dynamodb:GetRecords",
                "dynamodb:GetShardIterator",
                "dynamodb:Scan"
            ],
            "Resource": [
                "arn:aws:dynamodb:*:*:table/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:*"
            ],
            "Resource": [
                "arn:aws:dynamodb:*:*:table/datalake-KCL-*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:ListStreams",
                "dynamodb:ListTables",
                "dynamodb:ListGlobalTables",
                "tag:GetResources"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
```

Pros

The following are a few reasons to choose this architecture:

  • With this Kafka connector, you do not need to build and maintain code, as you do with the Lambda option.

  • Kafka Connect has built-in scaling mechanisms (though not automatic).

  • The connector automatically discovers added or removed DynamoDB tables and updates its configuration accordingly; connect.dynamodb.rediscovery.period controls how often this discovery runs.
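To illustrate how the tag settings in the connector configuration can select tables, the sketch below approximates a discovery step with boto3's Resource Groups Tagging API (`get_resources`, which the `tag:GetResources` permission in the IAM policy allows). It is not the connector's actual implementation: the helper names are hypothetical, and it assumes a table qualifies when it carries the ingestion tag key and its environment tag matches the configured value.

```python
def select_tables(resource_tag_mappings, env_key, env_value, ingestion_key):
    """Return names of tables whose tags match a connector-style filter.

    Assumption: a table qualifies if it carries `ingestion_key` (any value)
    and its `env_key` tag equals `env_value`.
    """
    names = []
    for mapping in resource_tag_mappings:
        tags = {t["Key"]: t["Value"] for t in mapping.get("Tags", [])}
        if ingestion_key in tags and tags.get(env_key) == env_value:
            # Table ARNs look like arn:aws:dynamodb:<region>:<account>:table/<name>
            names.append(mapping["ResourceARN"].split("table/", 1)[1])
    return names


def fetch_table_tag_mappings(region):
    """List tagged DynamoDB tables in a region with their tags."""
    # Requires boto3 and AWS credentials, so it is imported only when called
    import boto3

    client = boto3.client("resourcegroupstaggingapi", region_name=region)
    paginator = client.get_paginator("get_resources")
    mappings = []
    for page in paginator.paginate(ResourceTypeFilters=["dynamodb:table"]):
        mappings.extend(page["ResourceTagMappingList"])
    return mappings


# Example usage (needs AWS access), with the tag settings from the configuration:
# tables = select_tables(fetch_table_tag_mappings("eu-west-1"),
#                        "environment", "dev", "datalake-ingest")
```

Rerunning the selection on a timer, such as every `connect.dynamodb.rediscovery.period` milliseconds, would mirror the rediscovery behavior described in the bullet.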

Considerations

The following are a few considerations to take into account when working with this architecture:

  • You will need to build automation that scales connector tasks up and down and/or adjusts the resources of the EKS pods so that enough capacity is allocated to match the events emanating from DynamoDB.

  • The code is open source and therefore not directly supported by Confluent or AWS, leaving troubleshooting issues to be handled internally.

  • The Kinesis Client Library (KCL) keeps metadata in a separate, dedicated DynamoDB table for each DynamoDB stream it tracks. This means one additional table is created for each table the connector tracks.

  • The current connector implementation supports only one Kafka Connect task (= KCL worker) reading from one table at any given time. Due to this limitation, the maximum throughput from one table will be ~2,000 records (change events) per second.

  • The source DynamoDB table's read capacity must be large enough for INIT_SYNC to finish in around 16 hours. INIT_SYNC runs only when the task starts for the first time, but because DynamoDB Streams retains change events for only 24 hours, a slower INIT_SYNC risks being restarted as soon as it finishes, and the process can then loop indefinitely.
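A rough way to check the INIT_SYNC requirement is to work out the read capacity a full table scan needs to finish within 16 hours. The sketch below assumes one read capacity unit reads 4 KB per second with strongly consistent reads or 8 KB per second with eventually consistent reads, and ignores per-request rounding; which consistency mode the connector's scan uses is an assumption, and the 500 GB table size is a made-up example.

```python
import math

KB = 1024
GB = 1024 ** 3


def required_scan_rcu(table_size_bytes, hours=16, eventually_consistent=True):
    """Minimum read capacity units to scan a whole table within `hours`."""
    # 4 KB per RCU per second for strong reads, twice that for eventual reads
    bytes_per_rcu_per_second = (8 if eventually_consistent else 4) * KB
    bytes_per_second = table_size_bytes / (hours * 3600)
    return math.ceil(bytes_per_second / bytes_per_rcu_per_second)


# Example: a 500 GB table has to be read at about 8.9 MiB per second
# required_scan_rcu(500 * GB)                               -> 1138
# required_scan_rcu(500 * GB, eventually_consistent=False)  -> 2276
```

This capacity comes on top of what the application already consumes on the table during the sync.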

Conclusion

By leveraging CDC with DynamoDB data, various advantages are gained, ranging from hydrating data lakes to facilitating real-time data processing. Pairing CDC with Confluent not only expands the range of data sources that can be used to enrich DynamoDB data, but also broadens the potential destinations to which the data can be sent.

At the time of writing, there is no standard way to capture data changes from DynamoDB and feed them into Confluent. To address this, we have outlined three architectural patterns in this post for implementing CDC with Confluent on DynamoDB data.

The first and most recommended option is DynamoDB Streams with Lambda, which provides a highly scalable solution. With Lambda's efficient auto-scaling capabilities, this option can effortlessly handle varying workloads. It is also cost-effective, since AWS does not charge for the GetRecords calls Lambda makes on DynamoDB Streams as part of DynamoDB triggers. If you prefer not to write code but need an option that supports exactly-once semantics, we suggest the open source connector option. Finally, if your use case has fairly consistent throughput and you want a fully managed, no-code solution, we advise you to use the Kinesis option.

  • Ahmed Zamzam is a staff partner solutions architect at Confluent, specializing in integrations with cloud service providers (CSPs). Leveraging his expertise, he collaborates closely with CSP service teams to develop and refine integrations, empowering customers to leverage their data in real time effectively. Outside of work, Zamzam enjoys traveling, playing tennis, and cycling.

  • Joseph Morais started early in his career as a network/solution engineer working for FMC Corporation and then Urban Outfitters (UO). At UO, Joseph joined the e-commerce operations team, focusing on agile methodology, CI/CD, containerization, public cloud architecture, and infrastructure as code. This led to a greenfield AWS opportunity working for a startup, Amino Payments, where he worked heavily with Kafka, Apache Hadoop, NGINX, and automation. Before joining Confluent, Joseph helped AWS enterprise customers scale through their cloud journey as a senior technical account manager. At Confluent, Joseph serves as cloud partner solutions architect and Confluent Cloud evangelist.
