In this article we will explore the implementation behind setting up AWS DataPipeline to import CSV files into a Redshift cluster.
This application is useful for data recovery, data backup or incremental updates in a production AWS environment.
The end goal is for a user to be able to upload a CSV (comma-separated values) file to a folder within an S3 bucket and have an automated process immediately import the records into a Redshift database.
GitHub repository / setup
You can try this CloudFormation template using Ansible and the AWS CLI as a driver. The software requirements are Ansible 2.3.0, psql 9.6.2 and awscli 1.11.61.
git clone https://github.com/dmccue/s3-datapipe-redshift.git && cd s3-datapipe-redshift
To create the application, run ./stack_update.sh
To delete the application, run ./stack_delete.sh
Pull requests are welcome!
List of Resources
s3-datapipe-redshift.yml: AWS::CloudFormation::Stack – Used for nested stack – S3
s3-datapipe-redshift.yml: AWS::CloudFormation::Stack – Used for nested stack – RS
s3-datapipe-redshift.yml: AWS::CloudFormation::Stack – Used for nested stack – AWSDP
nested-awsdp.yml: AWS::DataPipeline::Pipeline – Resource to create DataPipeline
nested-s3.yml: AWS::S3::Bucket – Bucket resource to store incoming/ and logs/
nested-s3.yml: AWS::Lambda::Permission – Permission for the bucket to invoke Lambda
nested-s3.yml: AWS::Logs::LogGroup – LogGroup to store Lambda logs
nested-s3.yml: AWS::Lambda::Function – Python Lambda function that activates the pipeline (AWSDP)
nested-s3.yml: AWS::IAM::Role – Role to allow Lambda to interact with AWSDP
nested-redshift.yml: AWS::Redshift::Cluster – Single- or multi-node Redshift cluster
nested-redshift.yml: AWS::Redshift::ClusterParameterGroup – Settings to apply to RS
nested-redshift.yml: AWS::Redshift::ClusterSubnetGroup – Subnets in the VPC that the cluster can be launched into
nested-redshift.yml: AWS::EC2::VPC – Virtual network hosting the cluster, to which the IGW attaches
nested-redshift.yml: AWS::EC2::Subnet – Subnet within this VPC
nested-redshift.yml: AWS::EC2::InternetGateway – Internet Gateway (IGW) providing internet access
nested-redshift.yml: AWS::EC2::VPCGatewayAttachment – Attaches the IGW to the VPC
nested-redshift.yml: AWS::EC2::RouteTable – Routing table to route via IGW
nested-redshift.yml: AWS::EC2::Route – Route that sends traffic via the IGW
nested-redshift.yml: AWS::EC2::SubnetRouteTableAssociation – Associates the subnet with the route table
nested-redshift.yml: AWS::EC2::SecurityGroup – Opens the Redshift TCP port to the internet via the IGW
The S3 bucket required for this use case is simple; the only notable change is the addition of a LambdaConfiguration to the bucket’s NotificationConfiguration. The LambdaConfiguration can be filtered so that it only fires when an object named *.csv has been fully uploaded to a subfolder named ‘incoming’. This naming convention dates back to early FTP servers, but it works well to distinguish this folder from the others. Without the filter, the Lambda function would trigger for uploads to any folder in the bucket.
In other words, the Lambda trigger only executes when a CSV file is uploaded to [BucketName]/incoming/*.csv, where * is a wildcard representing any filename.
By default the bucket is not allowed to invoke Lambda, so we grant that ability with the BucketPermission resource.
The final part of the bucket configuration is to link the LambdaConfiguration to the Lambda function, created in the next section, using the function’s ARN.
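For reference, here is a sketch of the same notification rule written as the request body for boto3's put_bucket_notification_configuration (the template itself uses CloudFormation's LambdaConfigurations property), together with a small local helper that mimics how S3 applies the prefix/suffix rules. The function ARN is a placeholder, and the helper is an approximation for illustration, not S3's own matching code.

```python
from typing import Any, Dict


def build_notification_config(function_arn: str,
                              prefix: str = "incoming/",
                              suffix: str = ".csv") -> Dict[str, Any]:
    """NotificationConfiguration that fires only for keys matching prefix*suffix."""
    return {
        "LambdaFunctionConfigurations": [
            {
                "LambdaFunctionArn": function_arn,
                # ObjectCreated:* fires once an object has been completely written
                "Events": ["s3:ObjectCreated:*"],
                "Filter": {
                    "Key": {
                        "FilterRules": [
                            {"Name": "prefix", "Value": prefix},
                            {"Name": "suffix", "Value": suffix},
                        ]
                    }
                },
            }
        ]
    }


def key_matches(config: Dict[str, Any], key: str) -> bool:
    """Local approximation of S3's filter: the key must satisfy every rule."""
    rules = config["LambdaFunctionConfigurations"][0]["Filter"]["Key"]["FilterRules"]
    for rule in rules:
        if rule["Name"] == "prefix" and not key.startswith(rule["Value"]):
            return False
        if rule["Name"] == "suffix" and not key.endswith(rule["Value"]):
            return False
    return True


if __name__ == "__main__":
    # Placeholder ARN; substitute the ARN of your own function
    cfg = build_notification_config("arn:aws:lambda:eu-west-1:123456789012:function:example")
    for k in ["incoming/data.csv", "logs/data.csv", "incoming/data.txt"]:
        print(k, key_matches(cfg, k))
```

Note that S3 key matching is case-sensitive, so a suffix of .csv will not match DATA.CSV.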
Lambda function creation
To run code in response to a notification from the bucket, a Lambda function is written in Python for this purpose. The function receives an event object describing the circumstances that triggered it. You can inspect the code to see the various relevant properties it checks; the primary ones we’re interested in are the bucket name and object key. These are passed to the DataPipeline as its source during activation.
The last piece of information the function needs is the pipeline ID, so it knows which pipeline to activate. This is provided by setting an environment variable, ‘PipelineName’, on the Lambda function at CloudFormation creation time. During execution the Lambda function reads this name and performs a lookup to retrieve the pipeline ID, e.g. df-06107451OSPIQL2YG7Ne. This avoids a circular dependency: the ID is only known once the pipeline has been created, the pipeline requires the bucket to exist so it can write its logs, and the bucket’s notification configuration in turn references the Lambda function.
By default the Lambda execution environment has limited permissions, and listing and activating pipelines in the AWS DataPipeline service isn’t among them. For this reason we need to create an IAM Lambda execution role named ‘LambdaExecutionRole’ with a policy granting these permissions.
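A minimal sketch of the trigger function described above, not the repository's actual code. It assumes the pipeline exposes an S3 input parameter with the hypothetical ID myInputS3Loc. It reads the bucket and key from the S3 event, looks up the pipeline ID by the name in the PipelineName environment variable via list_pipelines (following the marker/hasMoreResults pages), then calls activate_pipeline with the file's S3 URI as a parameter value.

```python
import os
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import unquote_plus

# Hypothetical parameter ID; it must match a ParameterObject in your pipeline
INPUT_PARAM_ID = "myInputS3Loc"


def extract_s3_objects(event: Dict[str, Any]) -> List[Tuple[str, str]]:
    """Return (bucket, key) pairs from an S3 event notification."""
    pairs = []
    for record in event.get("Records", []):
        s3 = record["s3"]
        # Object keys arrive URL-encoded, e.g. spaces become '+'
        pairs.append((s3["bucket"]["name"], unquote_plus(s3["object"]["key"])))
    return pairs


def find_pipeline_id(client: Any, name: str) -> Optional[str]:
    """Page through ListPipelines until a pipeline with this name is found."""
    kwargs: Dict[str, str] = {}
    while True:
        page = client.list_pipelines(**kwargs)
        for pipeline in page["pipelineIdList"]:
            if pipeline["name"] == name:
                return pipeline["id"]
        if not page.get("hasMoreResults"):
            return None
        kwargs["marker"] = page["marker"]


def handle(event: Dict[str, Any], client: Any) -> List[str]:
    """Activate the pipeline named in $PipelineName once per uploaded object."""
    name = os.environ["PipelineName"]
    pipeline_id = find_pipeline_id(client, name)
    if pipeline_id is None:
        raise LookupError(f"no pipeline named {name!r}")
    activated = []
    for bucket, key in extract_s3_objects(event):
        client.activate_pipeline(
            pipelineId=pipeline_id,
            parameterValues=[{"id": INPUT_PARAM_ID, "stringValue": f"s3://{bucket}/{key}"}],
        )
        activated.append(pipeline_id)
    return activated


def lambda_handler(event, context):
    import boto3  # bundled with the AWS Lambda Python runtime
    return handle(event, boto3.client("datapipeline"))
```

Passing the client into handle, rather than creating it at import time, keeps the logic testable with a stub; lambda_handler is the only piece that touches boto3.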
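The role's permissions could look roughly like the following, written as Python dicts that serialise to the JSON IAM expects. This is an assumption about what the function minimally needs (listing and activating pipelines, plus writing its CloudWatch logs); the repository's actual policy may differ, and the wildcard resources could be tightened in production.

```python
import json

# Assumed permissions policy for LambdaExecutionRole; the repository's may differ
LAMBDA_EXECUTION_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["datapipeline:ListPipelines", "datapipeline:ActivatePipeline"],
            "Resource": "*",
        },
        {
            # Lets the function write to /aws/lambda/[function name] in CloudWatch Logs
            "Effect": "Allow",
            "Action": ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"],
            "Resource": "arn:aws:logs:*:*:*",
        },
    ],
}

# Trust policy allowing the Lambda service to assume the role
ASSUME_ROLE_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

if __name__ == "__main__":
    print(json.dumps(LAMBDA_EXECUTION_POLICY, indent=2))
```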
There isn’t much that differs from the standard Redshift CloudFormation template; all we pass to the template is the ‘MasterUserPassword’ parameter, so that it can be adjusted. The template creates a single-node Redshift cluster with a PostgreSQL-compatible database named ‘devredshift’ by default. There are a number of constraints on these values, such as passwords requiring a minimum of 8 characters and database names being lowercase, so it’s best to read the documentation before changing them.
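If you do change these values, a quick local pre-flight check can save a failed stack creation. The sketch below encodes the password rules as I understand them from the Redshift documentation (8 to 64 printable ASCII characters, with at least one uppercase letter, one lowercase letter and one digit, excluding single and double quotes, backslash, forward slash and @) and a simplified database-name check (lowercase letters and digits only, no reserved-word check). Treat both as assumptions and confirm against the current documentation.

```python
import re
from typing import List

# Characters assumed to be disallowed in a Redshift master password
FORBIDDEN = set("'\"\\/@")


def password_problems(password: str) -> List[str]:
    """Return a list of rule violations (empty if the password looks valid)."""
    problems = []
    if not 8 <= len(password) <= 64:
        problems.append("must be 8 to 64 characters long")
    if not re.search(r"[A-Z]", password):
        problems.append("needs an uppercase letter")
    if not re.search(r"[a-z]", password):
        problems.append("needs a lowercase letter")
    if not re.search(r"[0-9]", password):
        problems.append("needs a digit")
    # Printable ASCII is 33-126, so this also rejects spaces
    if any(not 33 <= ord(c) <= 126 or c in FORBIDDEN for c in password):
        problems.append("contains a space, non-printable or forbidden character")
    return problems


def db_name_ok(name: str) -> bool:
    """Simplified check: 1-64 lowercase letters/digits (reserved words not checked)."""
    return re.fullmatch(r"[a-z0-9]{1,64}", name) is not None


if __name__ == "__main__":
    print(password_problems("Passw0rdExample"), db_name_ok("devredshift"))
```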
The DataPipeline is quite an ordeal to configure via CloudFormation; however, the configuration can be broken into three categories:
- ParameterObjects – Similar to configuration keys
- ParameterValues – Similar to configuration values, mapped onto the ParameterObjects
- PipelineObjects – Subcomponents of the pipeline relating to input, processing and output, configured using the keys and values above
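To make the relationship between the three categories concrete, here is a toy resolver. It is not Data Pipeline's own expression engine, which supports far more than bare references. It takes CloudFormation-shaped lists and substitutes #{parameterId} references in PipelineObjects string fields, using ParameterValues first and a parameter's default attribute second. The object, field and parameter names in the test data are illustrative.

```python
import re
from typing import Any, Dict, List

PARAM_REF = re.compile(r"#\{(\w+)\}")


def resolve(pipeline_objects: List[Dict[str, Any]],
            parameter_objects: List[Dict[str, Any]],
            parameter_values: List[Dict[str, str]]) -> Dict[str, Dict[str, Any]]:
    """Return {object Id: {field Key: resolved value}} for a toy pipeline."""
    values: Dict[str, str] = {}
    # ParameterObjects declare the keys; an optional 'default' attribute is a fallback
    for param in parameter_objects:
        for attr in param.get("Attributes", []):
            if attr["Key"] == "default":
                values[param["Id"]] = attr["StringValue"]
    # ParameterValues override any defaults
    for value in parameter_values:
        values[value["Id"]] = value["StringValue"]

    def substitute(match):
        # Leave unknown references untouched rather than guessing
        return values.get(match.group(1), match.group(0))

    resolved: Dict[str, Dict[str, Any]] = {}
    for obj in pipeline_objects:
        fields: Dict[str, Any] = {}
        # Toy simplification: a repeated Key keeps only its last value
        for field in obj["Fields"]:
            if "RefValue" in field:
                fields[field["Key"]] = {"ref": field["RefValue"]}
            else:
                fields[field["Key"]] = PARAM_REF.sub(substitute, field["StringValue"])
        resolved[obj["Id"]] = fields
    return resolved
```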
The best way to create your CloudFormation configuration for DataPipeline is to use the console wizard and select a pre-defined template. Once it is displayed in the GUI, click the export button to download the definition in JSON format. At this point I used cfn-flip to convert it to YAML and began ordering, tidying and then substituting variables. The documentation for DataPipeline is currently lacking, with few real-world examples beyond the standard wizard templates.
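The exported definition uses a different shape from CloudFormation: objects carry their fields as plain JSON keys (with references written as {"ref": ...}), and parameters and values use lowercase id keys. A hedged sketch of the reshaping step, before handing the result to cfn-flip for YAML output, might look like this; it assumes string-valued fields and does no ordering or variable substitution.

```python
import json
from typing import Any, Dict, List


def _fields(obj: Dict[str, Any]) -> List[Dict[str, str]]:
    """Turn an exported object's keys into CloudFormation Key/StringValue/RefValue fields."""
    fields = []
    for key, value in obj.items():
        if key in ("id", "name"):
            continue
        # Repeated fields are exported as lists; emit one entry per item
        for item in value if isinstance(value, list) else [value]:
            if isinstance(item, dict) and "ref" in item:
                fields.append({"Key": key, "RefValue": item["ref"]})
            else:
                fields.append({"Key": key, "StringValue": str(item)})
    return fields


def export_to_cfn(definition: Dict[str, Any]) -> Dict[str, Any]:
    """Map an exported pipeline definition onto AWS::DataPipeline::Pipeline properties."""
    return {
        "PipelineObjects": [
            {"Id": o["id"], "Name": o.get("name", o["id"]), "Fields": _fields(o)}
            for o in definition.get("objects", [])
        ],
        "ParameterObjects": [
            {"Id": p["id"],
             "Attributes": [{"Key": k, "StringValue": str(v)} for k, v in p.items() if k != "id"]}
            for p in definition.get("parameters", [])
        ],
        "ParameterValues": [
            {"Id": pid, "StringValue": str(v)}
            for pid, raw in definition.get("values", {}).items()
            for v in (raw if isinstance(raw, list) else [raw])
        ],
    }


if __name__ == "__main__":
    with open("pipeline-export.json") as f:  # hypothetical export filename
        print(json.dumps(export_to_cfn(json.load(f)), indent=2))
```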
Debugging the pipeline
There are a number of places to look in order to diagnose any unexpected behaviour.
- CloudWatch Logs output from the Lambda trigger function, stored in the log group /aws/lambda/[lambda function name]
- DataPipeline bucket logs, stored in [BucketName]/logs/[PipelineID]
- Diagnostic information from the Redshift system table stl_load_errors, for example:
select starttime, filename, err_reason, line_number, colname, type, col_length, position, raw_field_value, raw_line, err_code
from stl_load_errors
order by starttime desc;
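Since psql is already a prerequisite, a small Python wrapper can run the same query, here with a row limit added. The host and user are placeholders, the port 5439 and database devredshift come from the logs and template defaults in this article, and the password is passed via the standard PGPASSWORD environment variable rather than on the command line.

```python
import os
import subprocess
from typing import List

# The stl_load_errors query, with a LIMIT added to keep the output manageable
LOAD_ERRORS_SQL = (
    "select starttime, filename, err_reason, line_number, colname, type, col_length, "
    "position, raw_field_value, raw_line, err_code "
    "from stl_load_errors order by starttime desc limit 10;"
)


def psql_command(host: str, user: str, dbname: str = "devredshift", port: int = 5439) -> List[str]:
    """Build the psql argument list (no shell involved, so no quoting issues)."""
    return ["psql", "-h", host, "-p", str(port), "-U", user, "-d", dbname, "-c", LOAD_ERRORS_SQL]


def show_load_errors(host: str, user: str, password: str, dbname: str = "devredshift") -> str:
    """Run the query via psql; PGPASSWORD avoids an interactive password prompt."""
    env = dict(os.environ, PGPASSWORD=password)
    result = subprocess.run(psql_command(host, user, dbname), env=env,
                            capture_output=True, text=True, check=True)
    return result.stdout
```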
Healthy Lambda logs in CloudWatch Logs (1) look like:
START RequestId: 96ff5c05-07fc-11e7-9203-636bc0cedaa9 Version: $LATEST
END RequestId: 96ff5c05-07fc-11e7-9203-636bc0cedaa9
REPORT RequestId: 96ff5c05-07fc-11e7-9203-636bc0cedaa9 Duration: 4269.46 ms Billed Duration: 4300 ms Memory Size: 128 MB Max Memory Used: 34 MB
Note that the Lambda function ran for a billed duration of 4.3 seconds with 128 MB of memory. We’ll explore the costs of this activity later in the article.
Healthy DataPipeline logs stored in the S3 logs/ folder (2) look like:
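To track this figure across many invocations, the REPORT line can be parsed and converted into GB-seconds, the unit Lambda compute is billed in. This sketch assumes the REPORT format shown above. At the time of writing, billing was rounded up to the nearest 100 ms, which is why 4269.46 ms appears as 4300 ms billed.

```python
import re
from typing import Dict

REPORT_RE = re.compile(
    r"Duration: (?P<duration>[\d.]+) ms\s+"
    r"Billed Duration: (?P<billed>\d+) ms\s+"
    r"Memory Size: (?P<memory>\d+) MB\s+"
    r"Max Memory Used: (?P<max_used>\d+) MB"
)


def parse_report(line: str) -> Dict[str, float]:
    """Extract the timing and memory figures from a Lambda REPORT log line."""
    match = REPORT_RE.search(line)
    if match is None:
        raise ValueError("not a Lambda REPORT line")
    return {
        "duration_ms": float(match["duration"]),
        "billed_ms": int(match["billed"]),
        "memory_mb": int(match["memory"]),
        "max_used_mb": int(match["max_used"]),
    }


def gb_seconds(report: Dict[str, float]) -> float:
    """Billed compute in GB-seconds: billed seconds x configured memory in GB."""
    return report["billed_ms"] / 1000 * report["memory_mb"] / 1024
```

For the REPORT line above this gives 4.3 s x 0.125 GB, roughly 0.54 GB-seconds per invocation.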
13 Mar 2017 14:56:18,660 [INFO] (TaskRunnerService-resource:df-02713511AAE1QOR9PEQ2_@Ec2Instance_2017-03-13T14:51:53-0) df-02713511AAE1QOR9PEQ2 amazonaws.datapipeline.taskrunner.TaskPoller: Executing: amazonaws.datapipeline.activity.RedshiftCopyActivity@69d8e38c
13 Mar 2017 14:56:19,307 [INFO] (TaskRunnerService-resource:df-02713511AAE1QOR9PEQ2_@Ec2Instance_2017-03-13T14:51:53-0) df-02713511AAE1QOR9PEQ2 amazonaws.datapipeline.database.ConnectionFactory: Created connection jdbc:postgresql://dsm-test-myredshiftstack-e36uihst-redshiftcluster-1wote419upvtr.ccgmhmmtpvte.eu-west-1.redshift.amazonaws.com:5439/devredshift?tcpKeepAlive=true
13 Mar 2017 14:56:41,796 [INFO] (TaskRunnerService-resource:df-02713511AAE1QOR9PEQ2_@Ec2Instance_2017-03-13T14:51:53-0) df-02713511AAE1QOR9PEQ2 amazonaws.datapipeline.taskrunner.HeartBeatService: Finished waiting for heartbeat thread @RedshiftLoadActivity_2017-03-13T14:51:53_Attempt=1
13 Mar 2017 14:56:41,797 [INFO] (TaskRunnerService-resource:df-02713511AAE1QOR9PEQ2_@Ec2Instance_2017-03-13T14:51:53-0) df-02713511AAE1QOR9PEQ2 amazonaws.datapipeline.taskrunner.TaskPoller: Work RedshiftCopyActivity took 0:23 to complete
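When these logs arrive run together, they are easier to scan once split apart. This sketch splits a log on its leading timestamps and pulls out the "Work ... took ... to complete" lines. It assumes the timestamp and message formats seen in this sample, and that the elapsed time is colon-separated with the most significant unit first (0:23 here).

```python
import re
from functools import reduce
from typing import List, Tuple

# An entry starts with e.g. "13 Mar 2017 14:56:18,660 [" not preceded by a digit
ENTRY_START = re.compile(r"(?<!\d)(?=\d{1,2} [A-Z][a-z]{2} \d{4} \d{2}:\d{2}:\d{2},\d{3} \[)")
WORK_DONE = re.compile(r"Work (\w+) took ([\d:]+) to complete")


def split_entries(text: str) -> List[str]:
    """Split a run-together TaskRunner log into one string per entry."""
    return [part.strip() for part in ENTRY_START.split(text) if part.strip()]


def elapsed_seconds(value: str) -> int:
    """'0:23' -> 23, assuming colon-separated units, most significant first."""
    return reduce(lambda acc, part: acc * 60 + int(part), value.split(":"), 0)


def activity_durations(text: str) -> List[Tuple[str, int]]:
    """(activity name, elapsed seconds) for each completed piece of work."""
    return [(m.group(1), elapsed_seconds(m.group(2))) for m in WORK_DONE.finditer(text)]
```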
This entire application, consisting of S3, Lambda, Redshift, DataPipeline, IAM and CloudWatch, has been deployed using CloudFormation. This makes it easy to package multiple components into a single construct. To assist further, nested stacks split related components into discrete, reusable templates that are ideal for other projects. As with most topics in computer science, an easy way to visualise CloudFormation is the input-processing-output mindset, although in AWS these are labelled Parameters, Resources and Outputs.
This specific application uses Ansible to manage the create/update/delete workflow, and additionally uses SNS notifications to alert administrative users of the stack status via email or SMS. A benefit of Ansible is that it can update a stack that has already been created or updated. Re-running the ./stack_update.sh helper script updates existing resources without waiting around 10 minutes for the serial creation of resources, i.e. only the resources modified in the update are changed. Furthermore, Ansible initiates the S3 upload and waits until the pipeline has finished importing.
As part of the Ansible script, once the stack has been created it uploads the file contained in data/data.csv. It is a deliberately simple file, as this article doesn’t explore variation in field types or database internals. Feel free to modify it with your own values.
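The repository does this waiting in Ansible. For anyone driving the workflow from Python instead, the core of it is a poll-with-timeout loop like the sketch below. The get_state callable is hypothetical: in practice it might wrap a boto3 datapipeline call such as describe_pipelines or query_objects, and which state strings count as done or failed depends on what you query. Injecting sleep and clock keeps the loop testable.

```python
import time
from typing import Callable, Iterable


def wait_for_state(get_state: Callable[[], str],
                   done: Iterable[str],
                   failed: Iterable[str] = (),
                   timeout_s: float = 900.0,
                   interval_s: float = 15.0,
                   sleep: Callable[[float], None] = time.sleep,
                   clock: Callable[[], float] = time.monotonic) -> str:
    """Poll get_state() until it reports a done state, a failed state, or times out."""
    done_set, failed_set = set(done), set(failed)
    deadline = clock() + timeout_s
    while True:
        state = get_state()
        if state in done_set:
            return state
        if state in failed_set:
            raise RuntimeError(f"pipeline ended in state {state}")
        if clock() >= deadline:
            raise TimeoutError(f"still {state} after {timeout_s} seconds")
        sleep(interval_s)
```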
One of the biggest benefits of Lambda is that you only pay while the function is executing; there are no additional costs after it returns. Typically our Lambda trigger function, with debug messages enabled, takes around 5000 ms (5 s) to execute. Let’s calculate the cost per trigger using the AWS Lambda pricing calculator, excluding the free tier:
100,000,000 executions per month, using 128 MB of memory and taking 5000 ms each, would work out at $1061.88 per month.
We’re not even close to this number, so let’s divide the total by 100 million to get the cost per file import:
$1061.88 / 100,000,000 = $0.0000106188 per execution. It’s a no-brainer to use Lambda!
By comparison, a minimal EC2 instance running a daemon to perform the same tasks (once that daemon has been written) works out at roughly $20 per month. You could process close to 1.9 million file triggers per month before an EC2 instance becomes more cost-effective.
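The arithmetic above can be reproduced in a few lines. The two prices are assumptions on my part, the 2017 list prices of $0.20 per million requests and $0.00001667 per GB-second, but they reproduce the $1061.88 figure. The same function gives the break-even point against a $20 per month instance.

```python
# Assumed 2017 AWS Lambda list prices (USD), excluding the free tier
PRICE_PER_MILLION_REQUESTS = 0.20
PRICE_PER_GB_SECOND = 0.00001667


def lambda_monthly_cost(executions: int, memory_mb: int, duration_ms: float) -> float:
    """Request charge plus compute charge, in USD."""
    request_cost = executions / 1_000_000 * PRICE_PER_MILLION_REQUESTS
    gb_seconds = executions * (duration_ms / 1000) * (memory_mb / 1024)
    return request_cost + gb_seconds * PRICE_PER_GB_SECOND


if __name__ == "__main__":
    total = lambda_monthly_cost(100_000_000, 128, 5000)
    per_execution = total / 100_000_000
    ec2_monthly = 20.0  # the rough figure for a minimal EC2 instance
    print(f"monthly: ${total:.2f}")
    print(f"per execution: ${per_execution:.10f}")
    print(f"break-even: {ec2_monthly / per_execution:,.0f} executions/month")
```

With these prices the compute charge is 62,500,000 GB-seconds (about $1041.88) plus $20 in request charges, giving about $0.0000106 per execution and a break-even point of roughly 1.88 million executions per month.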
In summary, the resources needed to set up this workflow are non-trivial; however, once it’s running, minimal maintenance is required. The entire stack can be updated or destroyed easily, allowing for modification.
In practice, users could be given AWS credentials for an IAM user whose group policy allows writes to the incoming/ folder of the created bucket.
Further optimisations could include having the Lambda function publish an SNS notification when files have been uploaded. The Lambda function could also insert into a table derived from the filename to support multi-table imports. Although there doesn’t appear to be a post-import Lambda trigger, a cleanup step could delete the file from S3 once the import has completed successfully.
With AWS Glue on the horizon, it may be able to replace this workflow with a fully managed CSV import from S3; however, it’s not yet known what level of customisation and flexibility it will offer. I’ve signed up for the AWS Glue preview to be one of the first to experience what it can offer, which could result in a comparison blog post.
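As a sketch, such a group policy could be as small as the following, which allows only s3:PutObject under incoming/ of the named bucket. The bucket name is a placeholder, and a real deployment might also want read or list permissions for the uploading users.

```python
import json
from typing import Any, Dict


def incoming_upload_policy(bucket: str) -> Dict[str, Any]:
    """IAM policy allowing uploads only to <bucket>/incoming/."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowUploadsToIncoming",
                "Effect": "Allow",
                "Action": ["s3:PutObject"],
                "Resource": f"arn:aws:s3:::{bucket}/incoming/*",
            }
        ],
    }


if __name__ == "__main__":
    print(json.dumps(incoming_upload_policy("my-import-bucket"), indent=2))  # placeholder bucket
```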
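As an example of the multi-table idea, the function could derive a target table from the object key. The naming convention here (table name before the first hyphen, e.g. customers-2017-03-13.csv) is purely hypothetical; the important part is validating the result as a plain identifier before it goes anywhere near SQL.

```python
import re
from pathlib import PurePosixPath

# Lowercase identifier, at most 127 characters
IDENTIFIER = re.compile(r"[a-z_][a-z0-9_]{0,126}")


def table_for_key(key: str, prefix: str = "incoming/") -> str:
    """Map 'incoming/<table>-<anything>.csv' to '<table>' (hypothetical convention)."""
    if not key.startswith(prefix) or not key.lower().endswith(".csv"):
        raise ValueError(f"not an incoming CSV: {key!r}")
    stem = PurePosixPath(key).stem.lower()
    table = stem.split("-", 1)[0]
    # Reject anything that is not a simple identifier to avoid SQL injection
    if not IDENTIFIER.fullmatch(table):
        raise ValueError(f"cannot derive a safe table name from {key!r}")
    return table
```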