AWS Lambda Function to Load Data from a ‘Requester Pays’ S3 Bucket in One Account to a Redshift Cluster in Another Account

Part A : Create Redshift Spectrum Cross-Account Access for S3 


Company Account A: Redshift Cluster Account: 24xxxxxx16
Role: RoleA

Company Account B: S3 Bucket Account: 8xxxxxxxx11
Role: RoleB
Bucket Name (create with the “Requester Pays” option): s3://shadmha-us-east-2

Use Case:
1. Read data from an S3 bucket in a different account into a Spectrum table
2. Unload data from the Redshift cluster to an S3 bucket in a different account

======================================================

Step 1: In the Redshift cluster account (24xxxxxx16), do the following:

a)    Go to IAM > Roles > Create Role
b)    Create Role > Redshift > Redshift – Customizable
c)    No need to add policies or tags; go ahead and save this role as “RoleA”
d)    Add this role to your Redshift cluster: go to Redshift Console > Select Cluster > Manage IAM roles > Add “RoleA” to the cluster
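
If you prefer to script this step, the same role creation and cluster attachment can be done with boto3. This is a minimal sketch, assuming the cluster identifier 'my-redshift-cluster' (a placeholder) and credentials that point at the cluster account:

# Sketch: create RoleA in the Redshift cluster account and attach it to the cluster.
# 'my-redshift-cluster' is a placeholder cluster identifier.
import json
import boto3

iam = boto3.client('iam')
redshift = boto3.client('redshift', region_name='us-east-2')

# Trust policy letting the Redshift service assume RoleA (what "Redshift - Customizable" sets up)
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "redshift.amazonaws.com"},
        "Action": "sts:AssumeRole"
    }]
}

role = iam.create_role(RoleName='RoleA',
                       AssumeRolePolicyDocument=json.dumps(trust_policy))

# Attach the role to the cluster (equivalent to "Manage IAM roles" in the console)
redshift.modify_cluster_iam_roles(ClusterIdentifier='my-redshift-cluster',
                                  AddIamRoles=[role['Role']['Arn']])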

Step 2: In the account that owns the S3 bucket (8xxxxxxxx11), do the following:

a)    Go to IAM > Policies > Create policy
b)    Select the JSON tab and add the IAM policy below, replacing the bucket name ‘shadmha-us-east-2’ with your bucket name

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:PutObject",
                "s3:ListMultipartUploadParts",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads"
            ],
            "Resource": [
                "arn:aws:s3:::shadmha-us-east-2",
                "arn:aws:s3:::shadmha-us-east-2/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "glue:CreateDatabase",
                "glue:DeleteDatabase",
                "glue:GetDatabase",
                "glue:GetDatabases",
                "glue:UpdateDatabase",
                "glue:CreateTable",
                "glue:DeleteTable",
                "glue:BatchDeleteTable",
                "glue:UpdateTable",
                "glue:GetTable",
                "glue:GetTables",
                "glue:BatchCreatePartition",
                "glue:CreatePartition",
                "glue:DeletePartition",
                "glue:BatchDeletePartition",
                "glue:UpdatePartition",
                "glue:GetPartition",
                "glue:GetPartitions",
                "glue:BatchGetPartition"
            ],
            "Resource": "*"
        }
    ]
}

Choose Review Policy and save the policy, for example as ‘s3-cross-account-policy’

c)    Go to Roles > Create Role > Select ‘Another AWS account’ as the type of trusted entity
d)    Enter the account ID of the Redshift cluster account ‘24xxxxxx16’ > Permissions > Search for the policy created in step b), ‘s3-cross-account-policy’
e)    Go next > Create role > Save it as “RoleB”
f)    Go to Roles > Select “RoleB” > “Trust Relationships” tab > Edit trust relationship. Add the below policy:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::24xxxxxx16:root"
      },
      "Action": "sts:AssumeRole",
      "Condition": {}
    }
  ]
}

Update the trust policy
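
Steps a) to f) can also be scripted with boto3. A rough sketch, assuming the S3/Glue policy above has been saved locally as s3-cross-account-policy.json (an assumed file name) and that the masked account ID is replaced with your real 12-digit cluster account ID:

# Sketch: create the cross-account policy and RoleB in the bucket account (8xxxxxxxx11).
# 's3-cross-account-policy.json' is an assumed local file holding the S3/Glue policy above.
import json
import boto3

iam = boto3.client('iam')

with open('s3-cross-account-policy.json') as f:
    policy_doc = f.read()

policy = iam.create_policy(PolicyName='s3-cross-account-policy',
                           PolicyDocument=policy_doc)

# Trust policy allowing the Redshift cluster account to assume RoleB
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"AWS": "arn:aws:iam::24xxxxxx16:root"},  # masked cluster account ID
        "Action": "sts:AssumeRole"
    }]
}

role = iam.create_role(RoleName='RoleB',
                       AssumeRolePolicyDocument=json.dumps(trust_policy))
iam.attach_role_policy(RoleName='RoleB', PolicyArn=policy['Policy']['Arn'])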

Step 3: Go back to the account in which the Redshift cluster is created

a)    Go to IAM > Roles > Select the role you created earlier, “RoleA”
b)    Add an inline policy to this role with the JSON below and save it

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt1487639602000",
            "Effect": "Allow",
            "Action": [
                "sts:AssumeRole"
            ],
            "Resource": "arn:aws:iam::80xxxxx11:role/RoleB"
        }
    ]
}
c)    Create the policy and save it to the role
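
The inline policy in step b) can also be attached programmatically; a small sketch, assuming it is applied to RoleA in the cluster account:

# Sketch: attach the sts:AssumeRole inline policy to RoleA so it can chain to RoleB.
import json
import boto3

iam = boto3.client('iam')

assume_roleb_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": ["sts:AssumeRole"],
        "Resource": "arn:aws:iam::80xxxxx11:role/RoleB"  # masked bucket-account ID, as above
    }]
}

iam.put_role_policy(RoleName='RoleA',
                    PolicyName='assume-roleb',
                    PolicyDocument=json.dumps(assume_roleb_policy))

With this chain in place, COPY and Spectrum statements reference both roles comma-separated in the iam_role option, exactly as the Lambda code in Part B does.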

Part B: Deploy a Lambda Function Using the Attached Code (S3-to-Redshift.zip) and Change Your Cluster and Bucket Details Accordingly

Add a CloudWatch Events Trigger with Cron Expression: cron(0 2 ? * FRI *)

Increase Timeout & Memory of Lambda Function

Configure Test Event

Execute the Lambda Function to Test
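
If you would rather wire up the schedule and test run from code, a boto3 sketch along these lines should work; the function name 'S3-to-Redshift' and the rule name are placeholders:

# Sketch: create the weekly CloudWatch Events schedule, point it at the Lambda function,
# allow the rule to invoke it, and run a quick test invocation.
# 'S3-to-Redshift' and 'S3-to-Redshift-weekly' are placeholder names.
import json
import boto3

events = boto3.client('events', region_name='us-east-2')
lmbda = boto3.client('lambda', region_name='us-east-2')

rule = events.put_rule(Name='S3-to-Redshift-weekly',
                       ScheduleExpression='cron(0 2 ? * FRI *)')

fn = lmbda.get_function(FunctionName='S3-to-Redshift')
fn_arn = fn['Configuration']['FunctionArn']

# Permit CloudWatch Events to invoke the function, then add it as the rule target
lmbda.add_permission(FunctionName='S3-to-Redshift',
                     StatementId='allow-events-weekly',
                     Action='lambda:InvokeFunction',
                     Principal='events.amazonaws.com',
                     SourceArn=rule['RuleArn'])
events.put_targets(Rule='S3-to-Redshift-weekly',
                   Targets=[{'Id': '1', 'Arn': fn_arn}])

# Test invocation with an empty event
resp = lmbda.invoke(FunctionName='S3-to-Redshift', Payload=json.dumps({}))
print(resp['StatusCode'])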

Python Code for Lambda Function

#######################################################################################
# Author         :      Shadab Mohammad
# Create Date    :      13-05-2019
# Modified Date  :      26-09-2019
# Name           :      Load Dataset from AWS S3 bucket to your Redshift Cluster
# Dependencies   :      Requires Python 3.6+. Python Libraries required 'psycopg2'
#######################################################################################
import psycopg2
import csv
import time
import sys
import os
import datetime
from datetime import date
datetime_object = datetime.datetime.now()


print("###### Load Data From S3 to Redshift ######")
print("")
print("Start TimeStamp")
print("---------------")
print(datetime_object)
print("")

def lambda_handler(event, context):
    # Obtaining the connection to Redshift
    con = psycopg2.connect(dbname='testdb', host='shadmha-us-east-2.crhzd8dtwytq.us-east-2.redshift.amazonaws.com', port='5439', user='awsuser', password='SomeP@ssword')

    # COPY command using chained IAM roles (RoleA in the cluster account assumes RoleB in the bucket account)
    copy_command_1 = "copy connection_log from 's3://shadmha-us-east-2/cross-acct-test/connection_events.csv' delimiter ',' csv iam_role 'arn:aws:iam::241135536116:role/RoleA,arn:aws:iam::804739925711:role/RoleB' ignoreheader 1;"

    # Opening a cursor and recreating the target table
    cur = con.cursor()
    query = '''
    DROP TABLE IF EXISTS connection_log CASCADE;

    CREATE TABLE connection_log(
    username varchar(50),
    event varchar(50),
    count int8);
    COMMIT;'''
    cur.execute(query)
    con.commit()

    # Run the COPY command
    cur.execute(copy_command_1)
    con.commit()
    # Close the cursor and the connection
    cur.close()
    con.close()

    datetime_object_2 = datetime.datetime.now()
    print("End TimeStamp")
    print("-------------")
    print(datetime_object_2)
    print("")

Lambda Function Code : https://github.com/shadabshaukat/serverless/blob/98f42c7867d6eb4d9e602d2b703764ad891fdfed/S3-to-Redshift.zip

Redshift Health-Check SQL Queries

-- Query Performance Review --

$ psql -h redshift-private-2a.c2nh0wlf4z7g.ap-southeast-2.redshift.amazonaws.com -p 5439 -U awsuser -f review_query_pf.sql testdb

$ vim review_query_pf.sql


\o redshiftxxx.txt
\set vpattern 1678
\qecho -- Query Text - stl_querytext
select * from stl_querytext where query = :vpattern;
\qecho -- Explain plan - stl_explain
select userid,query,nodeid,parentid,trim(plannode) plannode,trim(info) info from stl_explain where query = :vpattern;
\qecho --Review WLM Queuing for above queries - stl_wlm_query
SELECT TRIM(DATABASE) AS DB,
       w.query,
       SUBSTRING(q.querytxt,1,100) AS querytxt,
       w.queue_start_time,
       w.service_class AS class,
       w.slot_count AS slots,
       w.total_queue_time / 1000000 AS queue_seconds,
       w.total_exec_time / 1000000 exec_seconds,
       (w.total_queue_time + w.total_exec_time) / 1000000 AS total_seconds
FROM stl_wlm_query w
  LEFT JOIN stl_query q
         ON q.query = w.query
        AND q.userid = w.userid
WHERE w.query = :vpattern
--AND w.total_queue_time > 0
ORDER BY w.total_queue_time DESC,
         w.queue_start_time DESC;
\qecho --Get information about commit stats - stl_commit_stats
select startqueue,node, datediff(ms,startqueue,startwork) as queue_time, datediff(ms, startwork, endtime) as commit_time, queuelen
from stl_commit_stats
where xid in (select xid from stl_querytext where query = :vpattern)
order by queuelen desc , queue_time desc;
\qecho --Compile Time
select userid, xid,  pid, query, segment, locus,
datediff(ms, starttime, endtime) as duration, compile
from svl_compile
where query = :vpattern;
--\qecho --Understand other operations within the same PID - svl_statementtext
--select userid,xid,pid,label,starttime,endtime,sequence,type,trim(text) from svl_statementtext where pid in (select pid from stl_querytext where query = :vpattern);
\qecho --Review query work - STL_PLAN_INFO
select * from STL_PLAN_INFO where query = :vpattern;
\qecho --Review query work - svl_query_report
select * from svl_query_report where query = :vpattern order by segment,step,slice;
\qecho --Review query work - svl_query_summary
select * from svl_query_summary where query = :vpattern order by seg,step;
\qecho -- Review alert
select * from stl_alert_event_log where query = :vpattern;
\qecho -- Review STL_ERROR
select userid,process,recordtime,pid,errcode,trim(file),linenum,trim(context),trim(error) from stl_error where recordtime between (select starttime from stl_query where query = :vpattern) and (select endtime from stl_query where query = :vpattern);
\q

Federated Query from Redshift to Aurora PostgreSQL

Create a publicly accessible Redshift cluster and an Aurora PostgreSQL / RDS PostgreSQL cluster. The RDS PostgreSQL or Aurora PostgreSQL instance must be in the same VPC as your Amazon Redshift cluster. If the instance is publicly accessible, configure its security group’s inbound rule to: Type: PostgreSQL, Protocol: TCP, Port Range: 5432, Source: 0.0.0.0/0. If the instance is not publicly accessible, you don’t need to configure an inbound rule.
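
If you want to add that inbound rule from code rather than the console, a boto3 sketch (the security group ID is a placeholder):

# Sketch: open PostgreSQL (TCP 5432) from anywhere on the instance's security group.
# 'sg-0123456789abcdef0' is a placeholder; use the RDS/Aurora instance's security group ID.
import boto3

ec2 = boto3.client('ec2', region_name='us-east-1')
ec2.authorize_security_group_ingress(
    GroupId='sg-0123456789abcdef0',
    IpPermissions=[{
        'IpProtocol': 'tcp',
        'FromPort': 5432,
        'ToPort': 5432,
        'IpRanges': [{'CidrIp': '0.0.0.0/0'}]
    }]
)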

  1. Go to AWS Console > Secrets Manager > create a secret for your RDS database and select your PostgreSQL database (a programmatic sketch of this step appears after the example output below)
  2. Create an IAM policy with the ARN of the above secret:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AccessSecret",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetResourcePolicy",
                "secretsmanager:GetSecretValue",
                "secretsmanager:DescribeSecret",
                "secretsmanager:ListSecretVersionIds"
            ],
            "Resource": "arn:aws:secretsmanager:us-east-1:111111111111:secret:federated-query-L9EBau"
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetRandomPassword",
                "secretsmanager:ListSecrets"
            ],
            "Resource": "*"
        }
    ]
}
  3. Create an IAM Redshift customizable role and attach the above policy to it.
  4. Attach the role to your Redshift cluster.
  5. Create an external schema in Redshift:
CREATE EXTERNAL SCHEMA IF NOT EXISTS myRedshiftSchema
FROM POSTGRES
DATABASE 'testdb' SCHEMA 'aurora_schema'
URI 'federated-cluster-instance-1.c2txxxxupg1.us-east-1.rds.amazonaws.com' PORT 5432
OPTIONS 'application_name=psql'
IAM_ROLE 'arn:aws:iam::1111111111111:role/federated-query-role'
SECRET_ARN 'arn:aws:secretsmanager:us-east-1:11111111111111:secret:federated-query-L9EBau';
  6. In RDS Aurora PostgreSQL, create the schema that will hold the objects for federated query access:
testdb=> create schema aurora_schema;
CREATE SCHEMA
testdb=> create table aurora_schema.federatedtable (id int8, name varchar(50), log_txn_date timestamp);
CREATE TABLE
testdb=> insert into aurora_schema.federatedtable values(1,'shadab','2019-12-26 00:00:00');
INSERT 0 1
testdb=> select * from aurora_schema.federatedtable;
id | name | log_txn_date
----+--------+---------------------
1 | shadab | 2019-12-26 00:00:00
(1 row)
  7. Log in to your Redshift cluster and run the federated query from Redshift to PostgreSQL:
testdb=# select * from myredshiftschema.federatedtable;
id | name | log_txn_date
----+--------+---------------------
1 | shadab | 2019-12-26 00:00:00
(1 row)
-- Trying out CTAS from Redshift to PostgreSQL --
testdb=# create table test as select * from myredshiftschema.federatedtable;
SELECT
testdb=# select * from test;
id | name | log_txn_date
----+--------+---------------------
1 | shadab | 2019-12-26 00:00:00
testdb=# select pg_last_query_id();
         1251

(1 row)

select * from svl_s3query_summary where query='1251';
-[ RECORD 1 ]-----------+---------------------------
userid                  | 100
query                   | 1251
xid                     | 7338
pid                     | 11655
segment                 | 0
step                    | 0
starttime               | 2019-12-27 06:17:30.800652
endtime                 | 2019-12-27 06:17:30.947853
elapsed                 | 147201
aborted                 | 0
external_table_name     | PG Subquery
file_format             | Text
.
.
.
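
For reference, step 1 (creating the secret) can also be done with boto3 rather than the console; a minimal sketch, with placeholder credentials and a secret name matching the ARN referenced in the IAM policy above:

# Sketch: create the Secrets Manager secret used by the federated query external schema.
# Username, password and secret name are placeholders; the host is the Aurora endpoint above.
import json
import boto3

sm = boto3.client('secretsmanager', region_name='us-east-1')
sm.create_secret(
    Name='federated-query',
    SecretString=json.dumps({
        'username': 'awsuser',
        'password': 'YourPassword',
        'host': 'federated-cluster-instance-1.c2txxxxupg1.us-east-1.rds.amazonaws.com',
        'port': 5432,
        'dbname': 'testdb',
        'engine': 'postgres'
    })
)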

References:

[1] Federated Query in Amazon Redshift (Preview) – https://docs.aws.amazon.com/redshift/latest/dg/federated-overview.html
[2] CREATE EXTERNAL SCHEMA – https://docs.aws.amazon.com/redshift/latest/dg/federated-external-schema.html
[3] Create a Secret and an IAM Role for Federated Query – https://docs.aws.amazon.com/redshift/latest/dg/federated-create-secret-iam-role.html

Check Redshift Table and Send SMS Programmatically using Amazon SNS

Requirement: Check if an UPDATE was run on a Redshift table and send an SMS programmatically using Amazon SNS. This script can be used in a variety of different scenarios; for example, you can use the same logic to check for load errors on your cluster or to check for INSERT or DELETE commands.

Check if an UPDATE has occurred on a table and send an SMS every time the table is updated

Pre-requisites :

1. BOTO3 Python SDK installed for Python3.7

2. AWS CLI installed

3. psycopg2 package installed for Python 3.7

4. Redshift Cluster

5. Amazon SNS configured to send SMS

6. Basic understanding of Python scripting, BOTO3 and Redshift.

Environment:

EC2 Instance Running CENTOS

Python3.7 Installed

AWS CLI installed and Configured Account credentials

Solution:

We will create a Python script to check svl_statementtext for UPDATE statements on a table ‘TEST’. The script can be configured to run in crontab every minute, and if an UPDATE occurs it dispatches an SMS using SNS.

1. Create the TEST table in your Redshift cluster and Insert some data into it

testdb=# create table test (id int8, name varchar(20));
CREATE TABLE
testdb=# insert into test values(1,'John');
INSERT 0 1
testdb=# insert into test values(2,'Matt');
INSERT 0 1
testdb=# insert into test values(3,'Chris');

2. Run an UPDATE statement on the table and check svl_statementtext

testdb=# select * from test;
 id | name
----+-------
  1 | John
  2 | Matt
  3 | Chris
(3 rows)

testdb=# update test set name='Tim' where id=1;
UPDATE 1


testdb=# select * from svl_statementtext where text ilike 'update%test%' and starttime > date_trunc('minute', sysdate);
 userid |  xid   |  pid  |  label  |         starttime          |          endtime           | sequence | type  |                  text
--------+--------+-------+---------+----------------------------+----------------------------+----------+-------+----------------------------------------
    100 | 506858 | 20017 | default | 2019-05-25 14:44:41.657139 | 2019-05-25 14:44:49.955101 |        0 | QUERY | update test set name='Tim' where id=1;
(1 row)

As you can see, svl_statementtext logs the update command and displays it. Now we will run another update command and check the same view, but using count(*).

testdb=# update test set name='John' where id=1;
UPDATE 1


testdb=# select count(*) from svl_statementtext where text ilike 'update%test%' and starttime > date_trunc('minute', sysdate);
 count
-------
     1
(1 row)

So it correctly shows that one UPDATE statement ran against table ‘TEST’ in the last minute. Using this logic, we can poll svl_statementtext every minute to see if an UPDATE hit the table, and if it did, send an SMS via SNS.

3. Python script (attached: svl_statementtext_1min.py) to check for UPDATE statements in the last minute and send an SMS if the count is not 0

import boto3
import psycopg2

#Obtaining the connection to RedShift
con=psycopg2.connect(dbname= 'testdb', host='redshift-dc2-test.ctzrqaulg0u6.us-east-1.redshift.amazonaws.com',
port= '5439', user= 'awsuser', password= '********')


#Opening a cursor and run sql query
cur = con.cursor()
cur.execute("select count(*) from svl_statementtext where text ilike 'update%test%' and starttime > date_trunc('minute', sysdate);")
data = str(cur.fetchone())
print(data)
con.commit()

#Close the cursor and the connection
cur.close()
con.close()

# Compare data variable for threshold
if data == '(0,)':
    print("NO UPDATES IN LAST 1 MINUTE ON TABLE TEST")

else:
    print("UPDATES IN LAST 1 MINUTE ON TABLE TEST")
    # Create an SNS client
    client = boto3.client(
        "sns", "us-east-1"
    )
    # Create the topic if it doesn't exist
    topic = client.create_topic(Name="invites-for-push-notifications")
    topic_arn = topic['TopicArn']  # get its Amazon Resource Name
    # Get list of contacts
    list_of_contacts = ["+6144*********"]  # <-- You can add a list of multiple mobile numbers here
    # Add SMS subscribers
    for number in list_of_contacts:
        client.subscribe(
            TopicArn=topic_arn,
            Protocol='sms',
            Endpoint=number  # <-- numbers that will receive an SMS message
        )
    # Publish a message.
    client.publish(Message="Hello World!", TopicArn=topic_arn, MessageAttributes={
        'AWS.SNS.SMS.SenderID': {
            'DataType': 'String',
            'StringValue': 'EASYORADBA'  # <-- Name of sender, not available in USA
        },
        'AWS.SNS.SMS.SMSType': {'DataType': 'String', 'StringValue': 'Transactional'}})

Open 2 sessions: from one session run an UPDATE command on table ‘TEST’, and from another session execute the Python script. If you configured everything properly, you will get an SMS from sender ‘EASYORADBA’ with the message text “Hello World”.

4. Save Script & Schedule to run every minute in Crontab

* * * * * /usr/local/bin/python3.7 svl_statementtext_1min.py

This script can be used in a variety of different scenarios to dispatch an SMS based on some count logic. Another scenario is to schedule this script to check for load errors on your Redshift cluster: for example, run it in your data-loading window and check for errors in the STL_LOAD_ERRORS table. If there was a data loading issue, the data engineering team can be notified via SMS. I am attaching the script (stl_load_errors.py) to check for data loading errors; a sketch of such a script follows the SQL below. You can change the granularity of time in which it checks for load errors by simply changing the time interval in the date_trunc function.

SQL: select count(*) from stl_load_errors where starttime > date_trunc('minute', sysdate);
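
The attached stl_load_errors.py is not reproduced here; a minimal sketch of what it could look like, following the same pattern as svl_statementtext_1min.py (cluster endpoint, password, topic name and phone number are placeholders):

# Sketch: check STL_LOAD_ERRORS for load errors in the last minute and send an SMS if any exist.
# Connection details, topic name and phone numbers are placeholders.
import boto3
import psycopg2

con = psycopg2.connect(dbname='testdb',
                       host='redshift-dc2-test.ctzrqaulg0u6.us-east-1.redshift.amazonaws.com',
                       port='5439', user='awsuser', password='********')
cur = con.cursor()
cur.execute("select count(*) from stl_load_errors where starttime > date_trunc('minute', sysdate);")
(error_count,) = cur.fetchone()
cur.close()
con.close()

if error_count == 0:
    print("NO LOAD ERRORS IN LAST 1 MINUTE")
else:
    print("LOAD ERRORS IN LAST 1 MINUTE:", error_count)
    client = boto3.client("sns", "us-east-1")
    topic = client.create_topic(Name="redshift-load-errors")
    for number in ["+6144*********"]:
        client.subscribe(TopicArn=topic['TopicArn'], Protocol='sms', Endpoint=number)
    client.publish(Message="Redshift load errors detected: %d" % error_count,
                   TopicArn=topic['TopicArn'])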

Python Script to Create a Data Pipeline Loading Data From RDS Aurora MySQL To Redshift

In this tutorial we will create a Python script which will build a data pipeline to load data from Aurora MySQL RDS to an S3 bucket and copy that data to a Redshift cluster.

One assumption is that you have a basic understanding of AWS, RDS, MySQL, S3, Python and Redshift. Even if you don’t, it’s alright; I will briefly explain each of them for non-cloud DBAs.

AWS- Amazon Web Services. It is the cloud infrastructure platform from Amazon which can be used to build and host anything from a static website to a globally scalable service like Netflix

RDS – Relational Database Service, or RDS for short, is Amazon’s managed relational database service for engines like its own Aurora, MySQL, PostgreSQL, Oracle and SQL Server

S3 – Simple Storage Service is AWS’s distributed object storage, which can scale almost infinitely. Data in S3 is stored in buckets; think of buckets as directories, but DNS-name compliant and cloud hosted

Python – A programming language which is now the de facto standard for data science and engineering

Redshift – AWS’s petabyte-scale data warehouse, which is largely compatible with PostgreSQL but uses a columnar storage engine

The source in this tutorial is an RDS Aurora MySQL database and the target is a Redshift cluster. The data is staged in an S3 bucket. With Aurora MySQL you can unload data directly to an S3 bucket, but in my script I will offload the table to a local filesystem and then copy it to the S3 bucket. This gives you flexibility in case you are not using Aurora but standard MySQL or MariaDB.

Environment:

  1. Python 3.7.2 with pip
  2. Ec2 instance with the Python 3.7 installed along with all the Python packages
  3. Source DB- RDS Aurora MySQL 5.6 compatible
  4. Destination DB – Redshift Cluster
  5. Database : Dev , Table : employee in both databases which will be used for the data transfer
  6. S3 bucket for staging the data
  7. AWS Python SDK Boto3

Make sure both the RDS Aurora MySQL and Redshift clusters have security groups that allow the IP of the EC2 instance for inbound connections (host and port).

  1. Create the table ‘employee’ in both the Aurora and Redshift clusters

Aurora MySQL 5.6

CREATE TABLE `employee` (
  `id` int(11) NOT NULL,
  `first_name` varchar(45) DEFAULT NULL,
  `last_name` varchar(45) DEFAULT NULL,
  `phone_number` varchar(45) DEFAULT NULL,
  `address` varchar(200) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

Redshift

DROP TABLE IF EXISTS employee CASCADE;

CREATE TABLE employee
(
   id            bigint         NOT NULL,
   first_name    varchar(45),
   last_name     varchar(45),
   phone_number  bigint,
   address       varchar(200)
);

ALTER TABLE employee
   ADD CONSTRAINT employee_pkey
   PRIMARY KEY (id);

COMMIT;

2. Install Python 3.7.2 and install all the packages needed by the script

sudo /usr/local/bin/python3.7 -m pip install boto3
sudo /usr/local/bin/python3.7 -m pip install psycopg2-binary
sudo /usr/local/bin/python3.7 -m pip install pymysql
# json is part of the Python standard library and does not need to be installed
sudo /usr/local/bin/python3.7 -m pip install pymongo

3. Insert sample data into the source RDS Aurora DB

$ mysql -u awsuser -h shadmha-cls-aurora.ap-southeast-2.rds.amazonaws.com -p dev

INSERT INTO `employee` VALUES (1,'shadab','mohammad','04447910733','Randwick'),(2,'kris','joy','07761288888','Liverpool'),(3,'trish','harris','07766166166','Freshwater'),(4,'john','doe','08282828282','Newtown'),(5,'mary','jane','02535533737','St. Leonards'),(6,'sam','rockwell','06625255252','Manchester');

SELECT * FROM employee;

4. Download and Configure AWS command line interface

The AWS Python SDK boto3 picks up the credentials configured via the AWS CLI to connect to your AWS account, and we use boto3 to upload the file to S3. Install the AWS CLI on Linux and configure it.

$ aws configure
AWS Access Key ID [****************YGDA]:
AWS Secret Access Key [****************hgma]:
Default region name [ap-southeast-2]:
Default output format [json]:
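
Optionally, you can verify from Python that boto3 picks up these credentials and can reach the staging bucket before running the pipeline (the bucket name is the one used in the script below):

# Sketch: quick sanity check that boto3 credentials work and the staging bucket is reachable.
import boto3

s3 = boto3.client('s3')
s3.head_bucket(Bucket='mybucket-shadmha')  # raises ClientError if missing or inaccessible
print("Credentials OK, bucket reachable")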

5. Python Script to execute the Data Pipeline (datapipeline.py)

import boto3
import psycopg2
import pymysql
import csv
import time
import sys
import os
import datetime
from datetime import date
datetime_object = datetime.datetime.now()
print ("###### Data Pipeline from Aurora MySQL to S3 to Redshift ######")
print ("")
print ("Start TimeStamp")
print ("---------------")
print(datetime_object)
print ("")


# Connect to MySQL Aurora and Download Table as CSV File
db_opts = {
    'user': 'awsuser',
    'password': '******',
    'host': 'shadmha-cls-aurora.ap-southeast-2.rds.amazonaws.com',
    'database': 'dev'
}

db = pymysql.connect(**db_opts)
cur = db.cursor()

sql = 'SELECT * from employee'
csv_file_path = '/home/centos/my_csv_file.csv'

try:
    cur.execute(sql)
    rows = cur.fetchall()
finally:
    db.close()

# Continue only if there are rows returned.
if rows:
    # New empty list called 'result'. This will be written to a file.
    result = list()

    # The column name is the first entry for each entity in the cursor description tuple.
    column_names = list()
    for i in cur.description:
        column_names.append(i[0])

    result.append(column_names)
    for row in rows:
        result.append(row)

    # Write result to file.
    with open(csv_file_path, 'w', newline='') as csvfile:
        csvwriter = csv.writer(csvfile, delimiter='|', quotechar='"', quoting=csv.QUOTE_MINIMAL)
        for row in result:
            csvwriter.writerow(row)
else:
    sys.exit("No rows found for query: {}".format(sql))


# Upload Generated CSV File to S3 Bucket
s3 = boto3.resource('s3')
bucket = s3.Bucket('mybucket-shadmha')
s3.Object('mybucket-shadmha', 'my_csv_file.csv').put(Body=open('/home/centos/my_csv_file.csv', 'rb'))


#Obtaining the connection to RedShift
con=psycopg2.connect(dbname= 'dev', host='redshift-cluster-1.ap-southeast-2.redshift.amazonaws.com',
port= '5439', user= 'awsuser', password= '*********')

#Copy Command as Variable
copy_command="copy employee from 's3://mybucket-shadmha/my_csv_file.csv' credentials 'aws_iam_role=arn:aws:iam::775888:role/REDSHIFT' delimiter '|' region 'ap-southeast-2' ignoreheader 1 removequotes ;"

#Opening a cursor and run copy query
cur = con.cursor()
cur.execute("truncate table employee;")
cur.execute(copy_command)
con.commit()

#Close the cursor and the connection
cur.close()
con.close()

# Remove the file from the S3 bucket and also the local copy
DelS3File = 'aws s3 rm s3://mybucket-shadmha/my_csv_file.csv --quiet'
DelLocalFile = 'rm /home/centos/my_csv_file.csv'
os.system(DelS3File)
os.system(DelLocalFile)


datetime_object_2 = datetime.datetime.now()
print ("End TimeStamp")
print ("-------------")
print (datetime_object_2)
print ("")

6. Run the Script or Schedule in Crontab as a Job

$ python3.7 datapipeline.py

Crontab to execute Job daily at 10:30 am

30 10 * * * /usr/local/bin/python3.7 /home/centos/datapipeline.py &>> /tmp/datapipeline.log

7. Check the table in the destination Redshift cluster; all the records should be visible there

SELECT * FROM employee;


This tutorial was done using a small table and very little data, but with S3’s distributed nature and massive scale, and Redshift as a data warehouse, you can build data pipelines for very large datasets. With Redshift being an OLAP database and Aurora OLTP, many real-life scenarios require offloading data from your OLTP apps to data warehouses or data marts to perform analytics on it.

AWS also has an excellent managed service called AWS Data Pipeline, which can automate the movement and transformation of data. But often, for developing customized solutions, Python is the best tool for the job.

Enjoy this script and please let me know in the comments or on Twitter (@easyoradba) if you have any issues, or what else you would like me to post about data engineering.

Simple Script to Connect & Query AWS Redshift from Python 3.7

Pre-requisite

You need the psycopg2 adapter for Python, which is compatible with any PostgreSQL database (and with Redshift)

To install psycopg2

$ sudo python3.7 -m pip install psycopg2

Sometimes it gives an error when you install from source; in that case you can install the binary package

$ sudo python3.7 -m pip install psycopg2-binary

import psycopg2

#Obtaining the connection to RedShift
con=psycopg2.connect(dbname= 'dev', host='your-redshift-cluster.redshift.amazonaws.com',
port= '5439', user= 'YourUser', password= 'YourPassword')

#Opening a cursor and running a query
cur = con.cursor()
cur.execute("SELECT SYSDATE;")
#Printing the output
print(cur.fetchall())
cur.close()
con.close()

Output:

[(datetime.datetime(2019, 4, 18, 23, 57, 0, 289196),)]

AWS Redshift: Insert into Table without S3

To generate random data in a table without using S3, for doing some quick tests:

drop table if exists seed;

create table seed ( n int8 );

insert into seed (
SELECT
p0.n
+ p1.n*2
+ p2.n * POWER(2,2)
+ p3.n * POWER(2,3)
+ p4.n * POWER(2,4)
+ p5.n * POWER(2,5)
+ p6.n * POWER(2,6)
+ p7.n * POWER(2,7)
as number
FROM
(SELECT 0 as n UNION SELECT 1) p0,
(SELECT 0 as n UNION SELECT 1) p1,
(SELECT 0 as n UNION SELECT 1) p2,
(SELECT 0 as n UNION SELECT 1) p3,
(SELECT 0 as n UNION SELECT 1) p4,
(SELECT 0 as n UNION SELECT 1) p5,
(SELECT 0 as n UNION SELECT 1) p6,
(SELECT 0 as n UNION SELECT 1) p7
Order by 1
);

commit;

drop table if exists test_table;

create table test_table(
ingest_time timestamp encode zstd,
doi date encode zstd,
id int encode bytedict,
value float encode zstd,
data_sig varchar(32) encode zstd
) DISTKEY(id) SORTKEY(ingest_time);

commit;

insert into test_table (
select dateadd('msec', - 10*n, getdate()) as ingest_time, trunc(dateadd('msec', - 10*n, getdate())) as doi, id,
n::float / 1000000 as value, 'sig-' || to_hex(n % 16) as data_sig
FROM (select (a.n + b.n + c.n + d.n) as n, (random() * 1000)::int as id from seed a cross join (select n*256 as n from seed) b cross join (select n*65536 as n from seed) c
cross join (select n*16777216 as n from ( select distinct (n/16)::int as n from seed ) ) d)
) order by ingest_time;

commit;

analyze test_table;

select count(*) from test_table;

-- Consecutive runs of the above insert query will add 268 million rows for each execution --

You can create a table with about 1 billion rows in 8 minutes on a ds2.xlarge cluster
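
To get to roughly a billion rows, you can simply re-run that INSERT a few times. A small sketch in Python, assuming the insert statement above has been saved to a local file (the file name and connection details are placeholders):

# Sketch: run the insert-select above four times (~268M rows each) for ~1.07 billion rows total.
# 'insert_test_table.sql' is a placeholder file assumed to contain the INSERT statement shown above.
import psycopg2

INSERT_SQL = open('insert_test_table.sql').read()

con = psycopg2.connect(dbname='dev', host='your-redshift-cluster.redshift.amazonaws.com',
                       port='5439', user='YourUser', password='YourPassword')
cur = con.cursor()
for i in range(4):
    cur.execute(INSERT_SQL)
    con.commit()
    cur.execute('select count(*) from test_table;')
    print('run', i + 1, 'row count:', cur.fetchone()[0])
cur.close()
con.close()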