Tuesday, October 3, 2023

Import CSV files to AWS DynamoDB

I have seen many articles about loading CSV files into DynamoDB, but most of them are not real-life examples: many use row-by-row processing, which does not scale to real-world data volumes. Some use a Lambda function to load from S3 but never mention that a Lambda function can run for at most 15 minutes, so what happens if a huge dataset fails to load in time? Hardly any of them cover error handling: how do you reload the erroneous rows after correcting the data? What security measures should you take? What options are available to you, and when and how should you use them?

Today, I am going to walk through a real-world project that addresses all these points and explain how I approached the problem.

In my current company, IBM, our client is one of the largest hotel chains in the world. The requirement is that business users should be able to upload a CSV file from their computer to a DynamoDB table. The CSV file name and the DynamoDB table name will be provided by the users dynamically, and the solution should load the data in "append" mode. There should be an error log file in CSV format containing all the fields plus the error message, so that the users can correct the values later and load just those delta rows. The CSV files are huge, with millions of rows. There is no specific loading frequency; the load should run as and when a user needs to upload a CSV.

So, first I documented all the possible approaches and architectures to solve the problem.

There are several methods for importing a CSV file into DynamoDB. Each method has its own pros and cons and requires a different level of expertise and programming ability.

Some of the methods that can be used to import CSV into DynamoDB are:

  • Using NoSQL Workbench, manual upload
  • Using S3, Lambda
  • AWS Data Pipeline
  • DynamoDB Table Import from S3
  • Using Amazon EMR (Elastic MapReduce)
  • AWS Database Migration Service (AWS DMS)
  • AWS Glue
  • Loading a CSV from the local machine using Python

All the above options may have cost implications (especially the newer and more sophisticated services). I will discuss a few of them very quickly.

Option 1: Using NoSQL Workbench (least recommended)

Pros:

  • The easiest method; requires zero or minimal expertise in AWS and DynamoDB.
  • Data is uploaded in small chunks, so each upload is easy to verify.
  • No extra service (not even S3) is required; data can be uploaded directly from the local computer.
  • Error handling is possible.

Cons:

  • Manual intervention is required.
  • There is a limit of 150 rows at a time, so the CSV files have to be split.
  • You have to manually click the button for every CSV to be uploaded.
  • The steps have to be repeated for every environment (dev, test, prod, etc.).
  • Manual error handling.

Documents & Resources: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/workbench.Visualizer.ImportCSV.html

Option 2: Using S3 and Lambda 

Pros:

  • No or minimal manual intervention.
  • A fully automated and scalable option that can be deployed in various environments.
  • Automated error handling can be customized as needed.
  • No limit on rows or files. New files are automatically loaded into the database.
  • CDC (Change Data Capture – loading new files later) can be implemented.

Cons:

  • Intermediate knowledge of AWS is required.
  • Extra services like S3 and Lambda are needed, and Python programming knowledge is required.
  • Looks a bit slower; the Lambda needs to be configured with higher memory (more cost) and tested.
  • The timeout for a Lambda function is at most 15 minutes (a minimal handler sketch is shown after the resource links below).

Documents & Resources:

https://aws.amazon.com/blogs/database/implementing-bulk-csv-ingestion-to-amazon-dynamodb/

https://dev.to/aws-builders/how-to-import-csv-data-into-dynamodb-using-lambda-and-s3-event-triggers-24io

https://docs.aws.amazon.com/lambda/latest/dg/configuration-function-common.html#configuration-timeout-console
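To make this option concrete, here is a minimal, hedged sketch of such a Lambda loader: it is triggered by an S3 ObjectCreated event, reads the CSV object, and batch-writes the rows into DynamoDB. The TABLE_NAME environment variable and the assumption that the CSV header matches the table's attribute names are mine for illustration only; the posts linked above show fuller implementations.

# Hedged sketch of an S3-triggered Lambda CSV loader, not the project's code.
# The TABLE_NAME environment variable and the CSV column layout are assumptions.
import csv
import io
import os

import boto3

dynamodb = boto3.resource("dynamodb")
s3 = boto3.client("s3")


def lambda_handler(event, context):
    table = dynamodb.Table(os.environ["TABLE_NAME"])
    for rec in event["Records"]:
        bucket = rec["s3"]["bucket"]["name"]
        key = rec["s3"]["object"]["key"]
        # Read the whole object; very large files need a chunked/streaming read
        # so the 15-minute timeout and the memory limit are not exceeded.
        body = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")
        reader = csv.DictReader(io.StringIO(body))
        # batch_writer() buffers items and issues BatchWriteItem calls of up to 25 items.
        with table.batch_writer() as batch:
            for row in reader:
                batch.put_item(Item=row)
    return {"statusCode": 200}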

Option 3: DynamoDB Table Import

Pros:

  • Semi-automated, minimal manual intervention and very easy to implement.
  • Minimal AWS knowledge is sufficient; maybe some admin support is required.
  • No row limit and no limit on the number of CSV files.
  • Error log can be generated.

Cons:

  • Not fully automated; the same operations have to be repeated for every environment (a boto3 sketch of the import request is shown after the resource links below).
  • Checking the CloudWatch logs and correcting the bad rows is required.
  • A new table is always created; you cannot append into an existing table.
  • CDC (Change Data Capture – loading new files later) needs some workaround.

Documents & Resources:

https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/S3DataImport.Requesting.html

https://aws.amazon.com/blogs/database/amazon-dynamodb-can-now-import-amazon-s3-data-into-a-new-table/

https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/S3DataImport.HowItWorks.html
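For reference, this import can also be requested programmatically instead of from the console. The sketch below is a hedged example of calling the DynamoDB ImportTable API through boto3; the bucket, key prefix, table name and key schema are assumptions, and remember that the import always creates a new table.

# Hedged sketch: request a DynamoDB "import from S3" job with boto3.
# The bucket name, prefix, table name and key schema below are assumptions.
import boto3

client = boto3.client("dynamodb", region_name="ap-south-1")

response = client.import_table(
    S3BucketSource={"S3Bucket": "my-import-bucket", "S3KeyPrefix": "incoming/"},
    InputFormat="CSV",
    InputCompressionType="NONE",
    TableCreationParameters={
        "TableName": "mwcsv_imported",
        "AttributeDefinitions": [{"AttributeName": "xpmID", "AttributeType": "S"}],
        "KeySchema": [{"AttributeName": "xpmID", "KeyType": "HASH"}],
        "BillingMode": "PAY_PER_REQUEST",
    },
)
# The import runs asynchronously; the returned ARN can be polled with describe_import().
print(response["ImportTableDescription"]["ImportArn"])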

Option 4: AWS Data Pipeline 

This is another option, close to DynamoDB Table Import, but it is not recommended: it is supported only in selected regions, has extra cost, and the service is in maintenance mode.

 


Option 5: AWS Database Migration Service (AWS DMS)


Pros:

  • Fully automated process.
  • No limit on rows or files.
  • CDC (Change Data Capture – loading new files later) can be implemented.
  • Multiple files with multiple tables can be configured.

Cons:

  • Advanced knowledge of AWS is needed.
  • DMS is needed, which may incur some cost (a sketch of the external table definition DMS requires for an S3 source is shown after the resource links below).

Documents & Resources:

https://aws.amazon.com/blogs/database/migrate-delimited-files-from-amazon-s3-to-an-amazon-dynamodb-nosql-table-using-aws-database-migration-service-and-aws-cloudformation/

https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Source.S3.html
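For context, DMS can read delimited files from S3 only if you describe their layout in an external table definition (JSON). Below is a hedged sketch of such a definition written out from Python; the table and column names are assumptions, and the S3-source documentation linked above describes the full format.

# Hedged sketch: external table definition that tells DMS how to interpret the
# CSV files under an S3 prefix. The table and column names are assumptions.
import json

external_table_definition = {
    "TableCount": "1",
    "Tables": [
        {
            "TableName": "customers",
            "TablePath": "incoming/customers/",
            "TableOwner": "incoming",
            "TableColumns": [
                {"ColumnName": "xpmID", "ColumnType": "STRING", "ColumnLength": "50",
                 "ColumnNullable": "false", "ColumnIsPk": "true"},
                {"ColumnName": "username", "ColumnType": "STRING", "ColumnLength": "100"},
            ],
            "TableColumnsTotal": "2",
        }
    ],
}

# This JSON is attached to the S3 source endpoint when the DMS task is configured.
with open("external_table_definition.json", "w") as f:
    json.dump(external_table_definition, f, indent=2)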

 

Option 6: Loading CSV from local machine using Python


Pros:

  • No need to use any AWS service; you can upload the CSV to DynamoDB from your local computer. Cost free.
  • No limit on rows or files.
  • CDC (Change Data Capture – loading new files later) can be implemented.
  • Multiple files with multiple tables can be programmed.
  • Error handling can be customized.
  • You can spin up an EC2 instance and run the script from there for better performance, with no need to monitor from the local computer; alternatively, a VDI can be used.

Cons:

  • Knowledge of AWS and Python is needed, and development and testing effort has to be considered.
  • The script needs to run on a computer for hours, so power failures, internet disconnections and speed have to be considered.

 

We discussed with our client and decided to go with Option 6, as the main and original requirement is uploading a CSV from the business user's computer to DynamoDB, with the option to specify different tables and different CSV files, and customized error handling with an error CSV is a must. The cons of this approach can be handled easily by using a VDI.

This code can also be converted into a Lambda function with minimal effort.

Now, the coding part! The code is sufficiently documented, so you can follow it. I have changed some table names and CSV file names to protect the identity of the client.

Also, this is not the original code that I deployed into production, but it has all the elements you need to deliver a similar solution. Read every comment carefully!


# import required libraries..
# Some libraries are NOT used in the example code, but in real life you may need those.
import boto3
import csv
import sys
import pandas as pd
import ast
import botocore
from botocore.config import Config
from botocore.exceptions import ClientError

# Saubhik: This will load a CSV file from the local machine into AWS DynamoDB.

# Access Credentials
###################

# Change the access key as per your need.
a_key = "AKIA6E4JE5XXXXXX"

# Change the secret key as per your need.
a_S_key = "mmDFZ6mNLWa+eviXXXXXX+oYYYYYYdHAM"

# Change the region as required
region = 'ap-south-1'

"""
The above section is just an example to show you that you need AWS access credentials.
You should not hard-code credentials like this in the real world.
In the real project a profile is configured using the AWS CLI: aws configure --profile your_profile_name
Download and install the AWS CLI from https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html
Then the profile name is read from a configuration file called config.ini using ConfigParser.
"""

# The dataset contains lots of duplicates, so overwrite_keys is provided. You cannot perform multiple
# operations on the same item in the same BatchWriteItem request.
overwrite_keys = ["xpmID"]

# CSV file name, change this as per your need. Raw strings avoid backslash escape issues in Windows paths.
filename = r"C:\Saubhik\Project\DynamoDB\CSV\mvw_at04162023.csv"


# Error file name, change this as per your need.
errorfile = r"C:\Saubhik\Project\DynamoDB\CSV\mycsv_error.csv"

"""
The above two and the below DynamoDB table mane is command line argument for the Python
script in real project. This is done by importing sys and using sys.arv

"""

# Connecting to DynamoDB
try:
    dynamodb = boto3.resource(
        "dynamodb",
        aws_access_key_id=a_key,
        aws_secret_access_key=a_S_key,
        region_name=region,
        config=Config(
            retries={"max_attempts": 5, "mode": "adaptive"},
            max_pool_connections=1024,
        ),
    )
except Exception as error:
    print(error)
    raise
# DynamoDB table name, change it as per your need.
try:
    table = dynamodb.Table("mwcsv8jun")
except Exception as error:
    print("Error loading the DynamoDB table. Check if the table was created correctly and the environment variables are set.")
    print(error)
    raise

# CSV reading using a pandas data frame - SB.
# Note the data types.
Customers = pd.read_csv(filename, dtype={'createDate': 'Int32',
                                         'cognitoSub': str,
                                         'customerIdHash': str,
                                         'username': str,
                                         'mpmID': str}
                        )


# Here you may get "Float types are not supported. Use Decimal types instead." This is
# a very common error in DynamoDB.
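
# Hedged sketch (not the production code): if the CSV has float columns, convert
# them to Decimal before writing, because boto3's DynamoDB resource rejects floats.
# "someFloatColumn" is a hypothetical column name used for illustration only.
#
#   from decimal import Decimal
#   Customers["someFloatColumn"] = Customers["someFloatColumn"].apply(
#       lambda v: Decimal(str(v)) if pd.notna(v) else None
#   )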

# Trying to write in batch to get faster inserts. overwrite_by_pkeys=overwrite_keys
try:
    with table.batch_writer(overwrite_by_pkeys=overwrite_keys) as batch:
        for i, record in enumerate(Customers.to_dict("records")):
            # add to dynamodb
            try:
                batch.put_item(
                    Item=record
                )
            except Exception as error:
                print(error)
                print(record)
                print("End of Data Load.. starting new batch automatically..")
                # Writing an error file. This is written more dynamically in the real project.
                try:
                    with open(errorfile, 'a') as f:
                        f.writelines(
                            str(record["xpmID"]) + "," + str(record["cognitoSub"]) + "," + str(record["customerIdHash"])
                            + "," + str(record["username"]) + "," + str(record["createDate"]) + "," + str(error) + '\n')
                except Exception as error:
                    print(error)
except Exception as error:
    print(error)