AWS CloudFormation for S3 Import to Redshift using DataPipeline & Lambda

Introduction

In this article we will explore the implementation behind setting up AWS DataPipeline to import CSV files into a Redshift cluster.
This setup is useful for data recovery, data backup or incremental updates in a production AWS environment.
The end goal is for a user to upload a CSV (comma-separated values) file to a folder within an S3 bucket and have an automated process immediately import the records into a Redshift database.

GitHub repository / setup

You can try this CloudFormation template using Ansible and the awscli as a driver.  The software requirements are Ansible 2.3.0, psql 9.6.2 and awscli 1.11.61.

git clone https://github.com/dmccue/s3-datapipe-redshift.git && cd s3-datapipe-redshift
vi ansible/vars.yml
To create application – ./stack_update.sh
To delete application – ./stack_delete.sh

Pull requests are welcome!

List of Resources

s3-datapipe-redshift.yml: AWS::CloudFormation::Stack – Used for nested stack – S3
s3-datapipe-redshift.yml: AWS::CloudFormation::Stack – Used for nested stack – RS
s3-datapipe-redshift.yml: AWS::CloudFormation::Stack – Used for nested stack – AWSDP
nested-awsdp.yml: AWS::DataPipeline::Pipeline – Resource to create DataPipeline
nested-s3.yml: AWS::S3::Bucket – Bucket resource to store incoming/ and logs/
nested-s3.yml: AWS::Lambda::Permission – Permission for bucket to call Lambda
nested-s3.yml: AWS::Logs::LogGroup – LogGroup to store Lambda logs
nested-s3.yml: AWS::Lambda::Function – Lambda python function to call AWSDP
nested-s3.yml: AWS::IAM::Role – Role to allow Lambda to interact with AWSDP
nested-redshift.yml: AWS::Redshift::Cluster – Single or Multi-node RedShift cluster
nested-redshift.yml: AWS::Redshift::ClusterParameterGroup – Settings to apply to RS
nested-redshift.yml: AWS::Redshift::ClusterSubnetGroup – Subnet group the cluster launches into
nested-redshift.yml: AWS::EC2::VPC – VPC providing the network for the cluster and IGW
nested-redshift.yml: AWS::EC2::Subnet – Subnet within this VPC
nested-redshift.yml: AWS::EC2::InternetGateway – Gateway providing internet access
nested-redshift.yml: AWS::EC2::VPCGatewayAttachment – Attaches the VPC to the IGW
nested-redshift.yml: AWS::EC2::RouteTable – Routing table to route via the IGW
nested-redshift.yml: AWS::EC2::Route – Default route via the IGW
nested-redshift.yml: AWS::EC2::SubnetRouteTableAssociation – Maps the subnet to the route table
nested-redshift.yml: AWS::EC2::SecurityGroup – Opens the Redshift TCP port to the internet

Bucket creation

The S3 bucket required for this use-case is simple; the only real alteration is the addition of a LambdaConfiguration to the bucket’s NotificationConfiguration.  The LambdaConfiguration is set up to alert only when an object named *.csv has been fully uploaded to a subfolder named ‘incoming’.  The naming convention goes back to early FTP servers, but it serves to differentiate this folder from the others.  Without the filter, the Lambda function would trigger for uploads to any folder in the bucket.

So to clarify, the Lambda trigger will only execute if a CSV file is uploaded to [BucketName]/incoming/*.csv, where * is a wildcard representing any filename.

By default the bucket is not permitted to invoke Lambda, so we grant that ability using the BucketPermission resource.

The final part of the bucket configuration is to link the LambdaConfiguration to the lambda function created in the next section using the function’s ARN.
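
If you want to see or verify the same notification rules outside of CloudFormation, the prefix/suffix filter can also be expressed directly through the S3 API.  The snippet below is a minimal boto3 sketch of that shape; the bucket name and function ARN are placeholders, not values from the template, which declares the equivalent LambdaConfiguration on the bucket resource itself.

    # Minimal sketch (boto3): attach a Lambda notification that only fires
    # for *.csv objects created under incoming/. The bucket name and
    # function ARN below are placeholders, not values from the template.
    import boto3

    s3 = boto3.client('s3')
    s3.put_bucket_notification_configuration(
        Bucket='my-import-bucket',
        NotificationConfiguration={
            'LambdaFunctionConfigurations': [{
                'LambdaFunctionArn': 'arn:aws:lambda:eu-west-1:123456789012:function:my-trigger',
                'Events': ['s3:ObjectCreated:*'],
                'Filter': {'Key': {'FilterRules': [
                    {'Name': 'prefix', 'Value': 'incoming/'},
                    {'Name': 'suffix', 'Value': '.csv'},
                ]}},
            }]
        },
    )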

Lambda function creation

In order to run code in response to a notification from the bucket, a Lambda function is created in Python.  The function is passed an event object describing what caused the invocation.  The code inspects several properties of this event, but the two we are primarily interested in are the bucket name and the object key; these are passed to the DataPipeline as the source location during activation.

The last piece of information the function needs is the pipeline ID, so it knows which pipeline to activate.  This is handled by setting an environment variable ‘PipelineName’ on the Lambda function at CloudFormation creation time.  During execution the function reads this name and performs a lookup to retrieve the pipeline ID, e.g. df-06107451OSPIQL2YG7Ne.  The indirection avoids a circular dependency: the ID is only known once the pipeline has been created, and the pipeline in turn requires the bucket to already exist so it can write logs to it.

By default the Lambda execution environment has limited permissions, and listing or activating the AWS DataPipeline service is not among them.  For this reason we create an IAM execution role for the function, named ‘LambdaExecutionRole’, whose policy grants those actions.
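
For reference, a minimal sketch of such a handler is shown below.  It is not the exact function from the repository, just an illustration of the flow described above: read the bucket and key from the event, resolve the pipeline ID from the PipelineName environment variable, then activate the pipeline with the S3 location as a parameter value.  The parameter id ‘myInputS3Loc’ is an assumption borrowed from the wizard-style templates.

    # Minimal sketch of the trigger flow (not the repository's exact code).
    # Assumes the pipeline exposes an S3 input parameter with id
    # 'myInputS3Loc', as in the wizard-style templates.
    import os
    import boto3

    dp = boto3.client('datapipeline')

    def handler(event, context):
        record = event['Records'][0]
        bucket = record['s3']['bucket']['name']
        # Keys with special characters arrive URL-encoded; ignored here for brevity.
        key = record['s3']['object']['key']

        # Resolve the pipeline ID from its name to avoid the circular
        # dependency described above (assumes few pipelines, no pagination).
        name = os.environ['PipelineName']
        pipelines = dp.list_pipelines()['pipelineIdList']
        pipeline_id = next(p['id'] for p in pipelines if p['name'] == name)

        # Activate the pipeline, pointing it at the uploaded object.
        dp.activate_pipeline(
            pipelineId=pipeline_id,
            parameterValues=[
                {'id': 'myInputS3Loc', 'stringValue': 's3://%s/%s' % (bucket, key)},
            ],
        )
        return pipeline_id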

Redshift creation

There isn’t much that differs from the standard RedShift cloudformation template, all we pass to the template is the variable ‘MasterUserPassword’ to allow for adjustment.  The template itself will create a single node redshift cluster and will create a postgres database, called by default ‘devredshift’.  There’s a number of constraints on variables such as passwords requiring a minimum of 8 characters and lowercase characters for database names, so it’s best to read the documentation if these are altered from current.

DataPipeline creation

The DataPipeline is quite an ordeal to configure via CloudFormation; however, the config can be broken into three categories:

  1. ParameterObjects – Similar to configuration keys
  2. ParameterValues – Similar to configuration values mapped from ParameterObjects
  3. PipelineObjects – Subcomponents of the pipeline relating to input, processing, output – These are configured using the keys & values in 1&2

The easiest way to create your CloudFormation configuration for DataPipeline is to use the console wizard and select a pre-defined template.  Once it is displayed in the GUI, you can click the export button to download the definition in JSON format.  At this point I used cfn-flip to convert it to YAML and began the process of ordering, tidying and then substituting variables.  The documentation for DataPipeline is currently lacking, with few real-world examples outside the standard wizard templates.
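
If it helps to see the three categories side by side, they map directly onto the DataPipeline API as well: the same structure CloudFormation expects is what boto3’s put_pipeline_definition takes.  The fragment below is a trimmed, hypothetical illustration of that shape rather than the full definition used in the repository; the ids and values are placeholders.

    # Illustrative shape only: how the three categories appear when pushed
    # through the API. Ids and values here are placeholders.
    import boto3

    dp = boto3.client('datapipeline')
    dp.put_pipeline_definition(
        pipelineId='df-EXAMPLE',
        parameterObjects=[            # 1. configuration keys
            {'id': 'myInputS3Loc',
             'attributes': [{'key': 'type', 'stringValue': 'AWS::S3::ObjectKey'}]},
        ],
        parameterValues=[             # 2. values mapped onto those keys
            {'id': 'myInputS3Loc', 'stringValue': 's3://my-import-bucket/incoming/data.csv'},
        ],
        pipelineObjects=[             # 3. pipeline subcomponents referencing 1 & 2
            {'id': 'Default', 'name': 'Default',
             'fields': [{'key': 'failureAndRerunMode', 'stringValue': 'CASCADE'}]},
        ],
    )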

Debugging the pipeline

There’s a number of places to look in order to diagnose any unexpected behaviour.

  1. CloudWatch logs from the Lambda trigger function’s output, stored in the LogGroup /aws/lambda/[lambda function name]
  2. DataPipeline bucket logs, stored in [BucketName]/logs/[PipelineID]
  3. Diagnostic information from the Redshift system table stl_load_errors, which records load failures

    select starttime, filename, err_reason, line_number, colname, type, col_length, position, raw_field_value, raw_line, err_code
    from stl_load_errors
    order by starttime desc;

Healthy lambda logs from CloudWatch logs (1) look like:

START RequestId: 96ff5c05-07fc-11e7-9203-636bc0cedaa9 Version: $LATEST
…[debug output]…
END RequestId: 96ff5c05-07fc-11e7-9203-636bc0cedaa9
REPORT RequestId: 96ff5c05-07fc-11e7-9203-636bc0cedaa9 Duration: 4269.46 ms Billed Duration: 4300 ms Memory Size: 128 MB Max Memory Used: 34 MB

Just to point out, the lambda function ran for a billed duration of 4.3 seconds with 128MB RAM.  We’ll explore costings related to this activity later in the article.

Healthy DataPipeline logs stored in the s3 /logs/ folder (2) look like:

13 Mar 2017 14:56:18,660 [INFO] (TaskRunnerService-resource:df-02713511AAE1QOR9PEQ2_@Ec2Instance_2017-03-13T14:51:53-0) df-02713511AAE1QOR9PEQ2 amazonaws.datapipeline.taskrunner.TaskPoller: Executing: amazonaws.datapipeline.activity.RedshiftCopyActivity@69d8e38c
13 Mar 2017 14:56:19,307 [INFO] (TaskRunnerService-resource:df-02713511AAE1QOR9PEQ2_@Ec2Instance_2017-03-13T14:51:53-0) df-02713511AAE1QOR9PEQ2 amazonaws.datapipeline.database.ConnectionFactory: Created connection jdbc:postgresql://dsm-test-myredshiftstack-e36uihst-redshiftcluster-1wote419upvtr.ccgmhmmtpvte.eu-west-1.redshift.amazonaws.com:5439/devredshift?tcpKeepAlive=true
13 Mar 2017 14:56:41,796 [INFO] (TaskRunnerService-resource:df-02713511AAE1QOR9PEQ2_@Ec2Instance_2017-03-13T14:51:53-0) df-02713511AAE1QOR9PEQ2 amazonaws.datapipeline.taskrunner.HeartBeatService: Finished waiting for heartbeat thread @RedshiftLoadActivity_2017-03-13T14:51:53_Attempt=1
13 Mar 2017 14:56:41,797 [INFO] (TaskRunnerService-resource:df-02713511AAE1QOR9PEQ2_@Ec2Instance_2017-03-13T14:51:53-0) df-02713511AAE1QOR9PEQ2 amazonaws.datapipeline.taskrunner.TaskPoller: Work RedshiftCopyActivity took 0:23 to complete

Cloudformation properties

This entire application, consisting of S3, Lambda, Redshift, DataPipeline, IAM and CloudWatch, has been deployed using CloudFormation.  This makes it easy to package multiple components into a single construct.  To assist further, nested stacks are used to split similar components into discrete reusable templates, ideal for reuse in other projects.  An easy way to visualise CloudFormation, as with most topics in computer science, is the input-processing-output mindset; in AWS these are labelled Parameters, Resources and Outputs.
This specific application uses Ansible to manage the creation/update/deletion workflow and additionally uses SNS notifications to alert administrative users of the stack status via email or SMS.  Ansible has the benefit that it can update an already created or updated stack: re-running the ./stack_update.sh helper script updates existing resources without waiting around 10 minutes for the serial creation of every resource, i.e. only resources modified in the update are replaced.  Furthermore, Ansible assists by initiating the S3 upload and waiting until the pipeline has finished importing.

As part of the Ansible run, the script ensures the stack has been created and then uploads the file data/data.csv.  It is deliberately a simple file, as this article won’t explore variation in field types or database internals; feel free to modify it to your own values.
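
If you want to trigger an import by hand rather than through the Ansible play, uploading any CSV into the incoming/ prefix is enough.  A minimal boto3 sketch follows; the bucket name is a placeholder for the one your stack created.

    # Upload a CSV into incoming/ to fire the Lambda trigger (sketch;
    # replace the bucket name with the one created by your stack).
    import boto3

    s3 = boto3.client('s3')
    s3.upload_file('data/data.csv', 'my-import-bucket', 'incoming/data.csv')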

Pricing considerations

One of the largest benefits of Lambda is that you only pay while the function is executing; there are no additional costs after the function returns.  Typically our Lambda trigger function, with debug messages, takes around 5000 ms (5 s) to execute.  Let’s calculate the cost per trigger using the AWS Lambda pricing calculator, excluding the free tier:
https://s3.amazonaws.com/lambda-tools/pricing-calculator.html

100,000,000 executions, using 128 MB memory and taking 5000 milliseconds each, works out at $1061.88 per month.
We’re nowhere near this number, so let’s divide the total by 100 million to get the cost per file import.

1061.88 / 100,000,000 = $0.0000106188 per execution; it’s a no-brainer to use Lambda!

Compare this to a minimal EC2 instance performing the same task as a daemon, which, once coded, works out at roughly $20 per month.  You could run close to 1.9 million file triggers per month before an EC2 instance becomes more cost effective.
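
The same numbers can be reproduced quickly by hand.  The sketch below uses the published pay-as-you-go rates the calculator is based on ($0.20 per million requests plus $0.00001667 per GB-second); treat it as arithmetic, not a current quote.

    # Back-of-the-envelope Lambda cost per CSV import.
    GB_SECOND_RATE = 0.00001667   # USD per GB-second
    REQUEST_RATE = 0.20 / 1e6     # USD per invocation

    memory_gb = 128 / 1024.0      # 128 MB
    billed_seconds = 5.0          # ~5000 ms billed duration

    per_execution = memory_gb * billed_seconds * GB_SECOND_RATE + REQUEST_RATE
    print('Cost per import: $%.10f' % per_execution)             # ~ $0.0000106188

    # Break-even against a ~$20/month EC2 instance doing the same job
    print('Break-even imports/month: %d' % (20 / per_execution))  # ~ 1.9 million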

Conclusion

In summary, the resources needed to set up this workflow are non-trivial; however, once it’s running there is minimal maintenance required.  The entire stack can be updated or destroyed easily, allowing for modification.

In practical operation, users would be issued AWS credentials for an IAM user whose group policy allows writes to the incoming/ folder of the created bucket.

Further optimisations could include having Lambda publish SNS notifications when files have been uploaded, or having the function insert into a table derived from the filename to support multi-table imports.  Although there doesn’t appear to be a post-import trigger for Lambda, for cleanup purposes it could delete the file from S3 once the import has completed successfully.

With AWS Glue on the horizon, it may be possible to replace this workflow with a fully managed CSV import from S3; however, it’s not yet known what level of customisation and flexibility it will offer.  I’ve signed up for the AWS Glue preview to be one of the first to experience what it can offer, which could result in a comparison blog post.

Eucalyptus Object Storage (S3) via ceph-radosgw

Introduction

Eucalyptus object storage has typically been managed by our Walrus software component.  Recently, support has been added for RiakCS, which also provides an S3 interface.

In this blog entry we’ll explore the settings needed to configure ceph-radosgw with Eucalyptus, and what’s needed to configure an S3 client to talk to Eucalyptus.

The tested versions are the Eucalyptus 4.2.1 general release interfacing against Ceph version 0.94.5-9.el7cp.

Ceph storage & monitor servers  <->  Ceph radosgw  <->  Eucalyptus OSG  <->  S3 client

Prior to this installation, follow the setup steps detailed in: http://docs.ceph.com/docs/master/radosgw/config/

Ceph configuration

As the radosgw is REST based, it’s relatively easy to set up either active or passive load balancing.  In this particular setup we’ll be using round-robin DNS across two radosgw servers on the backend.  We won’t go into how to interface the Ceph gateways with the Ceph cluster, as that’s out of scope.

1. Configure the server with the rados gateway configuration:

ceph-rados-gateway:/etc/ceph/ceph.conf

[global]
fsid = aaaa00a0-a0aa-0a0a-a000-00000a0000a0
mon_initial_members = host1, host2, host3
mon_host = 192.168.0.1,192.168.0.2,192.168.0.3
auth_cluster_required = cephx
auth_service_required = cephx
auth_client_required = cephx
filestore_xattr_use_omap = true
max_open_files = 150000

[client.rgw.cephgw01]
rgw print continue = false
rgw_frontends = "civetweb port=80"

[client.rgw.cephgw02]
rgw print continue = false
rgw_frontends = "civetweb port=80"

2. Start the rados gateway and check it’s responding on port 80:

service ceph-radosgw restart

curl http://localhost:80

This will leave us with two servers running civetweb on port 80, which in turn communicate with the Ceph cluster using librados.

In order for Eucalyptus to authenticate to the rados gateways, we need to create credentials.

3. To do this run the following and take note of the access and secret keys:

radosgw-admin user create --uid=eucalyptus --display-name="Eucalyptus"

radosgw-admin user info --uid=eucalyptus | grep 'access\|secret'

Eucalyptus configuration

If you’re currently running Walrus as your walrusbackend, please migrate all data off of it first.  You’ll also need to run euserv-deregister-service <walrusname> and delete /var/lib/eucalyptus/bukkit to clear storage space.

Once this is completed, please run 'euserv-register-service -t objectstorage -h <UFS_IP>'.

You can then run 'euserv-describe-services --filter service-type=objectstorage', which at this point should show the service as broken.

euctl settings:

euctl objectstorage.providerclient=riakcs

euctl objectstorage.s3provider.s3endpoint=http://cephgw:80

euctl objectstorage.s3provider.s3accesskey=<RADOSGW_ACCESS_KEY>

euctl objectstorage.s3provider.s3secretkey=<RADOSGW_SECRET_KEY>

euctl objectstorage.s3provider.s3usehttps=false

Please check to ensure that your object storage gateways can resolve cephgw to one of the ceph gateways you’ve setup. You can either do this using round robin DNS or use an internal load balancer (look into eulb-create-lb).

Finally, if you run 'euserv-describe-services --filter service-type=objectstorage' you should see the objectstorage services showing as enabled.

Client configuration

At this point you’ll want to test out the s3 functionality and make sure you can create buckets and files.

Before starting, please ensure you can connect to the objectstorage provider host IP on port 8773.  If you’ve set up your resolv.conf correctly, the resolver will contact one of the UFS servers to resolve the s3 prefix.

e.g. curl http://s3.euca.domain.com:8773

1. The python-boto library is needed to test the functionality from Python:

yum install -y python-boto

2. Create a file named s3test.py and populate the values to match your environment:

#!/usr/bin/env python

import boto
import boto.s3.connection

access_key = 'AAAAAAAAAAAAAAAAAAAA'
secret_key = 'BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB'

# Connect to the Eucalyptus OSG endpoint rather than AWS
conn = boto.connect_s3(
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key,
    host='s3.euca.domain.com',
    port=8773,
    is_secure=False,
    calling_format=boto.s3.connection.OrdinaryCallingFormat(),
)

print("* Creating bucket")
bucket = conn.create_bucket('my-new-bucket')

print("* Creating files in bucket")
for x in range(1, 20):
    key = bucket.new_key('hello_' + str(x) + '.txt')
    print(key.name)
    key.set_contents_from_string('Hello World!')

print("* Displaying buckets and files")
for bucket in conn.get_all_buckets():
    print("{name}\t{created}".format(
        name=bucket.name,
        created=bucket.creation_date,
    ))
    for file_key in bucket.list():
        print("\t" + file_key.name)
        print(file_key.get_contents_as_string())

print("* Deleting all files in bucket")
bucket = conn.get_bucket('my-new-bucket')
for key in bucket.list():
    print("Deleting - " + key.name.encode('utf-8'))
    bucket.delete_key(key.name)

print("* Deleting bucket")
conn.delete_bucket('my-new-bucket')

This should run as expected.  You may also want to install s3cmd (https://github.com/s3tools/s3cmd) or configure the awscli to use the S3 functionality.

Now would be a good time to update your esi image using 'esi-install-image --install-default --region euca.domain.com' and install some images using 'bash <(curl -Ls eucalyptus.com/install-emis)'.

If you’ve any questions please get in touch!