Build and store a Hive mestastore outside an EMR cluster in a RDS MySQL database and Connect a Redshift cluster to an EMR cluster

This document addresses the specific configuration points that needs to be in place in order to build and store a Hive mestastore outside an EMR cluster in a RDS MySQL database. It also covers the steps to connect a Redshift cluster to an EMR cluster so Redshift can create and access the tables stored within the external metastore.

Resources Used:

Redshift Cluster

• RDS MySQL Instance

• EMR Cluster

Note: All resources must be in same VPC and same region for this practice.

Creating the RDS MySQL:

1 – First, start creating a RDS MySQL instance if you don’t have one already. Open AWS RDS Console and create an MySQL instance that will be used during this practice.

Note: Please make note of RDS security group, endpoint, Master User and Master Password. We will need that information later on.

2 – Once the RDS MySQL instance is created, modify its security groups to add a rule for All traffic on all Port Range to be allowed from the VPC’s default security group.

Note: This VPC’s default Security Group will be used while creating the EMR cluster later on as well but it needs to be whitelisted beforehand otherwise the EMR launching will fail while trying to reach out to the RDS MySQL.

Before creating the EMR Cluster:

3 – After creating the RDS MySQL (and open its security group to EMR) but right before creating the EMR cluster, a JSON configuration file needs to be created. This file will be ingested by EMR during the bootstrapping phase of EMR’s creation, it will basically tell EMR how to access the remote RDS MySQL database.

4 – Copy the JSON property structure from the following link (use Copy icon): https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hive-metastore-external.html

5 – Paste it in a text editor and modify it carefully with the RDS details you noted earlier.

Note: Be careful, the value property can not contain any spaces or carriage returns. It should appear all on one line. Save it as “hiveConfiguration.json”.

6 – The final JSON configuration file should look like the following:

[
    {
      “Classification”: “hive-site”,
      “Properties”: {

        “javax.jdo.option.ConnectionURL”: “jdbc:mysql:\/\/database-1.cefjr3enh3dk.us-east-2.rds.amazonaws.com:3306\/hive?createDatabaseIfNotExist=true“,      

        “javax.jdo.option.ConnectionDriverName”:”org.mariadb.jdbc.Driver”,
        “javax.jdo.option.ConnectionUserName”:”admin“,

        “javax.jdo.option.ConnectionPassword”: “*********
      }
    }
]

Note 1: replace <hostname>, <username>, <password> with your own details:

Note 2: The part “hive?createDatabaseIfNotExist=true” determines the name of the database to be created in the MySQL RDS, in this case the database will be called “hive”.

7 – After creating above file, upload it to an S3 bucket/folder of your choice (in the same region of your resources).

Creating the EMR:

8 – Now, it is time to create the EMR cluster. To do this, open AWS EMR console and click Create Cluster button. This will prompt the Quick Options page but we won’t be using that. Click on Go to advanced options on the top of the page.

9 – This will send you to the Advanced Options page. There, under Software Configuration, select the following Applications:

Hadoop, Ganglia, Hive, Hue, Tez, Pig, Mahout

10 – In the same page, under Edit Software Settings section, click Load JSON from S3 and select the S3 bucket/path where you uploaded the previous created file “hiveConfiguration.json“. Select the file there and hit Select.

11 – In the Hardware Configuration page, make sure that the EMR cluster is in the same VPC as your MySQL RDS instance. Hit Next if you don’t want to change any Network configuration or Node types.

12 – Hit Next in the General Options page if you don’t want to change anything, although you might want to change the name of your EMR cluster here.

13 – In the next page, Security Options, make sure you have an EC2 Key Pair in that region and select it. Otherwise, create one!

Note: Create one now (if you don’t have one) before creating the EMR as you CAN’T add it later!!!

14 – Still in the Security Options page, expand the EC2 security groups panel and change both, Master and Core & Task instances to use the VPC’s default security group (the same whitelisted in the RDS MySQL security group earlier).

15 – Hit Create cluster and wait the EMR to be created. It will take some time…

Confirming that the metastore was created in the RDS MySQL

16 – Once the EMR is created, another rule needs to be added to the VPC’s default security group, one that allows SSHing into the EMR cluster on port 22 from your local IP. It should look like the following:

17 – With the right rules in place, try to connect to your EMR cluster from your local machine:

– – – chmod 600 article_key.pem 
–   ssh -i article_key.pem hadoop@ec2-18-XX-XX-XX.us-east-2.compute.amazonaws.com

18 – EMR has a MySQL client installed, use this client to connect to your MySQL database and perform few tests such as if the Security Groups are working properly and if the “hive” database was created properly

Note: You can do a telnet test from within EMR box as well to test Security Group access.

19 – To connect to the RDS MySQL, run the following command from your EMR box:

 mysql -h <rds-endpoint> -P 3306 -u <rds master user> -p <rds master password>

Examplemysql -h database-1.cefjr3enh3dk.us-east-2.rds.amazonaws.com -P 3306 -u admin123 -pPwD12345

20 – Once connected, use the following commands to verify if the Hive metastore was indeed created in the RDS. You should be able to see a database named “hive” there:

show databases;       à Lists all databases – “hive” should be there
use hive;             à Connects you to “hive” database
show tables;          à Lists all the meta tables within hive database
select * from TBLS;   à Lists all tables created in hive. At this point there’s none

Setting up necessary Spectrum Roles and Network requirements for Redshift and EMR

Note 1: Following steps assume that you already have a Redshift cluster and that you can connect to it. It will not guide you on how to create and access the Redshift cluster. 

Note 2: Since EMR, RDS MySQL share the same VPC’s default security group, they should be able to communicate to each other already. If that’s the case, you can skip Step 22 and go straight to Step 23, otherwise, If EMR and Redshift use different security groups, please do the step 22 first.

21 – Create a Role for Spectrum and attach it to your Redshift cluster. Follow the instructions here:

•       To Create the Role: https://docs.aws.amazon.com/redshift/latest/dg/c-getting-started-using-spectrum-create-role.html

•       To Associate the Role: https://docs.aws.amazon.com/redshift/latest/dg/c-getting-started-using-spectrum-add-role.html

22 – (Optional) Now that Redshift can access S3, Redshift also needs to access EMR cluster and vice-versa. Follow the steps listed under section “Enabling Your Amazon Redshift Cluster to Access Your Amazon EMR Cluster” in the following link: https://docs.aws.amazon.com/redshift/latest/dg/c-spectrum-external-schemas.html#c-spectrum-enabling-emr-access

Note: In summary, this creates an EC2 security group with Redshift’s Security Group and the EMR’s master node’s security groups inside it. Redshift’s Security Group must allow TCP in every port (0 – 65535) while EMR’s Security Group must allow TCP in port 9083 (Hive’s default). Next, you attach this newly created security group to both of your Redshift and EMR clusters.

23 – Once this is done, you should now be able to create the External Schema in Redshift, query the external tables from Redshift and also be able to create/see the schemas/tables from EMR Hive as well. However, at this point there’s no tables created yet.

Creating Tables on Hive First

24 – Log to Hive console and run the following:

> show databases;
default  (that’s the only database so far)
 
> create external table hive_table (col1 int, col2 string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘|’
location ‘s3://<your_bucket>/<your_folder>/‘;

> show tables;
hive_table  (that’s the table we just created) 

25 – Log back to your MySQL database and run the following commands:

show databases;       à Lists all databases – “hive” should be there
use hive;             à Connects you to “hive” database
show tables;          à Lists all the meta tables within hive database
select * from TBLS;   à  Lists all tables created in hive, “hive_table” is there!

Note: Now you will be able to see the newly created table “hive_table” showing on your External MySQL catalog.

Creating Schemas and Tables on Redshift Now

26 – On Redshift side, an External Schema must be created first before creating or querying the Hive tables, like following:

CREATE EXTERNAL SCHEMA emr_play                     à It can be any name, that’s a schema valid only for Redshift.
FROM HIVE METASTORE DATABASE ‘default’              à Use default database to match the database we have in Hive.  
URI ‘172.XXX.XXX.XXX‘ PORT 9083                     à EMR’s Private IP of the Master Instance. Hive’s default port is 9083.
IAM_ROLE ‘arn:aws:iam::000000000000:role/spectrum‘; à A valid Spectrum Role attached Redshift.

27 – Create the table(s):

create external table emr_play.redshift_table (col1 int, col2 varchar)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘|’
location ‘s3://<your_bucket>/<your_folder>/‘;

28 – Simply query the table now:

select * from emr_play.redshift_table;

29 – One more time, log back to your MySQL database and run the following commands again:

show databases;       à Lists all databases – “hive” should be there
use hive;             à Connects you to “hive” database
show tables;          à Lists all the meta tables within hive database
select * from TBLS;   à  Lists all tables created in hive, both tables are there!

 Note: You should be able to see the both Hive and Redshift tables now showing on your External MySQL catalog. You can also query the tables and create new tables on both Hive and Redshift side.

Create-Modify-Destroy Redshift Cluster Using Terraform

Devops tool have become quite popular in the last few years. Infrastructure automation tools like Chef, Ansible, Cloudformation and Terraform are increasingly being used to provision cloud infrastructure. Once only used for provisioning compute resources but nowadays due to the agile data analytics organizational need even resources like Data warehouses are being added to the devops cycle. Most of these tools eg: Saltstack, Ansible, Chef, Puppet etc are widely used in the industry one of them stands out among the rest : Terraform.

What makes Terraform different from others including our very own Cloudformation is it’s declarative nature, most of infrastructure automation tools are procedural in nature not declarative. Let me explain the difference between declarative and procedural

 Lets say you want to provision 10 ec2 instances using an automation approach. With a tool like Ansible your template would like something below using a procedural declaration.

 – ec2:
 count: 10
 image: ami-v1
 instance_type: t2.micro

Same code in Terraform using a declarative approach looks like

 resource “aws_instance” “example” {
 count = 10
 ami = “ami-v1”
 instance_type = “t2.micro”
}

The difference is that even though both approaches look similar, lets say you want to add additional 5 servers to the configuration. The ansible code is essentially useless since ansible does not maintain state. For ansible if you change the count and increase it to 15, it will create 15 new additional EC2 instances. Ansible has no way to know what it did in the past. For creating total 15 servers you need to add additional 5.

 – ec2:
 count: 5
 image: ami-v1
 instance_type: t2.micro

With Terraform this is the big game changer. Terraform maintains state of your infrastructure. Terraform is aware of any state it created in the past. Therefore, to deploy additional 5 more servers, all you have to do is go back to the same Terraform template and update the count from 10 to 15:

 resource “aws_instance” “example” {
 count = 15
 ami = “ami-v1”
 instance_type = “t2.micro”
}

When you execute this template Terraform knows it created 10 instances before so it will add only the 5 new instances. With declarative approach the end goal matters. This makes Terraform the winner IMHO from all others. So in this example once we are done with the test , to delete the cluster we just have to run one command without specifying any additional details. Becuase Terraform maintains a record that it created a Redshift cluster with so and so name.

 Let’s now jump in and create a Redshift dc1.large cluster in region ‘us-east-1’ using Terraform

1. Download and Install Terraform for Linux from the Terraform Website : https://www.terraform.io/downloads.html

Note : Install awscli and configure your AWS credentials before we begin

On Linux the download is a zip file containing only 1 file. Unzip to any directory and copy the file ‘terraform’ to /usr/bin

 2. Create a Terraform configuration file in a new directory

 mkdir redshift_tf

 cd redshift_tf

 vim redshift.tf

 provider “aws” {
 region = “us-east-1”
}
resource “aws_redshift_cluster” “default” {
 cluster_identifier = “terraform-rs-cluster”
 database_name = “testdb”
 master_username = “awsuser”
 master_password = “SomePassword1”
 node_type = “dc1.large”
 cluster_type = “single-node”
 skip_final_snapshot = true
}

 3. Initiate Terraform

$ terraform init

 Initializing the backend…

Initializing provider plugins…
– Checking for available provider plugins…
– Downloading plugin for provider “aws” (terraform-providers/aws) 2.14.0…

The following providers do not have any version constraints in configuration,
so the latest version was installed.

To prevent automatic upgrades to new major versions that may contain breaking
changes, it is recommended to add version = “…” constraints to the
corresponding provider blocks in configuration, with the constraint strings
suggested below.

* provider.aws: version = “~&gt; 2.14”

Terraform has been successfully initialized!

 4. Apply Terraform Configuration

Note 1: From Terraform 0.11 and above you do not have to run ‘terraform plan’ command

Note
 2 : For security purpose it is not good practice to store access_key or
 secret_key in the .tf file. If you have installed awscli then Terraform
 will take your AWS credentials from ‘~/.aws/credentials’ or IAM
credentials.

$ terraform apply

 An execution plan has been generated and is shown below.
Resource actions are indicated with the following symbols:
 + create

Terraform will perform the following actions:

 # aws_redshift_cluster.default will be created
 + resource “aws_redshift_cluster” “default” {
 + allow_version_upgrade = true
 + automated_snapshot_retention_period = 1
 + availability_zone = (known after apply)
 + bucket_name = (known after apply)
 + cluster_identifier = “terraform-rs-cluster”
 + cluster_parameter_group_name = (known after apply)
 + cluster_public_key = (known after apply)
 + cluster_revision_number = (known after apply)
 + cluster_security_groups = (known after apply)
 + cluster_subnet_group_name = (known after apply)
 + cluster_type = “single-node”
 + cluster_version = “1.0”
 + database_name = “testdb”
 + dns_name = (known after apply)
 + enable_logging = (known after apply)
 + encrypted = false
 + endpoint = (known after apply)
 + enhanced_vpc_routing = (known after apply)
 + iam_roles = (known after apply)
 + id = (known after apply)
 + kms_key_id = (known after apply)
 + master_password = (sensitive value)
 + master_username = “awsuser”
 + node_type = “dc1.large”
 + number_of_nodes = 1
 + port = 5439
 + preferred_maintenance_window = (known after apply)
 + publicly_accessible = true
 + s3_key_prefix = (known after apply)
 + skip_final_snapshot = false
 + vpc_security_group_ids = (known after apply)
 }

Plan: 1 to add, 0 to change, 0 to destroy.

 aws_redshift_cluster.default: Creation complete after 3m33s [id=terraform-rs-cluster]

Apply complete! Resources: 1 added, 0 changed, 0 destroyed.

 5. Check the state of your infrastructure

You can go
check in your AWS console &gt; Redshift Dashboard and you will see the cluster. To see it from terraform run the below command

$ terraform show

 6. Destroy the Redshift cluster
Like i mentioned in the beginning of this article, the beauty of Terraform is it maintains
state of your infrastructure. You can remove the Redshift cluster by running
 just one simple command

$ terraform destroy

Check Redshift Table and Send SMS Programmatically using Amazon SNS

Requirement : Check if an UPDATE was run on a Redshift Table and Send SMS Programmatically using Amazon SNS. This script can be used in a variety of different scenarios, for eg: you can use the same logic to check for load errors on you cluster or check for INSERTS or DELETE commands.

Check if an UPDATE has occurred on a Table and send an SMS everytime the table is updated

Pre-requisites :

1. BOTO3 Python SDK installed for Python3.7

2. AWS CLI installed

3. Pscyopg2 package installed for Python3.7

4. Redshift Cluster

5. Amazon SNS configured to send SMS

6. Basic understanding of Python scripting, BOTO3 and Redshift.

Environment:

EC2 Instance Running CENTOS

Python3.7 Installed

AWS CLI installed and Configured Account credentials

Solution:

We will create a Python script to check svl_statementtext for update statements on a table ‘TEST’. The script can be configured to run in crontab every minute and if it an UPDATE occurs it dispatches an SMS using SNS.

1. Create the TEST table in your Redshift cluster and Insert some data into it

testdb=# create table test (id int8, name varchar(20));
CREATE TABLE
testdb=# insert into test values(1,’John’);
INSERT 0 1
testdb=# insert into test values(2,’Matt’);
INSERT 0 1
testdb=# insert into test values(3,’Chris’);

2. Run an UPDATE statement on the table and check svl_statementtext

testdb=# select * from test;
 id | name
—-+——-
  1 | John
  2 | Matt
  3 | Chris
(3 rows)

testdb=# update test set name=’Tim’ where id=1;
UPDATE 1


testdb=# select * from svl_statementtext where text ilike ‘update%test%’ and starttime > date_trunc(‘minute’, sysdate);
 userid |  xid   |  pid  |             label              |         starttime
       |          endtime           | sequence | type  |
                                                                           text
    100 | 506858 | 20017 | default                        | 2019-05-25 14:44:41.
657139 | 2019-05-25 14:44:49.955101 |        0 | QUERY | update test set name=’T
im’ where id=1;


(1 row)

As you can see the table logs the update command and displays it. Now we will run another update command and check the same table but using count(*)

testdb=# update test set name=’John’ where id=1;
UPDATE 1


testdb=# select count(*) from svl_statementtext where text ilike ‘update%test%’ and starttime > date_trunc(‘minute’, sysdate);
 count
——-
     1
(1 row)

So it correctly display that 1 row was updated in the last minute on table ‘TEST’. Using this logic we can poll the table every minute to see if a transaction hit the table svl_statementtext. And if it did we will send an SMS via SNS

3. Python Script (#Attached svl_statementtext_1min.py) to Check for UPDATE statements in last one minute and if COUNT is not ‘0’ then send an SMS

import boto3
import psycopg2

#Obtaining the connection to RedShift
con=psycopg2.connect(dbname= ‘testdb’, host=’redshift-dc2-test.ctzrqaulg0u6.us-east-1.redshift.amazonaws.com’,
port= ‘5439’, user= ‘awsuser’, password= ‘********’)


#Opening a cursor and run sql query
cur = con.cursor()
cur.execute(“select count(*) from svl_statementtext where text ilike ‘update%test%’ and starttime > date_trunc(‘minute’, sysdate);”)
data = str(cur.fetchone())
print(data)
con.commit()

#Close the cursor and the connection
cur.close()
con.close()

# Compare data variable for threshold
if  data == ‘(0,)’:
    print(“NO UPDATES IN LAST 1 MINUTE ON TABLE TEST”)

else:
    print(“UPDATES IN LAST 1 MINUTE ON TABLE TEST”)
    # Create an SNS client
    client = boto3.client(
    “sns”,”us-east-1″
    )
    # Create the topic if it doesn’t exist
    topic = client.create_topic(Name=”invites-for-push-notifications”)
    topic_arn = topic[‘TopicArn’]  # get its Amazon Resource Name
    # Get List of Contacts
    list_of_contacts = [“+6144*********”] # <– You can add a list of mutiple mobile numbers here
    # Add SMS Subscribers
    for number in list_of_contacts:
        client.subscribe(
        TopicArn=topic_arn,
        Protocol=’sms’,
        Endpoint=number  # <– numbers who’ll receive an SMS message.
        )
    # Publish a message.
    client.publish(Message=”Hello World!”, TopicArn=topic_arn, MessageAttributes={
    ‘AWS.SNS.SMS.SenderID’: {
    ‘DataType’: ‘String’,
    ‘StringValue’: ‘EASYORADBA’ # <– Name of Sender, Not Available in USA
    },’AWS.SNS.SMS.SMSType’: {‘DataType’: ‘String’, ‘StringValue’: ‘Transactional’}})

Open 2 sessions and from one session run an UPDATE command on table ‘TEST’ and from another session execute the Python script. If you configured every properly, you will get an SMS from Sender ‘EASYORADBA’ wuth message text “Hello World”

4. Save Script & Schedule to run every minute in Crontab

* * * * * /usr/local/bin/python3.7 svl_statementtext_1min.py

This script can be used in a variety of different scenarios to dispatch SMS based on some count logic. Another scenario is to schedule this script to check load errors on your Redshift cluster. Example run this script in your Data loading window and check for errors in STL_LOAD_ERRORS table. If there was a data loading issue then the Data Engineering team can be notified via SMS. I am attaching the script (stl_load_errors.py)  to check for data loading errors. You can change the granuliarity of time in which it should check for load errors by simply changing the time intervsl in date_trunc function.

SQL : select count(*) from stl_load_errors where starttime > date_trunc(‘minute’, sysdate);

AWS Reshift Insert into Table without S3

To generate random data into a table without using S3 for doing some quick tests

drop table if exists seed;

create table seed ( n int8 );

insert into seed (
SELECT
p0.n
+ p1.n*2
+ p2.n * POWER(2,2)
+ p3.n * POWER(2,3)
+ p4.n * POWER(2,4)
+ p5.n * POWER(2,5)
+ p6.n * POWER(2,6)
+ p7.n * POWER(2,7)
as number
FROM
(SELECT 0 as n UNION SELECT 1) p0,
(SELECT 0 as n UNION SELECT 1) p1,
(SELECT 0 as n UNION SELECT 1) p2,
(SELECT 0 as n UNION SELECT 1) p3,
(SELECT 0 as n UNION SELECT 1) p4,
(SELECT 0 as n UNION SELECT 1) p5,
(SELECT 0 as n UNION SELECT 1) p6,
(SELECT 0 as n UNION SELECT 1) p7
Order by 1
);

commit;

drop table if exists test_table;

create table test_table(
ingest_time timestamp encode zstd,
doi date encode zstd,
id int encode bytedict,
value float encode zstd,
data_sig varchar(32) encode zstd
) DISTKEY(id) SORTKEY(ingest_time);

commit;

insert into test_table (
select dateadd(‘msec’, – 10n , getdate() ) as ingest_time, trunc(dateadd(‘msec’, – 10n , getdate() )) as doi,id,
n::float / 1000000 as value, ‘sig-‘ || to_hex(n % 16) as data_sig
FROM (select (a.n + b.n + c.n + d.n) as n, (random() * 1000)::int as id from seed a cross join (select n256 as n from seed) b cross join (select n65536 as n from seed) c
cross join (select n*16777216 as n from ( select distinct (n/16)::int as n from seed ) ) d)
) order by ingest_time;

commit;

analyze test_table;

select count(*) from test_table;

–Consecutive run on of above insert query will 268 million rows for each execution–

You can create a table with about 1 billion rows in 8 minutes on a ds2.xlarge cluster