Redshift Spectrum Cost of Data Scanned

With Redshift Spectrum, you are billed at $5 per terabyte of data scanned, rounded up to the next megabyte, with a 10 megabyte minimum per query. For example, if you scan 10 gigabytes of data, you will be charged $0.05. If you scan 1 terabyte of data, you will be charged $5.

To find out how much data your Redshift Spectrum queries scan, query the system view SVL_S3QUERY_SUMMARY and look at the column s3_scanned_bytes (the number of bytes scanned from Amazon S3). The cost of a Redshift Spectrum query is driven by the amount of data it scans from Amazon S3.

s3_scanned_bytes – The number of bytes scanned from Amazon S3 and sent to the Redshift Spectrum layer, based on compressed data.

For example, you can run the query below to find the number of bytes scanned by one particular Spectrum query (replace <query_id> with the ID of your query):

select s3_scanned_bytes from svl_s3query_summary where query = <query_id>;

To get the total bytes scanned by all Spectrum queries:

select sum(s3_scanned_bytes) from svl_s3query_summary;

To get the total bytes scanned by all Spectrum queries in the last day:

select sum(s3_scanned_bytes) from svl_s3query_summary where starttime >= sysdate - interval '24 hours';

sum

621900000000
(1 row)

Let's say we get a figure like the one above for one day of Spectrum use. Using the Redshift Spectrum pricing of $5 per terabyte of data scanned (rounded up to the next megabyte, with a 10 megabyte minimum per query), we can calculate the cost for that day as follows:

621900000000 bytes = 621900000000/1024 = 607324218.75 kilobytes
607324218.75 kilobytes = 607324218.75/1024 = 593090.057373046875 megabytes
593090.057373046875 megabytes = 593090.057373046875 /1024 = 579.189509153366089 gigabytes
579.189509153366089 gigabytes = 579.189509153366089/1024 = 0.565614755032584 terabytes

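In this case you are charged for roughly 0.5656 terabytes: $5 * 0.5656 = $2.83 (rounded). Strictly speaking, the rounding up to the next megabyte and the 10 megabyte minimum apply to each query, so a figure computed from the daily sum is an approximation.

So about $2.83 is what you pay for scanning roughly 0.5656 terabytes of data from S3 via Spectrum on that day.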
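The arithmetic above can also be scripted. The sketch below is a minimal illustration, not an official billing formula: it assumes 1 TB = 1024^4 bytes (as in the calculation above) and applies the round-up-to-the-next-megabyte rule and the 10 MB minimum to each query separately. The function names are hypothetical.

```python
import math

MB = 1024 ** 2           # bytes per megabyte (binary units, as in the arithmetic above)
TB = 1024 ** 4           # bytes per terabyte
PRICE_PER_TB_USD = 5.0   # Spectrum price quoted in this post
MIN_BILLED_MB = 10       # per-query minimum


def billed_megabytes(scanned_bytes):
    """Round one query's scan up to the next MB and apply the 10 MB minimum."""
    return max(MIN_BILLED_MB, math.ceil(scanned_bytes / MB))


def spectrum_cost_usd(per_query_bytes):
    """Approximate cost for a list of per-query s3_scanned_bytes values."""
    total_mb = sum(billed_megabytes(b) for b in per_query_bytes)
    return PRICE_PER_TB_USD * total_mb * MB / TB


if __name__ == "__main__":
    # Treat the daily total from above as if it were a single query.
    print(round(spectrum_cost_usd([621900000000]), 2))  # prints 2.83
```

Feeding it one value per query (instead of the daily sum) gives a figure closer to what the per-query rounding and minimum would produce.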

There is no built-in SQL query to calculate the cost, so I wrote one that summarizes the approximate cost of data scanned. Load the bytes value into a cost table and run the SQL below:

create table test_cost (s3_scanned_bytes float8);
insert into test_cost values (621900000000);
select sum(s3_scanned_bytes/1024/1024/1024/1024) s3_scanned_tb, 5*sum(s3_scanned_bytes/1024/1024/1024/1024) cost_in_usd from test_cost;

 s3_scanned_tb     | cost_in_usd
-------------------+------------------
 0.565614755032584 | 2.82807377516292
(1 row)

-+ Final Approximate Scan in Terabytes and Cost in USD +-

select round(sum(s3_scanned_bytes/1024/1024/1024/1024),4) s3_scanned_tb, round(5*sum(s3_scanned_bytes/1024/1024/1024/1024),2) cost_in_usd from test_cost;

 s3_scanned_tb | cost_in_usd
---------------+-------------
        0.5656 |        2.83
(1 row)

This is the easiest way to find the amount of data scanned through Spectrum and the cost associated with it.

P.S.: The system table is rotated, so if you need to keep a record of the bytes scanned via Spectrum, it is better to calculate the figure once per day and store it in another table.

References:

[1] Amazon Redshift pricing – https://aws.amazon.com/redshift/pricing/#Redshift_Spectrum_Pricing
[2] Monitoring Metrics in Amazon Redshift Spectrum – https://docs.aws.amazon.com/redshift/latest/dg/c-spectrum-metrics.html
[3] SVL_S3QUERY_SUMMARY – https://docs.aws.amazon.com/redshift/latest/dg/r_SVL_S3QUERY_SUMMARY.html

Create-Modify-Destroy Redshift Cluster Using Terraform

DevOps tools have become quite popular in the last few years. Infrastructure automation tools like Chef, Ansible, CloudFormation and Terraform are increasingly being used to provision cloud infrastructure. They were once used only for provisioning compute resources, but with the organizational need for agile data analytics, even resources like data warehouses are now being added to the DevOps cycle. Many of these tools (SaltStack, Ansible, Chef, Puppet and so on) are widely used in the industry, but one of them stands out among the rest: Terraform.

What makes Terraform different from many of the others is its declarative nature combined with state tracking; many infrastructure automation tools are procedural rather than declarative. Let me explain the difference between declarative and procedural.

Let's say you want to provision 10 EC2 instances using an automation approach. With a tool like Ansible, your template would look something like the one below, using a procedural declaration.

- ec2:
    count: 10
    image: ami-v1
    instance_type: t2.micro

The same code in Terraform, using a declarative approach, looks like this:

resource "aws_instance" "example" {
  count         = 10
  ami           = "ami-v1"
  instance_type = "t2.micro"
}

The two snippets look similar, but suppose you want to add 5 more servers to the configuration. The Ansible code as written is of little help, because Ansible does not maintain state. If you change the count to 15, Ansible will create 15 additional EC2 instances, since it has no record of what it did in the past. To end up with 15 servers in total, you have to ask for only the additional 5:

- ec2:
    count: 5
    image: ami-v1
    instance_type: t2.micro

With Terraform this is the big game changer. Terraform maintains the state of your infrastructure and is aware of anything it created in the past. Therefore, to deploy 5 more servers, all you have to do is go back to the same Terraform template and update the count from 10 to 15:

resource "aws_instance" "example" {
  count         = 15
  ami           = "ami-v1"
  instance_type = "t2.micro"
}

When you execute this template, Terraform knows it created 10 instances before, so it adds only the 5 new ones. With the declarative approach, the end goal is what matters. This makes Terraform the winner among these tools, IMHO. The same applies to the Redshift example in this post: once we are done with the test, deleting the cluster takes a single command without any additional details, because Terraform keeps a record that it created a Redshift cluster with that name.
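To make the contrast concrete, here is a toy model of the two behaviours. It is only an illustration of the idea described above, not how Terraform or Ansible are actually implemented, and both function names are made up for this sketch.

```python
def plan_changes(recorded_count, desired_count):
    """Declarative style: compare recorded state with the desired end state."""
    delta = desired_count - recorded_count
    return {"create": max(delta, 0), "destroy": max(-delta, 0)}


def procedural_run(existing_count, count_in_playbook):
    """Stateless 'create N' step: always adds N, whatever already exists."""
    return existing_count + count_in_playbook


if __name__ == "__main__":
    # State says 10 instances exist; the template now asks for 15.
    print(plan_changes(recorded_count=10, desired_count=15))      # {'create': 5, 'destroy': 0}
    # The stateless step asked for 15 simply creates 15 more.
    print(procedural_run(existing_count=10, count_in_playbook=15))  # 25
```

The declarative function needs the recorded count as an input, which is exactly the role the Terraform state plays.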

 Let’s now jump in and create a Redshift dc1.large cluster in region ‘us-east-1’ using Terraform

1. Download and Install Terraform for Linux from the Terraform Website : https://www.terraform.io/downloads.html

Note : Install awscli and configure your AWS credentials before we begin

On Linux the download is a zip file containing only 1 file. Unzip to any directory and copy the file ‘terraform’ to /usr/bin

 2. Create a Terraform configuration file in a new directory

 mkdir redshift_tf

 cd redshift_tf

 vim redshift.tf

provider "aws" {
  region = "us-east-1"
}

resource "aws_redshift_cluster" "default" {
  cluster_identifier  = "terraform-rs-cluster"
  database_name       = "testdb"
  master_username     = "awsuser"
  master_password     = "SomePassword1"
  node_type           = "dc1.large"
  cluster_type        = "single-node"
  skip_final_snapshot = true
}

 3. Initiate Terraform

$ terraform init

Initializing the backend...

Initializing provider plugins...
- Checking for available provider plugins...
- Downloading plugin for provider "aws" (terraform-providers/aws) 2.14.0...

The following providers do not have any version constraints in configuration,
so the latest version was installed.

To prevent automatic upgrades to new major versions that may contain breaking
changes, it is recommended to add version = "..." constraints to the
corresponding provider blocks in configuration, with the constraint strings
suggested below.

* provider.aws: version = "~> 2.14"

Terraform has been successfully initialized!

 4. Apply Terraform Configuration

Note 1: From Terraform 0.11 onwards you do not have to run 'terraform plan' separately; 'terraform apply' prints the execution plan and asks for confirmation before making changes.

Note 2: For security reasons it is not good practice to store access_key or secret_key in the .tf file. If you have installed awscli, Terraform will take your AWS credentials from '~/.aws/credentials' or from IAM credentials.

$ terraform apply

An execution plan has been generated and is shown below.
Resource actions are indicated with the following symbols:
  + create

Terraform will perform the following actions:

  # aws_redshift_cluster.default will be created
  + resource "aws_redshift_cluster" "default" {
      + allow_version_upgrade               = true
      + automated_snapshot_retention_period = 1
      + availability_zone                   = (known after apply)
      + bucket_name                         = (known after apply)
      + cluster_identifier                  = "terraform-rs-cluster"
      + cluster_parameter_group_name        = (known after apply)
      + cluster_public_key                  = (known after apply)
      + cluster_revision_number             = (known after apply)
      + cluster_security_groups             = (known after apply)
      + cluster_subnet_group_name           = (known after apply)
      + cluster_type                        = "single-node"
      + cluster_version                     = "1.0"
      + database_name                       = "testdb"
      + dns_name                            = (known after apply)
      + enable_logging                      = (known after apply)
      + encrypted                           = false
      + endpoint                            = (known after apply)
      + enhanced_vpc_routing                = (known after apply)
      + iam_roles                           = (known after apply)
      + id                                  = (known after apply)
      + kms_key_id                          = (known after apply)
      + master_password                     = (sensitive value)
      + master_username                     = "awsuser"
      + node_type                           = "dc1.large"
      + number_of_nodes                     = 1
      + port                                = 5439
      + preferred_maintenance_window        = (known after apply)
      + publicly_accessible                 = true
      + s3_key_prefix                       = (known after apply)
      + skip_final_snapshot                 = false
      + vpc_security_group_ids              = (known after apply)
    }

Plan: 1 to add, 0 to change, 0 to destroy.

 aws_redshift_cluster.default: Creation complete after 3m33s [id=terraform-rs-cluster]

Apply complete! Resources: 1 added, 0 changed, 0 destroyed.

 5. Check the state of your infrastructure

You can check the AWS console > Redshift Dashboard and you will see the cluster. To see it from Terraform, run the command below:

$ terraform show

 6. Destroy the Redshift cluster
As I mentioned at the beginning of this article, the beauty of Terraform is that it maintains the state of your infrastructure. You can remove the Redshift cluster by running just one simple command:

$ terraform destroy

Check Redshift Table and Send SMS Programmatically using Amazon SNS

Requirement: Check whether an UPDATE was run on a Redshift table and send an SMS programmatically using Amazon SNS. The script can be used in a variety of scenarios; for example, you can use the same logic to check for load errors on your cluster, or to check for INSERT or DELETE commands.

Here we check whether an UPDATE has occurred on a table and send an SMS every time the table is updated.

Pre-requisites :

1. BOTO3 Python SDK installed for Python3.7

2. AWS CLI installed

3. Psycopg2 package installed for Python3.7

4. Redshift Cluster

5. Amazon SNS configured to send SMS

6. Basic understanding of Python scripting, BOTO3 and Redshift.

Environment:

EC2 Instance Running CENTOS

Python3.7 Installed

AWS CLI installed and Configured Account credentials

Solution:

We will create a Python script that checks svl_statementtext for UPDATE statements on a table 'TEST'. The script can be scheduled in crontab to run every minute, and if an UPDATE has occurred it dispatches an SMS using SNS.

1. Create the TEST table in your Redshift cluster and Insert some data into it

testdb=# create table test (id int8, name varchar(20));
CREATE TABLE
testdb=# insert into test values(1,'John');
INSERT 0 1
testdb=# insert into test values(2,'Matt');
INSERT 0 1
testdb=# insert into test values(3,'Chris');

2. Run an UPDATE statement on the table and check svl_statementtext

testdb=# select * from test;
 id | name
----+-------
  1 | John
  2 | Matt
  3 | Chris
(3 rows)

testdb=# update test set name='Tim' where id=1;
UPDATE 1


testdb=# select * from svl_statementtext where text ilike 'update%test%' and starttime > date_trunc('minute', sysdate);
 userid |  xid   |  pid  |  label  |         starttime          |          endtime           | sequence | type  | text
--------+--------+-------+---------+----------------------------+----------------------------+----------+-------+----------------------------------------
    100 | 506858 | 20017 | default | 2019-05-25 14:44:41.657139 | 2019-05-25 14:44:49.955101 |        0 | QUERY | update test set name='Tim' where id=1;
(1 row)

As you can see the table logs the update command and displays it. Now we will run another update command and check the same table but using count(*)

testdb=# update test set name='John' where id=1;
UPDATE 1


testdb=# select count(*) from svl_statementtext where text ilike 'update%test%' and starttime > date_trunc('minute', sysdate);
 count
-------
     1
(1 row)

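So it correctly shows that 1 UPDATE statement ran against table 'TEST' in the current minute. Using this logic we can poll svl_statementtext every minute to see whether an UPDATE hit the table, and if it did, send an SMS via SNS. Keep in mind that date_trunc('minute', sysdate) is the start of the current clock minute, so a check that fires early in a minute only sees statements from that minute so far; if you need a full rolling minute, compare starttime against sysdate - interval '1 minute' instead.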

3. Python Script (#Attached svl_statementtext_1min.py) to Check for UPDATE statements in last one minute and if COUNT is not ‘0’ then send an SMS

import boto3
import psycopg2

# Connect to Redshift
con = psycopg2.connect(dbname='testdb', host='redshift-dc2-test.ctzrqaulg0u6.us-east-1.redshift.amazonaws.com',
                       port='5439', user='awsuser', password='********')

# Open a cursor and count UPDATE statements on table TEST in the current minute
cur = con.cursor()
cur.execute("select count(*) from svl_statementtext where text ilike 'update%test%' and starttime > date_trunc('minute', sysdate);")
update_count = cur.fetchone()[0]  # first column of the single result row
print(update_count)

# Close the cursor and the connection
cur.close()
con.close()

# Compare the count against the threshold
if update_count == 0:
    print("NO UPDATES IN LAST 1 MINUTE ON TABLE TEST")
else:
    print("UPDATES IN LAST 1 MINUTE ON TABLE TEST")
    # Create an SNS client
    client = boto3.client("sns", region_name="us-east-1")
    # Create the topic if it doesn't exist
    topic = client.create_topic(Name="invites-for-push-notifications")
    topic_arn = topic['TopicArn']  # get its Amazon Resource Name
    # List of contacts; you can add multiple mobile numbers here
    list_of_contacts = ["+6144*********"]
    # Add SMS subscribers (the numbers that will receive the SMS message)
    for number in list_of_contacts:
        client.subscribe(TopicArn=topic_arn, Protocol='sms', Endpoint=number)
    # Publish a message
    client.publish(
        Message="Hello World!",
        TopicArn=topic_arn,
        MessageAttributes={
            # Name of sender, not available in USA
            'AWS.SNS.SMS.SenderID': {'DataType': 'String', 'StringValue': 'EASYORADBA'},
            'AWS.SNS.SMS.SMSType': {'DataType': 'String', 'StringValue': 'Transactional'},
        },
    )

Open 2 sessions; from one session run an UPDATE command on table 'TEST' and from the other execute the Python script. If you configured everything properly, you will get an SMS from sender 'EASYORADBA' with the message text "Hello World!"

4. Save Script & Schedule to run every minute in Crontab

* * * * * /usr/local/bin/python3.7 svl_statementtext_1min.py

This script can be used in a variety of scenarios to dispatch an SMS based on some count logic. Another scenario is to schedule it to check for load errors on your Redshift cluster. For example, run the script during your data loading window and check for errors in the STL_LOAD_ERRORS table. If there was a data loading issue, the Data Engineering team can be notified via SMS. I am attaching the script (stl_load_errors.py) to check for data loading errors. You can change the granularity of the time window in which it checks for load errors by changing the time interval in the date_trunc function.

SQL : select count(*) from stl_load_errors where starttime > date_trunc('minute', sysdate);
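Since the same count-and-alert logic is reused across system tables and time windows, the query can be built by a small helper. This is a rough sketch under my own assumptions: the helper names are hypothetical, the table name is only sanity-checked (not validated against a real catalog), and the optional pattern is returned as a psycopg2-style %s parameter.

```python
ALLOWED_GRANULARITY = {"minute", "hour", "day"}


def build_count_query(system_table, granularity="minute", text_pattern=None):
    """Return (sql, params) counting rows since the start of the current minute/hour/day."""
    if granularity not in ALLOWED_GRANULARITY:
        raise ValueError("unsupported granularity: {}".format(granularity))
    if not system_table.replace("_", "").isalnum():
        raise ValueError("unexpected table name: {}".format(system_table))
    sql = "select count(*) from {} where starttime > date_trunc('{}', sysdate)".format(
        system_table, granularity)
    params = ()
    if text_pattern is not None:
        sql += " and text ilike %s"  # pattern is passed separately as a parameter
        params = (text_pattern,)
    return sql + ";", params


def needs_alert(row):
    """row is the tuple returned by cursor.fetchone() for the count query."""
    return row is not None and row[0] > 0


if __name__ == "__main__":
    print(build_count_query("stl_load_errors", "hour"))
    print(build_count_query("svl_statementtext", "minute", "update%test%"))
    print(needs_alert((0,)), needs_alert((3,)))
```

With psycopg2 the result would be used as cur.execute(sql, params), followed by needs_alert(cur.fetchone()) to decide whether to publish to SNS.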

Redshift IAM role for Copy Unload to S3

Creating IAM Policies and Roles & Associating the Role to the Redshift Cluster

--------------------------------------------------------------------------------

In order to perform operations such as “COPY” and “UNLOAD” to/from a Redshift cluster, the user must provide security credentials that authorize the Amazon Redshift cluster to read data from or write data to your target destination, in this case an Amazon S3 bucket.

Step 1: Creating the policy to allow access on S3

  • On the Services menu, choose IAM (under Security, Identity & Compliance)
  • On the left side of the IAM Console, go to “Policies”
  • Select “Create Policy” at the top of the page
  • Select the JSON tab and paste the JSON below. Replace ‘redshift-testing-bucket-shadmha’ with the name of the bucket you are using for unload and copy

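{
   "Version": "2012-10-17",
   "Statement": [
      {
         "Effect": "Allow",
         "Action": [
            "s3:GetObject",
            "s3:PutObject",
            "s3:DeleteObject"
         ],
         "Resource": [
            "arn:aws:s3:::redshift-testing-bucket-shadmha*"
         ]
      },
      {
         "Effect": "Allow",
         "Action": [
            "s3:ListBucket"
         ],
         "Resource": [
            "arn:aws:s3:::redshift-testing-bucket-shadmha*"
         ]
      }
   ]
}

Note that s3:GetObject is included because COPY has to read objects from the bucket; s3:PutObject and s3:DeleteObject cover UNLOAD.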

  • Click on “Review Policy” and provide “Name” and “Description” for the policy
  • Click “Create Policy” and keep this name handy we will need the name of this policy to add to the IAM role in next step
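If you create this policy for several buckets, the document can be generated rather than edited by hand. The snippet below is a small sketch: build_s3_policy is a hypothetical helper, and including s3:GetObject is my assumption that the role must also read objects for COPY. It only prints the JSON for pasting into the console; it does not call any AWS API.

```python
import json


def build_s3_policy(bucket_name):
    """Return the policy document as a dict for the given bucket name."""
    resource = "arn:aws:s3:::{}*".format(bucket_name)
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                # GetObject is assumed to be needed for COPY; Put/Delete for UNLOAD
                "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
                "Resource": [resource],
            },
            {
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": [resource],
            },
        ],
    }


if __name__ == "__main__":
    print(json.dumps(build_s3_policy("redshift-testing-bucket-shadmha"), indent=3))
```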

Step 2: Creating the IAM Role such that the Redshift Service can request it

  • On the left menu of your IAM Console, select “Roles”
  • Select “Create Role” on the top of the page
  • Select type of trusted entity as “AWS Service” > Select the service which will be used for this role as “Amazon Redshift”
  • Select your use case as “Redshift – Customizable Allows Redshift clusters to call AWS services on your behalf.” and click “Permissions”
  • Search the policy that was previously created, select it and click on “Next”
  • Specify a “Role name”
  • Select “Create Role”

Step 3: Associating the created Role to a Redshift Cluster

  • On your AWS Console, on the Services menu, choose “Redshift”
  • On the AWS Redshift console, select the cluster in question and click on “Manage IAM roles”
  • On the pop-up screen, click on the drop box “Available roles” and select the Role created in the previous step
  • Select “Apply changes”

Considerations

--------------

As soon as the “Status” for the IAM role on the “Manage IAM roles” screen shows “in-sync”, you can try “COPY” or “UNLOAD”, passing the ARN of the created role as CREDENTIALS.

Example:

Note: Modify the details such as schema and table_name, the bucket_name, and “<arn>” to the role ARN (example: “arn:aws:iam::586945000000:role/role_name”), to suit your case scenario.

Below is the example from my test cluster, Role name ‘REDSHIFTNEWROLE’ is one created in Step 2 and S3 bucket ‘redshift-testing-bucket-shadmha’ is the one we assigned policy to in Step 1.

For example:

unload ('select * from test_char')
to 's3://redshift-testing-bucket-shadmha/test_char.csv'
credentials 'aws_iam_role=arn:aws:iam::775867435088:role/REDSHIFTNEWROLE'
delimiter '|' region 'ap-southeast-2'
parallel off;

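The most common error when trying to copy or unload data from Redshift is shown below. It usually means that the role is not attached to the cluster or that the policy does not grant the required S3 actions on the bucket:

ERROR: S3ServiceException:Access Denied,Status 403,Error AccessDenied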

Python Script to Create a Data Pipeline Loading Data From RDS Aurora MySQL To Redshift

In this tutorial we will create a Python script which will build a data pipeline to load data from Aurora MySQL RDS to an S3 bucket and copy that data to a Redshift cluster.

I assume you have a basic understanding of AWS, RDS, MySQL, S3, Python and Redshift. Even if you don't, that's alright; I will briefly explain each of them for the non-cloud DBAs.

AWS - Amazon Web Services. It is the cloud infrastructure platform from Amazon, which can be used to build and host anything from a static website to a globally scalable service like Netflix.

RDS - Relational Database Service, or RDS for short, is Amazon's managed relational database service for databases like its own Aurora, MySQL, Postgres, Oracle and SQL Server.

S3 - Simple Storage Service is AWS's distributed storage, which can scale almost infinitely. Data in S3 is stored in buckets. Think of buckets as directories, but DNS-name compliant and cloud hosted.

Python - A programming language which is now the de facto standard for data science and engineering.

Redshift - AWS's petabyte-scale data warehouse, which is based on PostgreSQL and works with PostgreSQL client drivers but uses a columnar storage engine.

The source in this tutorial is an RDS Aurora MySQL database and the target is a Redshift cluster. The data is staged in an S3 bucket. With Aurora MySQL you can unload data directly to an S3 bucket, but in my script I offload the table to a local filesystem and then copy it to the S3 bucket. This gives you flexibility in case you are not using Aurora but standard MySQL or MariaDB.

Environment:

  1. Python 3.7.2 with pip
  2. Ec2 instance with the Python 3.7 installed along with all the Python packages
  3. Source DB- RDS Aurora MySQL 5.6 compatible
  4. Destination DB – Redshift Cluster
  5. Database : Dev , Table : employee in both databases which will be used for the data transfer
  6. S3 bucket for staging the data
  7. AWS Python SDK Boto3

Make sure the security groups of both the RDS Aurora MySQL and the Redshift cluster allow inbound connections from the IP of the EC2 instance (host and port).

  1. Create the table 'employee' in both the Aurora and Redshift clusters

Aurora MySQL 5.6

CREATE TABLE `employee` (
  `id` int(11) NOT NULL,
  `first_name` varchar(45) DEFAULT NULL,
  `last_name` varchar(45) DEFAULT NULL,
  `phone_number` varchar(45) DEFAULT NULL,
  `address` varchar(200) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

Redshift

DROP TABLE IF EXISTS employee CASCADE;

CREATE TABLE employee
(
   id            bigint         NOT NULL,
   first_name    varchar(45),
   last_name     varchar(45),
   phone_number  bigint,
   address       varchar(200)
);

ALTER TABLE employee
   ADD CONSTRAINT employee_pkey
   PRIMARY KEY (id);

COMMIT;

2. Install Python 3.7.2 and install the packages needed by the script (csv, os, sys and datetime ship with Python and need no installation)

sudo /usr/local/bin/python3.7 -m pip install boto3
sudo /usr/local/bin/python3.7 -m pip install psycopg2-binary
sudo /usr/local/bin/python3.7 -m pip install pymysql

3. Insert sample data into the source RDS Aurora DB

$ mysql -u awsuser -h shadmha-cls-aurora.ap-southeast-2.rds.amazonaws.com -p dev

INSERT INTO `employee` VALUES (1,'shadab','mohammad','04447910733','Randwick'),(2,'kris','joy','07761288888','Liverpool'),(3,'trish','harris','07766166166','Freshwater'),(4,'john','doe','08282828282','Newtown'),(5,'mary','jane','02535533737','St. Leonards'),(6,'sam','rockwell','06625255252','Manchester');

SELECT * FROM employee;

4. Download and Configure AWS command line interface

The AWS Python SDK boto3 needs credentials to connect to your AWS account, and it can read them from the files that the AWS CLI writes under ~/.aws. We also need boto3 functions to upload the file to S3. Install the AWS CLI on Linux and configure it.

$ aws configure
AWS Access Key ID [****************YGDA]:
AWS Secret Access Key [****************hgma]:
Default region name [ap-southeast-2]:
Default output format [json]:

5. Python Script to execute the Data Pipeline (datapipeline.py)

import boto3
import psycopg2
import pymysql
import csv
import time
import sys
import os
import datetime
from datetime import date
datetime_object = datetime.datetime.now()
print ("###### Data Pipeline from Aurora MySQL to S3 to Redshift ######")
print ("")
print ("Start TimeStamp")
print ("---------------")
print(datetime_object)
print ("")


# Connect to MySQL Aurora and Download Table as CSV File
db_opts = {
    'user': 'awsuser',
    'password': '******',
    'host': 'shadmha-cls-aurora.ap-southeast-2.rds.amazonaws.com',
    'database': 'dev'
}

db = pymysql.connect(**db_opts)
cur = db.cursor()

sql = 'SELECT * from employee'
csv_file_path = '/home/centos/my_csv_file.csv'

try:
    cur.execute(sql)
    rows = cur.fetchall()
finally:
    db.close()

# Continue only if there are rows returned.
if rows:
    # New empty list called 'result'. This will be written to a file.
    result = list()

    # The row name is the first entry for each entity in the description tuple.
    column_names = list()
    for i in cur.description:
        column_names.append(i[0])

    result.append(column_names)
    for row in rows:
        result.append(row)

    # Write result to file.
    with open(csv_file_path, 'w', newline='') as csvfile:
        csvwriter = csv.writer(csvfile, delimiter='|', quotechar='"', quoting=csv.QUOTE_MINIMAL)
        for row in result:
            csvwriter.writerow(row)
else:
    sys.exit("No rows found for query: {}".format(sql))


# Upload the generated CSV file to the S3 bucket
s3 = boto3.resource('s3')
with open(csv_file_path, 'rb') as csv_body:  # close the file handle after the upload
    s3.Object('mybucket-shadmha', 'my_csv_file.csv').put(Body=csv_body)


#Obtaining the connection to RedShift
con=psycopg2.connect(dbname= 'dev', host='redshift-cluster-1.ap-southeast-2.redshift.amazonaws.com',
port= '5439', user= 'awsuser', password= '*********')

#Copy Command as Variable
copy_command="copy employee from 's3://mybucket-shadmha/my_csv_file.csv' credentials 'aws_iam_role=arn:aws:iam::775888:role/REDSHIFT' delimiter '|' region 'ap-southeast-2' ignoreheader 1 removequotes ;"

#Opening a cursor and run copy query
cur = con.cursor()
cur.execute("truncate table employee;")
cur.execute(copy_command)
con.commit()

#Close the cursor and the connection
cur.close()
con.close()

# Remove the staged file from S3 and also the local file
s3.Object('mybucket-shadmha', 'my_csv_file.csv').delete()
os.remove(csv_file_path)


datetime_object_2 = datetime.datetime.now()
print ("End TimeStamp")
print ("-------------")
print (datetime_object_2)
print ("")

6. Run the Script or Schedule in Crontab as a Job

$ python3.7 datapipeline.py

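Crontab entry to execute the job daily at 10:30 am (cron runs commands with /bin/sh, so the output is redirected with the portable '>> file 2>&1' form):

30 10 * * * /usr/local/bin/python3.7 /home/centos/datapipeline.py >> /tmp/datapipeline.log 2>&1

7. Check the table in the destination Redshift cluster; all the records should be visible there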

SELECT * FROM employee;


This tutorial used a small table with very little data, but with S3's distributed nature and massive scale, and Redshift as a data warehouse, you can build data pipelines for very large datasets. Redshift is an OLAP database and Aurora is OLTP, and many real-life scenarios require offloading data from your OLTP applications to data warehouses or data marts to perform analytics on it.
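For larger tables, holding the whole result of fetchall() in memory becomes a problem. One possible variation, sketched here under my own assumptions, writes the CSV in batches with the DB-API fetchmany() call. The FakeCursor class exists only so the sketch runs without a database; with pymysql you would pass a real cursor (an unbuffered one such as pymysql.cursors.SSCursor if the server should stream rows too).

```python
import csv
import io


def stream_rows_to_csv(cursor, fileobj, batch_size=10000):
    """Write the cursor's result set to fileobj as pipe-delimited CSV, batch by batch."""
    writer = csv.writer(fileobj, delimiter='|', quotechar='"', quoting=csv.QUOTE_MINIMAL)
    writer.writerow([col[0] for col in cursor.description])  # header row
    written = 0
    while True:
        batch = cursor.fetchmany(batch_size)
        if not batch:
            break
        writer.writerows(batch)
        written += len(batch)
    return written


class FakeCursor:
    """Stand-in for a DB-API cursor (illustration only)."""
    description = (("id",), ("first_name",))

    def __init__(self, rows):
        self._rows = list(rows)

    def fetchmany(self, size):
        batch, self._rows = self._rows[:size], self._rows[size:]
        return batch


if __name__ == "__main__":
    out = io.StringIO()
    rows = [(1, "shadab"), (2, "kris"), (3, "trish")]
    print(stream_rows_to_csv(FakeCursor(rows), out, batch_size=2))
    print(out.getvalue())
```

The delimiter and quoting match the script above, so the same COPY command (with ignoreheader 1 and removequotes) would apply to the resulting file.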

AWS also has an excellent managed service called AWS Data Pipeline, which can automate the movement and transformation of data. But for developing customized solutions, Python is often the best tool for the job.

Enjoy this script and please let me know in your comments or on Twitter (@easyoradba) if you have any issues or what else would you like me to post for data engineering.

Python Script to Copy-Unload Data to Redshift from S3

import psycopg2
import time
import sys
import datetime
from datetime import date
datetime_object = datetime.datetime.now()
print ("Start TimeStamp")
print ("---------------")
print(datetime_object)
print("")

#Progress Bar Function
def progressbar(it, prefix="", size=60, file=sys.stdout):
    count = len(it)
    def show(j):
        x = int(size*j/count)
        file.write("%s[%s%s] %i/%i\r" % (prefix, "#"*x, "."*(size-x), j, count))
        file.flush()
    show(0)
    for i, item in enumerate(it):
        yield item
        show(i+1)
    file.write("\n")
    file.flush()

#Obtaining the connection to RedShift
con=psycopg2.connect(dbname= 'dev', host='redshift.amazonaws.com',
port= '5439', user= 'awsuser', password= '*****')

#Copy Command as Variable
copy_command="copy users from 's3://redshift-test-bucket/allusers_pipe.txt' credentials 'aws_iam_role=arn:aws:iam::775088:role/REDSHIFTROLE' delimiter '|' region 'ap-southeast-2';"

#Unload Command as Variable (timestamp formatted without spaces or colons so the S3 key stays simple)
unload_command="unload ('select * from users') to 's3://redshift-test-bucket/users_"+datetime.datetime.now().strftime('%Y%m%d_%H%M%S')+".csv' credentials 'aws_iam_role=arn:aws:iam::7755088:role/REDSHIFTROLE' delimiter '|' region 'ap-southeast-2';"

#Opening a cursor and run copy query
cur = con.cursor()
cur.execute("truncate table users;")
cur.execute(copy_command)
con.commit()

#Display a cosmetic progress bar; the COPY above has already finished and the loop only sleeps
for i in progressbar(range(100), "Copying Data into Redshift: ", 10):
    time.sleep(0.1) # placeholder wait; put any calculation you need here

print("")

#Display a cosmetic progress bar before the UNLOAD; it does not track the actual unload
for i in progressbar(range(600), "Unloading Data from Redshift to S3: ", 60):
    time.sleep(0.1) # placeholder wait; put any calculation you need here

print("")

#Run the unload query on the same cursor
cur.execute(unload_command)

#Close the cursor and the connection
cur.close()
con.close()

datetime_object_2 = datetime.datetime.now()
print ("End TimeStamp")
print ("-------------")
print(datetime_object_2)
print("")

Simple Script to Connect & Query AWS Redshift from Python 3.7

Pre-requisite

You need the psycopg2 adapter for Python, which works with any PostgreSQL-compatible database.

To install psycopg2:

$ sudo python3.7 -m pip install psycopg2

Installing from source sometimes fails with an error; in that case you can install the prebuilt binary package instead:

$ sudo python3.7 -m pip install psycopg2-binary

import psycopg2

#Obtaining the connection to RedShift
con=psycopg2.connect(dbname= 'dev', host='your-redshift-cluster.redshift.amazonaws.com',
port= '5439', user= 'YourUser', password= 'YourPassword')

#Opening a cursor and running a query
cur = con.cursor()
cur.execute("SELECT SYSDATE;")
#Printing the output
print(cur.fetchall())
cur.close()
con.close()

Output:

[(datetime.datetime(2019, 4, 18, 23, 57, 0, 289196),)]

Generate Fake Data using Python

As a data engineer, one of the tasks you have to do almost daily is load huge amounts of data into your data warehouse or data lake. Sometimes, to benchmark load times or reproduce performance tuning issues in your test environment, you need test datasets. There are a lot of very good, huge open datasets available on Kaggle and AWS.

But often you don't need actual data; all you need is a CSV file with dummy data in it. Fear not, Python comes to the rescue. Python is the golden goose in the age of information: not only can it help you sort through massive amounts of data, it can also help you generate data.

Faker is a Python package which can generate fake data for you. First you need to pip install faker. For this exercise we are using Python 3.7.2.

$ python -m pip install faker

Script to Generate a CSV File with Fake Data and 1 Billion Rows

Caution: the output file is large (a run of mine came to about 1.3GB) and generating it can really hammer your machine. File size grows in proportion to RECORD_COUNT, so start with a smaller value to gauge size and run time before attempting the full billion rows. I have an EC2 instance on which I generate this test data and leave it running in the background. You can use multiprocessing in Python and hammer all cores, but that is a discussion worthy of its own blog post.

import csv
from faker import Faker

RECORD_COUNT = 1000000000
fake = Faker()

# Column order for the CSV header and rows
FIELDNAMES = ['userid', 'username', 'firstname', 'lastname', 'city', 'state', 'email', 'phone',
              'cardno', 'likesports', 'liketheatre', 'likeconcerts', 'likejazz', 'likeclassical',
              'likeopera', 'likerock', 'likevegas', 'likebroadway', 'likemusicals']


def create_csv_file():
    # Write the header once, then one fake record per row
    with open('users1.csv', 'w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=FIELDNAMES)
        writer.writeheader()
        for i in range(RECORD_COUNT):
            writer.writerow(
                {
                    'userid': fake.ean8(),
                    'username': fake.user_name(),
                    'firstname': fake.first_name(),
                    'lastname': fake.last_name(),
                    'city': fake.city(),
                    'state': fake.state_abbr(),
                    'email': fake.email(),
                    'phone': fake.phone_number(),
                    'cardno': fake.credit_card_number(card_type=None),
                    'likesports': fake.null_boolean(),
                    'liketheatre': fake.null_boolean(),
                    'likeconcerts': fake.null_boolean(),
                    'likejazz': fake.null_boolean(),
                    'likeclassical': fake.null_boolean(),
                    'likeopera': fake.null_boolean(),
                    'likerock': fake.null_boolean(),
                    'likevegas': fake.null_boolean(),
                    'likebroadway': fake.null_boolean(),
                    'likemusicals': fake.null_boolean(),
                }
            )


if __name__ == '__main__':
    create_csv_file()

This will create a file users1.csv with a billion rows and generated fake data which is almost like real data
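Before committing to a very large RECORD_COUNT, it can help to estimate the output size from a small sample. The helper below is a rough sketch using only the standard library; estimate_csv_bytes and its parameters are names I made up, and make_row stands for any function that returns one record as a dict (for instance one built from the Faker calls above).

```python
import csv
import io


def estimate_csv_bytes(make_row, fieldnames, total_rows, sample_rows=1000):
    """Extrapolate the CSV size for total_rows from sample_rows generated rows."""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=fieldnames)
    writer.writeheader()
    header_len = len(buf.getvalue().encode("utf-8"))
    for _ in range(sample_rows):
        writer.writerow(make_row())
    sample_len = len(buf.getvalue().encode("utf-8")) - header_len
    return header_len + int(sample_len / sample_rows * total_rows)


if __name__ == "__main__":
    # Fixed dummy row, just to show the call; real rows vary in length.
    def dummy_row():
        return {"userid": "12345670", "city": "Randwick"}

    print(estimate_csv_bytes(dummy_row, ["userid", "city"], total_rows=1000000, sample_rows=100))
```

The estimate is only as good as the sample; with random data a few thousand sample rows should give a reasonable average row length.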


AWS Redshift Insert into Table without S3

To generate random data in a table without using S3, for doing some quick tests:

drop table if exists seed;

create table seed ( n int8 );

insert into seed (
SELECT
p0.n
+ p1.n*2
+ p2.n * POWER(2,2)
+ p3.n * POWER(2,3)
+ p4.n * POWER(2,4)
+ p5.n * POWER(2,5)
+ p6.n * POWER(2,6)
+ p7.n * POWER(2,7)
as number
FROM
(SELECT 0 as n UNION SELECT 1) p0,
(SELECT 0 as n UNION SELECT 1) p1,
(SELECT 0 as n UNION SELECT 1) p2,
(SELECT 0 as n UNION SELECT 1) p3,
(SELECT 0 as n UNION SELECT 1) p4,
(SELECT 0 as n UNION SELECT 1) p5,
(SELECT 0 as n UNION SELECT 1) p6,
(SELECT 0 as n UNION SELECT 1) p7
Order by 1
);

commit;

drop table if exists test_table;

create table test_table(
ingest_time timestamp encode zstd,
doi date encode zstd,
id int encode bytedict,
value float encode zstd,
data_sig varchar(32) encode zstd
) DISTKEY(id) SORTKEY(ingest_time);

commit;

insert into test_table (
select dateadd('msec', -10 * n, getdate()) as ingest_time, trunc(dateadd('msec', -10 * n, getdate())) as doi, id,
n::float / 1000000 as value, 'sig-' || to_hex(n % 16) as data_sig
FROM (select (a.n + b.n + c.n + d.n) as n, (random() * 1000)::int as id from seed a cross join (select n*256 as n from seed) b cross join (select n*65536 as n from seed) c
cross join (select n*16777216 as n from ( select distinct (n/16)::int as n from seed ) ) d)
) order by ingest_time;

commit;

analyze test_table;

select count(*) from test_table;

Each consecutive run of the insert query above adds about 268 million rows.

You can create a table with about 1 billion rows in 8 minutes on a ds2.xlarge cluster.
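The 268 million figure follows from the cross join: three copies of the 256-row seed table times the distinct values of n/16. The short check below reproduces that arithmetic; it assumes n/16 is an integer division yielding the 16 values 0 to 15, and the function names are my own.

```python
SEED_ROWS = 2 ** 8  # the seed table holds the numbers 0..255


def rows_per_insert():
    """Rows produced by one run of the cross-join insert."""
    high_values = len({n // 16 for n in range(SEED_ROWS)})  # distinct n/16 values
    return SEED_ROWS * SEED_ROWS * SEED_ROWS * high_values


def executions_for(target_rows):
    """Number of insert runs needed to reach at least target_rows."""
    return -(-target_rows // rows_per_insert())  # ceiling division


if __name__ == "__main__":
    print(rows_per_insert())             # 268435456
    print(executions_for(1000000000))    # 4
```

So four executions of the insert are enough to pass the 1 billion row mark.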