Tracking Changes in Your PostgreSQL Tables: Implementing a Custom Change Data Capture (CDC)

Introduction:
Change Data Capture (CDC) is a technique for tracking changes in a database, such as inserts, updates, and deletes. In this blog post, we will show you how to implement a custom CDC in PostgreSQL to track changes in your database. By using a custom CDC, you can keep a record of changes in your database and use that information in your applications, for example to provide a history of changes, track auditing information, or trigger updates in other systems.

Implementing a Custom CDC in PostgreSQL:
To implement a custom CDC in PostgreSQL, you will need to create a new table to store the change information, a trigger function that will be executed whenever a change is made in the target table, and a trigger that will call the trigger function. The trigger function will insert a new row into the change table with the relevant details, such as the old and new values of the record and the time of the change.

To demonstrate this, we will show you an example of a custom CDC for a table called “employee”. The change table will be called “employee_cdc” and will contain columns for the timestamp, employee ID, old values, and new values of the employee record. The trigger function will be executed after an update on the “employee” table and will insert a new row into the “employee_cdc” table with the relevant information. Finally, we will show you how to query the “employee_cdc” table to retrieve a list of all changes that have occurred in the “employee” table since a certain timestamp.

1. Create the Employee and CDC tables

To store the CDC information, you need to create new tables in your PostgreSQL database. In this example, we will use three tables: “employee” (the source table), “employee_cdc” (the change table), and “employee_audit” (the audit table, created in the next step). Start by creating the first two:

CREATE TABLE employee (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    department VARCHAR(50) NOT NULL,
    salary NUMERIC(10,2) NOT NULL,
    hire_date DATE NOT NULL
);

CREATE TABLE employee_cdc (
    timestamp TIMESTAMP DEFAULT now(),
    employee_id INTEGER,
    old_values JSONB,
    new_values JSONB
);

In the “employee” table, “id” is an auto-incremented primary key. In the “employee_cdc” table, “timestamp” stores the time of the change, “employee_id” is the primary key of the employee record that was changed, and “old_values” and “new_values” are JSONB columns that store the old and new values of the employee record, respectively.

2. Create the Audit table

CREATE TABLE employee_audit (
audit_timestamp TIMESTAMP DEFAULT now(),
employee_id INTEGER,
old_values JSONB,
new_values JSONB
);

3. Create the trigger function

To capture the changes in the employee table, you will need to create a trigger function that will be executed whenever a record is updated in the table (the same pattern can be extended to inserts and deletes). The trigger function will insert a new row into the “employee_cdc” table with the relevant information. Here is an example trigger function:

CREATE OR REPLACE FUNCTION employee_cdc() RETURNS TRIGGER AS $$
BEGIN
    IF (TG_OP = 'UPDATE') THEN
        -- Record the change in the CDC table
        INSERT INTO employee_cdc (timestamp, employee_id, old_values, new_values)
        VALUES (now(), NEW.id, row_to_json(OLD)::jsonb, row_to_json(NEW)::jsonb);
        -- Record the same change in the audit table
        INSERT INTO employee_audit (employee_id, old_values, new_values)
        VALUES (NEW.id, row_to_json(OLD)::jsonb, row_to_json(NEW)::jsonb);
    END IF;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

This trigger function uses the “row_to_json” function to convert the old and new values of the employee record into JSON, which is cast to JSONB and stored in the “old_values” and “new_values” columns of the “employee_cdc” table. The “now()” function supplies the current timestamp.
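If you want to see what these JSON payloads look like, you can run “row_to_json” directly against a row. The employee values shown in the comment below are purely illustrative:

SELECT row_to_json(e) AS employee_json
FROM employee e
WHERE id = 1;
-- Example output (illustrative values):
-- {"id":1,"name":"Alice Smith","department":"Engineering","salary":75000.00,"hire_date":"2022-03-15"}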

4. Create the trigger

Now that the trigger function has been created, you need to create the trigger on the “employee” table that will call the function whenever a record is updated. You can create the trigger with the following command:

CREATE TRIGGER employee_cdc_trigger
AFTER UPDATE ON employee
FOR EACH ROW
EXECUTE FUNCTION employee_cdc();
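As a quick sanity check, an update on an existing employee row (an employee with id = 1 is assumed to exist here) should now produce one new row in each change table:

-- Assumes an employee with id = 1 already exists
UPDATE employee SET salary = salary * 1.10 WHERE id = 1;

SELECT * FROM employee_cdc ORDER BY timestamp DESC LIMIT 1;
SELECT * FROM employee_audit ORDER BY audit_timestamp DESC LIMIT 1;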

5. Query the CDC table

In your application code, you can query the “employee_cdc” table to get a list of all changes that have occurred since a certain timestamp. For example, to get all changes since January 1st, 2023, you can use the following SQL query:

SELECT * FROM employee_cdc
WHERE timestamp >= '2023-01-01 00:00:00';

You can then process these changes as needed in your application code.
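Because the old and new values are stored as JSONB, you can also filter on individual fields. For example, the following query (a sketch based on the schema above) lists only the changes where the salary was modified:

SELECT timestamp,
       employee_id,
       old_values ->> 'salary' AS old_salary,
       new_values ->> 'salary' AS new_salary
FROM employee_cdc
WHERE timestamp >= '2023-01-01 00:00:00'
  AND old_values ->> 'salary' IS DISTINCT FROM new_values ->> 'salary';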

Conclusion:
In this blog post, we have shown you how to implement a custom Change Data Capture (CDC) in PostgreSQL to track changes in your database. By using a custom CDC, you can keep a record of changes in your database and use that information in your applications. Whether you are tracking changes for auditing purposes, providing a history of changes, or triggering updates in other systems, a custom CDC is a useful tool to have in your PostgreSQL toolkit.

Create DMS Replication From MongoDB 4.2 on EC2 Linux to Redshift

Summary

We will create a hub-to-spoke replication from a MongoDB 4.2 database to a Redshift schema. MongoDB is installed in the same VPC as Redshift and the DMS replication instance.

Main Text

MongoDB is a NoSQL datastore where data is inserted and read in JSON format. Internally, a MongoDB document is stored as binary JSON (BSON) in a compressed format. The terminology for databases, schemas and tables in MongoDB is a bit different compared to relational databases. Here is how each term in MongoDB maps to its equivalent in Redshift/PostgreSQL:

MongoDB        Redshift/PostgreSQL
Database       Schema
Collection     Table
Documents      Records/Rows

MongoDB does not enforce a schema; it is essentially a document store, so data can be inserted without defining a schema or structure up front. This gives MongoDB a lot of flexibility, which is why it is a popular choice for modern applications: you can start developing your application without defining a data model first. It is a schema-less approach to software architecture, which has its own pros and cons.
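To make this concrete, the mongo shell snippet below (the “people” collection is just an illustration) inserts two documents with different shapes into the same collection, and MongoDB accepts both without any prior schema definition:

> db.people.insert({ "name": "Mark Becker", "eid": 27561 })
> db.people.insert({ "name": "Tyler Clark", "department": "Sales", "tags": ["new-hire"] })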

In our example we will work with the default database in MongoDB called “admin”, create collections (tables) in it, and replicate those collections to an equivalent schema in Redshift called “admin”, which will hold the corresponding tables. We will go about this in three stages:

Stage 1 : Create MongoDB on EC2 AMZN Linux, create the collections and connect to MongoDB from your client machine and check the configuration.

Stage 2: Create Redshift Cluster, Aurora PostgreSQL in Private and Public Subnet and Connect to the All Database instances and check.

Stage 3: Create the DMS Replication Instance, DMS Replication Endpoints and DMS Replication Tasks. Finally, we will check that all data is being replicated by DMS to all the targets.

Architecture: It is a hub-to-spoke architecture with a MongoDB source being replicated to multiple heterogeneous targets. However, in this article we will only configure replication from MongoDB to Redshift.

Stage 1: Create a EC2 Amazon Linux in a Public Subnet and Install MongoDB in it

You can use the ‘MongoDB on AWS’ Quick Start guide to deploy MongoDB into a new VPC or into an existing VPC. The guide provides two CloudFormation templates that can create a new VPC under your account, configure the public and private subnets, and launch the EC2 instances with the latest version of MongoDB installed. Check this link for the quick deployment options for MongoDB on AWS: https://docs.aws.amazon.com/quickstart/latest/mongodb/step2.html

OR

If you want to install MongoDB manually then you can follow the below procedure :

1. Create Amazon Linux EC2 Instance

2. Add MongoDB Repo and Install MongoDB on AMZN Linux

$ sudo vi /etc/yum.repos.d/mongodb-org-4.0.repo


[mongodb-org-4.0]
name=MongoDB Repository
baseurl=https://repo.mongodb.org/yum/amazon/2013.03/mongodb-org/testing/x86_64/
gpgcheck=1
enabled=1
gpgkey=https://www.mongodb.org/static/pgp/server-4.2.asc

$ sudo yum -y install mongodb-org
$ sudo service mongod start
$ sudo cat /var/log/mongodb/mongod.log


3. Create Root login

 $ mongo localhost/admin
 > use admin
 > db.createUser( { user:"root", pwd:"root123", roles: [ { role: "root", db: "admin" } ] } )
 > exit

 
4. Modify mongod configuration file /etc/mongod.conf using vi editor
 
     Change below lines from

    # network interfaces
    net:
      port: 27017
      bindIp: 127.0.0.1  # Listen to local interface only, comment to listen on all interfaces.
    #security:

    To

# network interfaces
net:
  port: 27017
  bindIp: 0.0.0.0  # Listen on all interfaces
security:
  authorization: enabled
 
5. Restart mongod service
 

 $ sudo service mongod restart

Use the admin DB as the authentication database; the ‘root’ user can be used for the full load and CDC tasks.

6. For CDC replication, a MongoDB replica set needs to be set up and the configuration needs to be modified/added as below:

Modify mongod.conf using vi editor

$ sudo vi /etc/mongod.conf


replication:
  replSetName: rs0

$ sudo service mongod restart


7. Initiate Replica Set for CDC

$ mongo localhost/admin -u root -p
> rs.status()


{
    "ok" : 0,
    "errmsg" : "no replset config has been received",
    "code" : 94,
    "codeName" : "NotYetInitialized"
}

> rs.initiate()


{
    "info2" : "no configuration specified. Using a default configuration for the set",
    "me" : "ip-10-0-137-99.ap-southeast-2.compute.internal:27017",
    "ok" : 1
}

rs0:SECONDARY> rs.status()

{
    "set" : "rs0",
    "date" : ISODate("2019-12-31T05:02:22.431Z"),
    "myState" : 1,
    "term" : NumberLong(1),
    "syncingTo" : "",
    "syncSourceHost" : "",
    "syncSourceId" : -1,
    "heartbeatIntervalMillis" : NumberLong(2000),
    "majorityVoteCount" : 1,
    "writeMajorityCount" : 1,
    "optimes" : {
        "lastCommittedOpTime" : {
            "ts" : Timestamp(1577768528, 5),
            "t" : NumberLong(1)
        },
        "lastCommittedWallTime" : ISODate("2019-12-31T05:02:08.362Z"),
        "readConcernMajorityOpTime" : {
            "ts" : Timestamp(1577768528, 5),
            "t" : NumberLong(1)
        },
        "readConcernMajorityWallTime" : ISODate("2019-12-31T05:02:08.362Z"),
        "appliedOpTime" : {
            "ts" : Timestamp(1577768528, 5),
            "t" : NumberLong(1)
        },
        "durableOpTime" : {
            "ts" : Timestamp(1577768528, 5),
            "t" : NumberLong(1)
        },
        "lastAppliedWallTime" : ISODate("2019-12-31T05:02:08.362Z"),
        "lastDurableWallTime" : ISODate("2019-12-31T05:02:08.362Z")
    },
    "lastStableRecoveryTimestamp" : Timestamp(1577768528, 4),
    "lastStableCheckpointTimestamp" : Timestamp(1577768528, 4),
    "electionCandidateMetrics" : {
        "lastElectionReason" : "electionTimeout",
        "lastElectionDate" : ISODate("2019-12-31T05:02:07.346Z"),
        "electionTerm" : NumberLong(1),
        "lastCommittedOpTimeAtElection" : {
            "ts" : Timestamp(0, 0),
            "t" : NumberLong(-1)
        },
        "lastSeenOpTimeAtElection" : {
            "ts" : Timestamp(1577768527, 1),
            "t" : NumberLong(-1)
        },
        "numVotesNeeded" : 1,
        "priorityAtElection" : 1,
        "electionTimeoutMillis" : NumberLong(10000),
        "newTermStartDate" : ISODate("2019-12-31T05:02:08.354Z"),
        "wMajorityWriteAvailabilityDate" : ISODate("2019-12-31T05:02:08.362Z")
    },
    "members" : [
        {
            "_id" : 0,
            "name" : "ip-10-0-137-99.ap-southeast-2.compute.internal:27017",
            "ip" : "10.0.137.99",
            "health" : 1,
            "state" : 1,
            "stateStr" : "PRIMARY",
            "uptime" : 80,
            "optime" : {
                "ts" : Timestamp(1577768528, 5),
                "t" : NumberLong(1)
            },
            "optimeDate" : ISODate("2019-12-31T05:02:08Z"),
            "syncingTo" : "",
            "syncSourceHost" : "",
            "syncSourceId" : -1,
            "infoMessage" : "could not find member to sync from",
            "electionTime" : Timestamp(1577768527, 2),
            "electionDate" : ISODate("2019-12-31T05:02:07Z"),
            "configVersion" : 1,
            "self" : true,
            "lastHeartbeatMessage" : ""
        }
    ],
    "ok" : 1,
    "$clusterTime" : {
        "clusterTime" : Timestamp(1577768528, 5),
        "signature" : {
            "hash" : BinData(0,"nDISrR4afyRUVEQVntFkkVpTJKY="),
            "keyId" : NumberLong("6776464228418060290")
        }
    },
    "operationTime" : Timestamp(1577768528, 5)
}
rs0:PRIMARY>



8. Make sure the security group on your EC2 instance is open to the DMS replication instance's security group for the port where MongoDB is running (the default MongoDB port is 27017)



9. Add collections (tables) with some data to the database ‘admin’ in the MongoDB installation

> show collections
db.createCollection("accounts", { capped : true, autoIndexId : true, size : 
   6142800, max : 10000 } )
db.accounts.insert({"company": "Booth-Wade",
    "location": "5681 Mitchell Heights\nFort Adamstad, UT 8019B",
    "ip_address": "192.168.110.4B",
    "name": "Mark Becker",
    "eid": 27561})
   
db.accounts.insert({"company": "Myers,  Smith and Turner",
      "location": "USS BenjaminNlinFP0 AA 40236",
      "ip_address": "172.26.254.156",
      "name": "Tyler clark",
      "eid": 87662})

db.accounts.insert({"company": "Bowen-Harris",
      "location": "Tracey Plaza East Katietown,Sc74695",
      "ip_address": "172.28.45.209",
      "name": "Veronica Gomez",
      "eid": 772122})
     
> db.accounts.find( {} )

-- Insert Many --

db.accounts.insertMany(
[
  {"company": "Booth-Wade",
    "location": "5681 Mitchell Heights\nFort Adamstad, UT 8019B",
    "ip_address": "192.168.110.4B",
    "name": "Mark Becker",
    "eid": 27561},
  {"company": "Myers,  Smith and Turner",
    "location": "USS BenjaminNlinFP0 AA 40236",
    "ip_address": "172.26.254.156",
    "name": "Tyler clark",
    "eid": 87662},
  {"company": "Bowen-Harris",
    "location": "Tracey Plaza East Katietown,Sc74695",
    "ip_address": "172.28.45.209",
    "name": "Veronica Gomez",
    "eid": 772122}
]);
     

-- Array Insert --

db.accounts.insertMany( [
{
  "_id": "5e037719f45Btodlcdb492464",
  "accounts": [
    {
      "company": "Booth-Wade",
      "location": "5681 Mitchell Heights\nFort Adamstad, UT 8019B",
      "ip_address": "192.168.110.4B",
      "name": "Mark Becker",
      "eid": 27561
    },
    {
      "company": "Myers,  Smith and Turner",
      "location": "USS BenjaminNlinFP0 AA 40236",
      "ip_address": "172.26.254.156",
      "name": "Tyler clark",
      "eid": 87662
    },
    {
      "company": "Bowen-Harris",
      "location": "Tracey Plaza East Katietown,Sc74695",
      "ip_address": "172.28.45.209",
      "name": "Veronica Gomez",
      "eid": 772122
    }
  ]
}
]);

-- Inventory Nested Array Collection --

db.createCollection("inventory", { capped : true, autoIndexId : true, size :
   6142800, max : 10000 } )

db.createCollection("inventory_new", { capped : true, autoIndexId : true, size :
   6142800, max : 10000 } )

db.inventory.insertMany( [
   { item: "journal", instock: [ { warehouse: "A", qty: 5 }, { warehouse: "C", qty: 15 } ] },
   { item: "notebook", instock: [ { warehouse: "C", qty: 5 } ] },
   { item: "paper", instock: [ { warehouse: "A", qty: 60 }, { warehouse: "B", qty: 15 } ] },
   { item: "planner", instock: [ { warehouse: "A", qty: 40 }, { warehouse: "B", qty: 5 } ] },
   { item: "postcard", instock: [ { warehouse: "B", qty: 15 }, { warehouse: "C", qty: 35 } ] }
]);


db.inventory_new.insertMany([
   { item: "journal", qty: 25, tags: ["blank", "red"], size: { h: 14, w: 21, uom: "cm" } },
   { item: "mat", qty: 85, tags: ["gray"], size: { h: 27.9, w: 35.5, uom: "cm" } },
   { item: "mousepad", qty: 25, tags: ["gel", "blue"], size: { h: 19, w: 22.85, uom: "cm" } }
])

> db.inventory.find( {} )

> show collections

accounts
inventory
inventory_new
system.keys
system.users
system.version

We have created three user collections called ‘accounts’, ‘inventory’ and ‘inventory_new’. These three collections (tables) will be replicated to our targets. Connect from MongoDB Compass on your client machine and check that they are visible.

Stage 2: Install Redshift Cluster

Create a VPC with public and private subnets. In a real-world production scenario, it is always recommended to put your databases in a private subnet.

1. Create Public and Private Subnet

https://docs.aws.amazon.com/AmazonECS/latest/developerguide/create-public-private-vpc.html#run-VPC-wizard

2. Install Redshift in Public Subnet

3. Install Redshift in Private Subnet

If you have a different scenario, such as your DMS replication instance in one VPC and your databases in another VPC, or replication from on-premises to an AWS VPC, then you can refer to this link: https://docs.aws.amazon.com/dms/latest/userguide/CHAP_ReplicationInstance.VPC.html

Stage 3: Create DMS Replication Instance, DMS Replication Endpoints & DMS Replication Tasks for MongoDB

Steps : Create Replication Instance > Create Endpoints > Create DMS Tasks

1. Create Replication instance

Go to AWS Console > Database Migration Service  > Replication Instance > Create Replication Instance

2. Create Replication Endpoints

a) MongoDB Replication Endpoint

Go to DMS Console > Endpoints > Create Endpoint. Use this link for configuration for your endpoint > https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Source.MongoDB.html

With MongoDB as a source you have two modes available: document mode and table mode. Some important points to note in this regard are:

  • A record in MongoDB is a document, which is a data structure composed of field and value pairs. The value of a field can include other documents, arrays, and arrays of documents. A document is roughly equivalent to a row in a relational database table.
  • A collection in MongoDB is a group of documents, and is roughly equivalent to a relational database table.
  • Internally, a MongoDB document is stored as a binary JSON (BSON) file in a compressed format that includes a type for each field in the document. Each document has a unique ID.

MongoDB is officially supported on versions 2.6.x and 3.x as a database source only. I have tested it with MongoDB 4.2, the latest community version at the time of writing, and it works without any issues; however, I would advise sticking with the officially certified versions. AWS DMS supports two migration modes when using MongoDB as a source. You specify the migration mode using the Metadata mode parameter in the AWS Management Console, or the extra connection attribute nestingLevel, when you create the MongoDB endpoint.

Document mode

In document mode, the MongoDB document is migrated as is, meaning that the document data is consolidated into a single column named _doc in a target table.

Table mode

In table mode, AWS DMS transforms each top-level field in a MongoDB document into a column in the target table. If a field is nested, AWS DMS flattens the nested values into a single column. AWS DMS then adds a key field and data types to the target table’s column set.
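To illustrate the difference, consider one of the documents inserted earlier. The target layouts shown here are an approximation based on the Redshift output later in this article:

Source document:
  { "item": "journal", "qty": 25, "tags": ["blank", "red"], "size": { "h": 14, "w": 21, "uom": "cm" } }

Document mode (nestingLevel=NONE) puts the whole document into a single _doc column on the target:
  _doc
  --------------------------------------------------------------------------------------------------
  { "item" : "journal", "qty" : 25, "tags" : [ "blank", "red" ], "size" : { "h" : 14, "w" : 21, "uom" : "cm" } }

Table mode (nestingLevel=ONE) turns top-level fields into columns and flattens nested values:
  oid__id | item    | qty | array_tags         | size.h | size.w | size.uom
  --------+---------+-----+--------------------+--------+--------+----------
  ...     | journal |  25 | [ "blank", "red" ] |     14 |     21 | cm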

Connection Attributes

nestingLevel
  Values: NONE | ONE (default: NONE)
  Description: Specify NONE to use document mode. Specify ONE to use table mode.

extractDocID
  Values: true | false (default: false)
  Description: Use this attribute when nestingLevel is set to NONE.
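For example, to migrate in document mode while also extracting the document ID into its own column, the extra connection attributes field on the MongoDB endpoint would look something like this (a sketch; adjust to your own requirements):

nestingLevel=NONE;extractDocID=true;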

Test the Endpoint

b) Create Redshift Replication Endpoint

Test Redshift Endpoint

Once you create the endpoint for Redshift, DMS automatically creates a DMS endpoint role and assigns it to Redshift. Further down, when we create S3 as a target endpoint, we need to add the S3 permissions via a managed policy to this same role.

dms-access-for-endpoint : arn:aws:iam::775867435088:role/dms-access-for-endpoint

c) Create MongoDB-Redshift Database Migration Task

Go to DMS Console > Conversion & Migration > Database Migrations Tasks > Create Task

Before moving ahead, ensure that the security group of Redshift allows ingress on port 5439 from 0.0.0.0/0, or preferably that the security group ID of your replication instance is added to the ingress rules of the Redshift SG for port 5439. Check this link for more information: https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Redshift.html

In our case the DMS replication instance SG ID is ‘sg-0a695ef98b6e39963’, so the Redshift security group needs an ingress rule referencing it.
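If you prefer the AWS CLI, the same rule can be added like this (the Redshift security group ID below is a placeholder):

aws ec2 authorize-security-group-ingress \
    --group-id <redshift-security-group-id> \
    --protocol tcp \
    --port 5439 \
    --source-group sg-0a695ef98b6e39963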

Refer to this documentation for more complex VPC setups; it is beyond the scope of this article: https://docs.aws.amazon.com/dms/latest/userguide/CHAP_ReplicationInstance.VPC.html

Checking from Redshift, we can see that all three tables from MongoDB (‘accounts’, ‘inventory’ and ‘inventory_new’) have been created, and that the schema ‘admin’ is automatically created by DMS.

Query all the tables to confirm data is replicated

testdb=# select * from admin.accounts;

            _id             |   array_accounts
----------------------------+--------------------------------------------------------------------------------------------------
 5e037719f45Btodlcdb492464  | [ { "company" : "Booth-Wade", "location" : "5681 Mitchell Heights\nFort Adamstad, UT 8019B", "ip
_address" : "192.168.110.4B", "name" : "Mark Becker", "eid" : 27561.0 }, { "company" : "Myers,  Smith and Turner", "location"
: "USS BenjaminNlinFP0 AA 40236", "ip_address" : "172.26.254.156", "name" : "Tyler clark", "eid" : 87662.0 }, { "company" : "B
owen-Harris", "location" : "Tracey Plaza East Katietown,Sc74695", "ip_address" : "172.28.45.209", "name" : "Veronica Gomez", "
eid" : 772122.0 } ]
 9772sjs19f45Btodlcdbk49fk4 | [ { "company" : "Trust Co", "location" : "Zetland Inc.", "ip_address" : "12.168.210.2B", "name"
: "Mert Cliff", "eid" : 4343.0 }, { "company" : "Mist Ltd.", "location" : "Cliffstone yard", "ip_address" : "72.32.254.156", "
name" : "Kris Loff", "eid" : 76343.0 }, { "company" : "Coles Supermarket", "location" : "Randwich St", "ip_address" : "22.28.4
5.110", "name" : "Will Markbaeur", "eid" : 13455.0 } ]
(2 rows)

testdb=# select * from admin.inventory;


         oid__id          |   item   |                                array_instock
--------------------------+----------+------------------------------------------------------------------------------
 5e0bd854fd4602c4b6926d68 | journal  | [ { "warehouse" : "A", "qty" : 5.0 }, { "warehouse" : "C", "qty" : 15.0 } ]
 5e0bd854fd4602c4b6926d69 | notebook | [ { "warehouse" : "C", "qty" : 5.0 } ]
 5e0bd854fd4602c4b6926d6a | paper    | [ { "warehouse" : "A", "qty" : 60.0 }, { "warehouse" : "B", "qty" : 15.0 } ]
 5e0bd854fd4602c4b6926d6b | planner  | [ { "warehouse" : "A", "qty" : 40.0 }, { "warehouse" : "B", "qty" : 5.0 } ]
 5e0bd854fd4602c4b6926d6c | postcard | [ { "warehouse" : "B", "qty" : 15.0 }, { "warehouse" : "C", "qty" : 35.0 } ]
(5 rows)


testdb=# select * from admin.inventory_new;


         oid__id          |   item   | qty |     array_tags     | size.h | size.w | size.uom
--------------------------+----------+-----+--------------------+--------+--------+----------
 5e0bef1775d0b39f2ef66923 | journal  |  25 | [ "blank", "red" ] |     14 |     21 | cm
 5e0bef1775d0b39f2ef66924 | mat      |  85 | [ "gray" ]         |   27.9 |   35.5 | cm
 5e0bef1775d0b39f2ef66925 | mousepad |  25 | [ "gel", "blue" ]  |     19 |  22.85 | cm
(3 rows)

Redshift Spectrum Cost of Data Scanned

With Redshift Spectrum, you are billed at $5 per terabyte of data scanned, rounded up to the next megabyte, with a 10 megabyte minimum per query. For example, if you scan 10 gigabytes of data, you will be charged $0.05. If you scan 1 terabyte of data, you will be charged $5.

To find how much data is being scanned by Redshift Spectrum queries, you have to look at the system table ‘SVL_S3QUERY_SUMMARY’ and a specific field called ‘s3_scanned_bytes’ (the number of bytes scanned from Amazon S3). The cost of a Redshift Spectrum query is reflected in the amount of data scanned from Amazon S3.

s3_scanned_bytes – The number of bytes scanned from Amazon S3 and sent to the Redshift Spectrum layer, based on compressed data.

For example, you can run the query below to determine the number of bytes scanned by one particular Spectrum query (substitute the query ID):

-- select s3_scanned_bytes from svl_s3query_summary where query = <query_id>;

To determine the total bytes scanned across all Spectrum queries:

-- select sum(s3_scanned_bytes) from svl_s3query_summary;

To determine the total bytes scanned by all Spectrum queries in the last day:

-- select sum(s3_scanned_bytes) from svl_s3query_summary where starttime >= getdate() - interval '24 hours';

sum

621900000000
(1 row)

Let's say we have a figure like the one above for one day of Spectrum use. Using the Redshift Spectrum pricing of $5 per terabyte of data scanned (rounded up to the next megabyte, with a 10 megabyte minimum per query), we can calculate the cost for one day as below:

621900000000 bytes = 621900000000/1024 = 607324218.75 kilobytes
607324218.75 kilobytes = 607324218.75/1024 = 593090.057373046875 megabytes
593090.057373046875 megabytes = 593090.057373046875 /1024 = 579.189509153366089 gigabytes
579.189509153366089 gigabytes = 579.189509153366089/1024 = 0.565614755032584 terabytes

In this case you will be charged for 0.5657 terabytes (since it is rounded up to the next megabyte): $5 * 0.5657 = $2.83.

So $2.83 is what you will pay for scanning 0.5657 terabytes of data daily from S3 via Spectrum.

Though there is no built-in SQL query to calculate the cost, I have created one to easily summarize the approximate cost of the data scanned by loading the bytes value into a cost table and running the SQL below:

-- create table test_cost (s3_scanned_bytes float8);
-- insert into test_cost values (621900000000);
-- select sum(s3_scanned_bytes/1024/1024/1024/1024) s3_scanned_tb, 5*sum(s3_scanned_bytes/1024/1024/1024/1024) cost_in_usd from test_cost;

   s3_scanned_tb   |   cost_in_usd
-------------------+------------------
 0.565614755032584 | 2.82807377516292
(1 row)

-+ Final Approximate Scan in Terabytes and Cost in USD +-

-- select round(sum(s3_scanned_bytes/1024/1024/1024/1024),4) s3_scanned_tb, round(5*sum(s3_scanned_bytes/1024/1024/1024/1024),2) cost_in_usd from test_cost;

 s3_scanned_tb | cost_in_usd
---------------+-------------
 0.5656        | 2.83
(1 row)

This is the easiest way to find the data transferred through Spectrum and the cost associated with it.

P.S.: The system table is rotated, so it is better to calculate per day and store the result in another table if you need to maintain a record of the bytes scanned via Spectrum, as sketched below.
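A minimal sketch of that approach (the table name spectrum_daily_scan is just an illustration):

-- Run once:
create table if not exists spectrum_daily_scan (
    scan_date         date,
    s3_scanned_bytes  bigint
);

-- Run daily to capture yesterday's scanned bytes before the system table rotates:
insert into spectrum_daily_scan
select trunc(starttime) as scan_date,
       sum(s3_scanned_bytes) as s3_scanned_bytes
from svl_s3query_summary
where starttime >= current_date - 1
  and starttime < current_date
group by trunc(starttime);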

References:

[1] Amazon Redshift pricing – https://aws.amazon.com/redshift/pricing/#Redshift_Spectrum_Pricing
[2] Monitoring Metrics in Amazon Redshift Spectrum – https://docs.aws.amazon.com/redshift/latest/dg/c-spectrum-metrics.html
[3] SVL_S3QUERY_SUMMARY – https://docs.aws.amazon.com/redshift/latest/dg/r_SVL_S3QUERY_SUMMARY.html

Generate Fake Data using Python

Being a data engineer, one of the tasks you have to do almost daily is load huge amounts of data into your data warehouse or data lake. Sometimes, to benchmark load times or emulate performance-tuning issues in your test environment, you need test datasets. There are a lot of very good, huge open datasets available on Kaggle and AWS, but often all you need is a CSV file with dummy data in it.

Fear not, up comes Python to the rescue. Python is the golden goose of the information age: not only can it help you sort through massive amounts of data, it can also help you generate data.

Faker is a Python package which can generate fake data for you. First you need to pip install faker. For this exercise we are using Python 3.7.2:

$ python -m pip install faker

-- Script to Generate a CSV File with Fake Data and 1 Billion Rows --

Caution: the file will be large (about 1.3 GB in my run) and generating it can really hammer your machine. I have an EC2 instance on which I generate this test data and leave it running in the background. You can use multiprocessing in Python and hammer all cores, but that is a discussion worthy of its own blog post.

import csv

from faker import Faker

RECORD_COUNT = 1000000000
fake = Faker()


def create_csv_file():
    # Column headers for the generated CSV file
    fieldnames = ['userid', 'username', 'firstname', 'lastname', 'city',
                  'state', 'email', 'phone', 'cardno', 'likesports',
                  'liketheatre', 'likeconcerts', 'likejazz', 'likeclassical',
                  'likeopera', 'likerock', 'likevegas', 'likebroadway',
                  'likemusicals']
    with open('users1.csv', 'w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        # Generate one fake record per row
        for i in range(RECORD_COUNT):
            writer.writerow(
                {
                    'userid': fake.ean8(),
                    'username': fake.user_name(),
                    'firstname': fake.first_name(),
                    'lastname': fake.last_name(),
                    'city': fake.city(),
                    'state': fake.state_abbr(),
                    'email': fake.email(),
                    'phone': fake.phone_number(),
                    'cardno': fake.credit_card_number(card_type=None),
                    'likesports': fake.null_boolean(),
                    'liketheatre': fake.null_boolean(),
                    'likeconcerts': fake.null_boolean(),
                    'likejazz': fake.null_boolean(),
                    'likeclassical': fake.null_boolean(),
                    'likeopera': fake.null_boolean(),
                    'likerock': fake.null_boolean(),
                    'likevegas': fake.null_boolean(),
                    'likebroadway': fake.null_boolean(),
                    'likemusicals': fake.null_boolean(),
                }
            )


if __name__ == '__main__':
    create_csv_file()

This will create a file users1.csv with a billion rows of generated fake data that looks almost like real data.
