Federated Query from Redshift to Aurora PostgreSQL

Create a publicly accessible Redshift cluster and an Aurora PostgreSQL or RDS PostgreSQL cluster. The RDS PostgreSQL or Aurora PostgreSQL instance must be in the same VPC as your Amazon Redshift cluster. If the instance is publicly accessible, configure its security group’s inbound rule as follows: Type: PostgreSQL, Protocol: TCP, Port Range: 5432, Source: 0.0.0.0/0. If the instance is not publicly accessible, you don’t need to configure an inbound rule.
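Before moving on, it can help to confirm that the PostgreSQL endpoint accepts TCP connections on port 5432 from the machine you are testing on. The following is a minimal sketch using only the Python standard library. The helper name `can_connect` is hypothetical, and a successful check from your workstation does not by itself prove that Redshift can reach the instance.

```python
import socket

def can_connect(host, port=5432, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        # create_connection resolves the host and opens a TCP socket
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS failures
        return False
```

For example, `can_connect('federated-cluster-instance-1.c2txxxxupg1.us-east-1.rds.amazonaws.com')` would report whether the inbound rule lets your client through.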

  1. In the AWS Console, go to Secrets Manager, create a secret for an RDS database, and select your PostgreSQL database.
  2. Create an IAM policy that references the ARN of the secret created above:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AccessSecret",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetResourcePolicy",
                "secretsmanager:GetSecretValue",
                "secretsmanager:DescribeSecret",
                "secretsmanager:ListSecretVersionIds"
            ],
            "Resource": "arn:aws:secretsmanager:us-east-1:111111111111:secret:federated-query-L9EBau"
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetRandomPassword",
                "secretsmanager:ListSecrets"
            ],
            "Resource": "*"
        }
    ]
}
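If you create this policy for more than one secret, generating the document from the secret ARN avoids copy-and-paste mistakes. Below is a minimal sketch that rebuilds the same policy as a Python dictionary. The function name `build_secret_policy` is hypothetical, and the sketch only produces the JSON text; it does not call any AWS API.

```python
import json

def build_secret_policy(secret_arn):
    """Build the IAM policy document shown above for a single secret ARN."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AccessSecret",
                "Effect": "Allow",
                "Action": [
                    "secretsmanager:GetResourcePolicy",
                    "secretsmanager:GetSecretValue",
                    "secretsmanager:DescribeSecret",
                    "secretsmanager:ListSecretVersionIds",
                ],
                # Only this statement is scoped to the specific secret
                "Resource": secret_arn,
            },
            {
                "Sid": "VisualEditor1",
                "Effect": "Allow",
                "Action": [
                    "secretsmanager:GetRandomPassword",
                    "secretsmanager:ListSecrets",
                ],
                "Resource": "*",
            },
        ],
    }

if __name__ == "__main__":
    arn = "arn:aws:secretsmanager:us-east-1:111111111111:secret:federated-query-L9EBau"
    print(json.dumps(build_secret_policy(arn), indent=4))
```

The printed JSON can be pasted into the IAM policy editor.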
  3. Create an IAM role of type "Redshift - Customizable" and attach the above policy to it.
  4. Attach the role to your Redshift cluster.
  5. Create an external schema in Redshift:
CREATE EXTERNAL SCHEMA IF NOT EXISTS myRedshiftSchema
FROM POSTGRES
DATABASE 'testdb' SCHEMA 'aurora_schema'
URI 'federated-cluster-instance-1.c2txxxxupg1.us-east-1.rds.amazonaws.com' PORT 5432
OPTIONS 'application_name=psql'
IAM_ROLE 'arn:aws:iam::111111111111:role/federated-query-role'
SECRET_ARN 'arn:aws:secretsmanager:us-east-1:111111111111:secret:federated-query-L9EBau';
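The statement above has several values that are easy to mistype, the ARNs in particular. As a rough sketch, the helper below renders the same statement from parameters and checks that each ARN carries a 12-digit AWS account ID. The function names are hypothetical, and the optional OPTIONS clause is left out for brevity.

```python
import re

def account_id_from_arn(arn):
    """Return the account field of an ARN, checking that it is 12 digits."""
    parts = arn.split(":")
    if len(parts) < 6 or not re.fullmatch(r"\d{12}", parts[4]):
        raise ValueError("ARN does not contain a 12-digit account ID: %r" % arn)
    return parts[4]

def build_external_schema_sql(local_schema, database, remote_schema, host,
                              iam_role_arn, secret_arn, port=5432):
    """Render a CREATE EXTERNAL SCHEMA ... FROM POSTGRES statement."""
    # The local schema name is an identifier, so restrict it to simple names
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", local_schema):
        raise ValueError("invalid schema name: %r" % local_schema)
    # The remaining values are emitted as quoted literals
    for value in (database, remote_schema, host, iam_role_arn, secret_arn):
        if "'" in value:
            raise ValueError("single quotes are not allowed in literal values")
    account_id_from_arn(iam_role_arn)
    account_id_from_arn(secret_arn)
    return "\n".join([
        "CREATE EXTERNAL SCHEMA IF NOT EXISTS %s" % local_schema,
        "FROM POSTGRES",
        "DATABASE '%s' SCHEMA '%s'" % (database, remote_schema),
        "URI '%s' PORT %d" % (host, int(port)),
        "IAM_ROLE '%s'" % iam_role_arn,
        "SECRET_ARN '%s';" % secret_arn,
    ])
```

The returned string can be run from any SQL client connected to the Redshift cluster.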
  6. In Aurora PostgreSQL, create the schema that will hold the objects for federated query access, then add a test table:
testdb=> create schema aurora_schema;
CREATE SCHEMA
testdb=> create table aurora_schema.federatedtable (id int8, name varchar(50), log_txn_date timestamp);
CREATE TABLE
testdb=> insert into aurora_schema.federatedtable values(1,'shadab','2019-12-26 00:00:00');
INSERT 0 1
testdb=> select * from aurora_schema.federatedtable;
 id |  name  |    log_txn_date
----+--------+---------------------
  1 | shadab | 2019-12-26 00:00:00
(1 row)
  7. Log in to your Redshift cluster and run the federated query from Redshift to PostgreSQL:
testdb=# select * from myredshiftschema.federatedtable;
 id |  name  |    log_txn_date
----+--------+---------------------
  1 | shadab | 2019-12-26 00:00:00
(1 row)
-- Trying out CTAS in Redshift from the federated PostgreSQL table --
testdb=# create table test as select * from myredshiftschema.federatedtable;
SELECT
testdb=# select * from test;
 id |  name  |    log_txn_date
----+--------+---------------------
  1 | shadab | 2019-12-26 00:00:00
testdb=# select pg_last_query_id();
 pg_last_query_id
------------------
             1251
(1 row)

-- Checking the federated subquery in svl_s3query_summary --
testdb=# select * from svl_s3query_summary where query='1251';
-[ RECORD 1 ]-----------+---------------------------
userid | 100
query | 1251
xid | 7338
pid | 11655
segment | 0
step | 0
starttime | 2019-12-27 06:17:30.800652
endtime | 2019-12-27 06:17:30.947853
elapsed | 147201
aborted | 0
external_table_name | PG Subquery
file_format | Text
.
.
.
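The elapsed column in the record above is reported in microseconds and should equal endtime minus starttime. As a quick sanity check, this small sketch recomputes it from the two timestamps in the output. The helper name `elapsed_microseconds` is hypothetical.

```python
from datetime import datetime, timedelta

FMT = "%Y-%m-%d %H:%M:%S.%f"

def elapsed_microseconds(starttime, endtime):
    """Return endtime - starttime as a whole number of microseconds."""
    delta = datetime.strptime(endtime, FMT) - datetime.strptime(starttime, FMT)
    return delta // timedelta(microseconds=1)

# Timestamps copied from the svl_s3query_summary record above
print(elapsed_microseconds("2019-12-27 06:17:30.800652",
                           "2019-12-27 06:17:30.947853"))  # 147201
```

The result matches the elapsed value of 147201, so the federated subquery took roughly 0.15 seconds.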

References:

[1] Federated Query in Amazon Redshift (Preview) – https://docs.aws.amazon.com/redshift/latest/dg/federated-overview.html
[2] CREATE EXTERNAL SCHEMA – https://docs.aws.amazon.com/redshift/latest/dg/federated-external-schema.html
[3] Create a Secret and an IAM Role for Federated Query – https://docs.aws.amazon.com/redshift/latest/dg/federated-create-secret-iam-role.html

Python Script to Copy Data from S3 into Redshift and Unload It Back to S3

import psycopg2
import time
import sys
import datetime

# Record and print the start time of the run
datetime_object = datetime.datetime.now()
print("Start TimeStamp")
print("---------------")
print(datetime_object)
print("")

#Progress Bar Function
def progressbar(it, prefix="", size=60, file=sys.stdout):
    count = len(it)
    def show(j):
        x = int(size*j/count)
        file.write("%s[%s%s] %i/%i\r" % (prefix, "#"*x, "."*(size-x), j, count))
        file.flush()
    show(0)
    for i, item in enumerate(it):
        yield item
        show(i+1)
    file.write("\n")
    file.flush()

# Obtain the connection to Redshift (replace host, user, and password with your own)
con = psycopg2.connect(dbname='dev', host='redshift.amazonaws.com',
                       port='5439', user='awsuser', password='*****')

# COPY command as a variable
copy_command = "copy users from 's3://redshift-test-bucket/allusers_pipe.txt' credentials 'aws_iam_role=arn:aws:iam::775088:role/REDSHIFTROLE' delimiter '|' region 'ap-southeast-2';"

# UNLOAD command as a variable. The timestamp is formatted without spaces or colons
# so the S3 key stays easy to handle. UNLOAD treats the path as a prefix and appends
# slice and part numbers to it.
unload_command = "unload ('select * from users') to 's3://redshift-test-bucket/users_" + datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + ".csv' credentials 'aws_iam_role=arn:aws:iam::7755088:role/REDSHIFTROLE' delimiter '|' region 'ap-southeast-2';"

# Open a cursor, empty the target table, then run the COPY and commit
cur = con.cursor()
cur.execute("truncate table users;")
cur.execute(copy_command)
con.commit()

# Cosmetic progress bar: the COPY above has already completed, so this loop only sleeps for about 10 seconds
for i in progressbar(range(100), "Copying Data into Redshift: ", 10):
    time.sleep(0.1) # any calculation you need

print("")

# Cosmetic progress bar shown before the UNLOAD runs: this loop only sleeps for about 60 seconds
for i in progressbar(range(600), "Unloading Data from Redshift to S3: ", 60):
    time.sleep(0.1) # any calculation you need

print("")

# Run the UNLOAD query with the same cursor
cur.execute(unload_command)

#Close the cursor and the connection
cur.close()
con.close()

datetime_object_2 = datetime.datetime.now()
print ("End TimeStamp")
print ("-------------")
print(datetime_object_2)
print("")
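In the script above, the progress bars are driven by time.sleep rather than by the database work itself. One possible variation, shown here as a sketch under stated assumptions, advances the bar only after each SQL statement returns. The function `run_statements` is hypothetical, it accepts any object with an `execute(sql)` method (such as a psycopg2 cursor), and the bar still cannot show progress inside a single long-running COPY or UNLOAD.

```python
import sys

def progressbar(it, prefix="", size=60, file=sys.stdout):
    """Same generator as in the script: redraws a text bar after each item."""
    count = len(it)
    def show(j):
        x = int(size * j / count)
        file.write("%s[%s%s] %i/%i\r" % (prefix, "#" * x, "." * (size - x), j, count))
        file.flush()
    show(0)
    for i, item in enumerate(it):
        yield item
        show(i + 1)
    file.write("\n")
    file.flush()

def run_statements(cursor, statements, prefix="Running: ", file=sys.stdout):
    """Execute each statement in order; the bar moves when a statement returns."""
    statements = list(statements)
    if not statements:
        # progressbar divides by the item count, so skip empty input
        return []
    done = []
    for sql in progressbar(statements, prefix, 20, file):
        cursor.execute(sql)  # blocks until the statement finishes
        done.append(sql)
    return done
```

With the variables from the script, this could be called as `run_statements(cur, ["truncate table users;", copy_command])` followed by `con.commit()`.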