Introduction: Change Data Capture (CDC) is a technique used to track changes in a database, such as inserts, updates, and deletes. In this blog post, we will show you how to implement a custom CDC in PostgreSQL to track changes in your database. By using a custom CDC, you can keep a record of changes in your database and use that information in your applications, for example to provide a history of changes, track auditing information, or trigger updates in other systems.
Implementing a Custom CDC in PostgreSQL: To implement a custom CDC in PostgreSQL, you will need to create a new table to store the change information, create a trigger function that will be executed whenever a change is made in the target table, and create a trigger that will call the trigger function. The trigger function will insert a new row into the change table with the relevant information, such as the old and new values of the record, the time of the change, and any other relevant information.
To demonstrate this, we will show you an example of a custom CDC for a table called “employee”. The change table will be called “employee_cdc” and will contain columns for the timestamp, employee ID, old values, and new values of the employee record. The trigger function will be executed after an update on the “employee” table and will insert a new row into the “employee_cdc” table with the relevant information. Finally, we will show you how to query the “employee_cdc” table to retrieve a list of all changes that have occurred in the “employee” table since a certain timestamp.
1. Create the employee and CDC tables
To store the CDC information, you need to create new tables in your PostgreSQL database. In this example, we will create three tables, “employee”, “employee_cdc”, and “employee_audit”. First, the “employee” table:
CREATE TABLE employee (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    department VARCHAR(50) NOT NULL,
    salary NUMERIC(10,2) NOT NULL,
    hire_date DATE NOT NULL
);
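The DDL for the change and audit tables is not shown here, so the sketch below is our reconstruction from the column descriptions that follow and from how the trigger function uses them; the exact column types are assumptions:

```sql
-- Change table: one row per captured change (columns described below).
CREATE TABLE employee_cdc (
    id SERIAL PRIMARY KEY,
    timestamp TIMESTAMPTZ NOT NULL DEFAULT now(),
    employee_id INTEGER NOT NULL,
    old_values TEXT,
    new_values TEXT
);

-- Audit table: written by the same trigger function.
CREATE TABLE employee_audit (
    id SERIAL PRIMARY KEY,
    employee_id INTEGER NOT NULL,
    old_values TEXT,
    new_values TEXT
);
```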
In the “employee_cdc” table, “id” is an auto-incremented primary key, “timestamp” is a timestamp with time zone that stores the time of the change, “employee_id” is the primary key of the employee record that was changed, and “old_values” and “new_values” are text columns that store the old and new values of the employee record, respectively.
2. Create the trigger function

To capture the changes in the employee table, you will need to create a trigger function that is executed whenever a record is inserted, updated, or deleted in the table. The trigger function inserts a new row into the “employee_cdc” table with the relevant information. Here is an example trigger function:
CREATE OR REPLACE FUNCTION employee_cdc() RETURNS TRIGGER AS $$
BEGIN
    IF (TG_OP = 'UPDATE') THEN
        INSERT INTO employee_cdc (timestamp, employee_id, old_values, new_values)
        VALUES (now(), NEW.id, row_to_json(OLD)::text, row_to_json(NEW)::text);
        INSERT INTO employee_audit (employee_id, old_values, new_values)
        VALUES (NEW.id, row_to_json(OLD)::text, row_to_json(NEW)::text);
    END IF;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;
This trigger function uses the “row_to_json” function to convert the old and new values of the employee record into JSON, which is stored as text in the “old_values” and “new_values” columns of the “employee_cdc” table. The “now()” function is used to get the current timestamp.
3. Create the trigger
Now that the trigger function has been created, you need to create the trigger on the “employee” table that will call the function whenever a record is updated. You can create the trigger with the following command:
CREATE TRIGGER employee_cdc_trigger AFTER UPDATE ON employee FOR EACH ROW EXECUTE FUNCTION employee_cdc();
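To see the trigger in action, you can run a quick test; the employee name and values below are made up purely for illustration:

```sql
-- Insert a sample employee, then change the salary; the AFTER UPDATE
-- trigger writes one row into employee_cdc for the update.
INSERT INTO employee (name, department, salary, hire_date)
VALUES ('Jane Doe', 'Engineering', 85000.00, '2022-03-15');

UPDATE employee SET salary = 90000.00 WHERE name = 'Jane Doe';

-- Inspect the captured change:
SELECT employee_id, old_values, new_values FROM employee_cdc;
```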
4. Query the CDC table
In your application code, you can query the “employee_cdc” table to get a list of all changes that have occurred since a certain timestamp. For example, to get all changes since January 1st, 2023, you can use the following SQL query:
SELECT * FROM employee_cdc WHERE timestamp >= '2023-01-01 00:00:00';
You can then process these changes as needed in your application code.
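For example, once your application has fetched rows from “employee_cdc” (with any PostgreSQL driver), a small helper can reduce each change to just the fields that differ. The helper below is a hypothetical sketch, not part of the setup above:

```python
import json

def changed_fields(old_values, new_values):
    """Compare the JSON strings stored in employee_cdc.old_values and
    new_values, returning only the fields whose values differ."""
    old = json.loads(old_values)
    new = json.loads(new_values)
    return {
        field: {"old": old.get(field), "new": value}
        for field, value in new.items()
        if old.get(field) != value
    }

# Example with a captured salary change:
diff = changed_fields(
    '{"id": 7, "name": "Jane Doe", "salary": 85000.00}',
    '{"id": 7, "name": "Jane Doe", "salary": 90000.00}',
)
# diff == {"salary": {"old": 85000.0, "new": 90000.0}}
```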
Conclusion: In this blog post, we have shown you how to implement a custom Change Data Capture (CDC) in PostgreSQL to track changes in your database. By using a custom CDC, you can keep a record of changes in your database and use that information in your applications. Whether you are tracking changes for auditing purposes, providing a history of changes, or triggering updates in other systems, a custom CDC is a useful tool to have in your PostgreSQL toolkit.
Create DMS Replication From MongoDB 4.2 on EC2 Linux to Redshift
We will create a hub-to-spoke replication from a MongoDB 4.2 database to a Redshift schema. MongoDB is installed in the same VPC as Redshift and the DMS replication instance.
MongoDB is a NoSQL datastore where data is inserted and read in JSON format. Internally, a MongoDB document is stored as binary JSON (BSON) in a compressed format. Terminology for databases, schemas, and tables in MongoDB is a bit different from relational databases: roughly, a MongoDB database corresponds to a Redshift/PostgreSQL schema, a collection to a table, and a document to a row.
MongoDB does not enforce a proper schema structure; it is essentially a document store, so data can be inserted without defining a schema first. This gives MongoDB a lot of flexibility, which is why it is a preferred choice for modern applications: you can start developing your application without defining a data model first. It is a schema-less approach to software architecture, which has its own pros and cons.
In our example we will work with the default database in MongoDB, called “admin”, create collections (tables) in it, and replicate those tables to an equivalent schema in Redshift, also called “admin”, which will hold the different tables. We will go about this in three stages:
Stage 1: Create MongoDB on an EC2 Amazon Linux instance, create the collections, then connect to MongoDB from your client machine and check the configuration.
Stage 2: Create the Redshift cluster and Aurora PostgreSQL in private and public subnets, then connect to all the database instances and check.
Stage 3: Create the DMS replication instance, DMS replication endpoints, and DMS replication tasks. Finally, we will check that all data is being replicated by DMS to all the targets.
Architecture: It is a hub-to-spoke architecture, with the MongoDB source being replicated to multiple heterogeneous targets. However, in this article we will only configure replication from MongoDB to Redshift.
Stage 1: Create an EC2 Amazon Linux instance in a public subnet and install MongoDB on it
You can use the AWS ‘MongoDB Quick Start’ guide to deploy MongoDB into a new VPC or into an existing VPC. The guide has two CloudFormation templates which can create a new VPC under your account, configure the public and private subnets, and launch the EC2 instances with the latest version of MongoDB installed. See this link for the quick deployment options for MongoDB on AWS: https://docs.aws.amazon.com/quickstart/latest/mongodb/step2.html
If you want to install MongoDB manually, you can follow the procedure below:
We have created three user collections called ‘accounts’, ‘inventory’ and ‘inventory_new’. These three collections (tables) will be replicated to our targets. Connect from MongoDB Compass on your client machine and check.
Stage 2: Install Redshift Cluster
Create a VPC with public and private subnets. In a real-world production scenario, it is always recommended to put your databases in a private subnet.
With MongoDB as a source you have two modes available: document mode and table mode. Some important points to note in this regard are:
A record in MongoDB is a document, which is a data structure composed of field and value pairs. The value of a field can include other documents, arrays, and arrays of documents. A document is roughly equivalent to a row in a relational database table.
A collection in MongoDB is a group of documents, and is roughly equivalent to a relational database table.
Internally, a MongoDB document is stored as a binary JSON (BSON) file in a compressed format that includes a type for each field in the document. Each document has a unique ID.
MongoDB is officially supported as a database source only on versions 2.6.x and 3.x. I have tested it with MongoDB 4.2, the latest community version, and it works without any issues; however, I would advise sticking with the officially certified versions. AWS DMS supports two migration modes when using MongoDB as a source. You specify the migration mode using the Metadata mode parameter in the AWS Management Console, or the extra connection attribute nestingLevel when you create the MongoDB endpoint.
In document mode, the MongoDB document is migrated as is, meaning that the document data is consolidated into a single column named _doc in a target table.
In table mode, AWS DMS transforms each top-level field in a MongoDB document into a column in the target table. If a field is nested, AWS DMS flattens the nested values into a single column. AWS DMS then adds a key field and data types to the target table’s column set.
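As a rough illustration of the two modes, the Python sketch below only mimics the behavior described above; it is not how DMS is actually implemented:

```python
import json

def document_mode(doc):
    """Document mode: the whole document lands in a single _doc column."""
    return {"_doc": json.dumps(doc, sort_keys=True)}

def table_mode(doc):
    """Table mode: each top-level field becomes its own column; nested
    values are flattened into a single (JSON string) column."""
    row = {}
    for field, value in doc.items():
        if isinstance(value, (dict, list)):
            row[field] = json.dumps(value, sort_keys=True)  # flatten nested value
        else:
            row[field] = value
    return row

doc = {"_id": "1", "name": "Ann", "address": {"city": "Austin", "zip": "78701"}}
# table_mode(doc) -> {'_id': '1', 'name': 'Ann',
#                     'address': '{"city": "Austin", "zip": "78701"}'}
```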
nestingLevel – Value: NONE (default) or ONE. Specify NONE to use document mode; specify ONE to use table mode.
extractDocID – Value: true or false (default: false). Use this attribute when nestingLevel is set to NONE.
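On the console these are entered as a single extra connection attributes string of semicolon-separated name=value pairs. Assuming the standard DMS attribute names (nestingLevel, extractDocID), a document-mode endpoint would use:

```
nestingLevel=NONE;extractDocID=false
```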
Test the Endpoint
b) Create Redshift Replication Endpoint
Test Redshift Endpoint
Once you create the endpoint for Redshift, DMS automatically adds a DMS endpoint role and assigns it to Redshift. Further down, when we create S3 as a target endpoint, we will need to add the S3 permissions to this same role via a managed policy.
With Redshift Spectrum, you are billed at $5 per terabyte of data scanned, rounded up to the next megabyte, with a 10 megabyte minimum per query. For example, if you scan 10 gigabytes of data, you will be charged $0.05. If you scan 1 terabyte of data, you will be charged $5.
To find how much data is being scanned by Redshift Spectrum queries, you have to look at the system table ‘SVL_S3QUERY_SUMMARY’, specifically the field ‘s3_scanned_bytes’ (the number of bytes scanned from Amazon S3). The cost of a Redshift Spectrum query is reflected in the amount of data scanned from Amazon S3.
s3_scanned_bytes – The number of bytes scanned from Amazon S3 and sent to the Redshift Spectrum layer, based on compressed data.
For example, you can run the query below to determine the number of bytes scanned by one particular Spectrum query:

select s3_scanned_bytes from svl_s3query_summary where query = <query_id>;

To determine the total bytes scanned by all Spectrum queries:

select sum(s3_scanned_bytes) from svl_s3query_summary;

To determine the total bytes scanned by all Spectrum queries in the last day:

select sum(s3_scanned_bytes) from svl_s3query_summary where starttime >= getdate() - interval '24 hours';
Let’s say we have a figure like the above for one day of Spectrum use. Using the Redshift Spectrum pricing of $5 per terabyte of data scanned, rounded up to the next megabyte with a 10 megabyte minimum per query, we can calculate the cost for that day.
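The arithmetic can be sketched as a small helper; the function name is ours, but the constants are exactly the pricing quoted above:

```python
import math

PRICE_PER_TB_USD = 5.0      # Spectrum price: $5 per terabyte scanned
BYTES_PER_MB = 1024 ** 2
MB_PER_TB = 1024 ** 2

def spectrum_query_cost(scanned_bytes, min_mb=10):
    """Cost of one Spectrum query: $5 per TB scanned, rounded up to the
    next megabyte, with a 10 MB minimum per query."""
    scanned_mb = max(math.ceil(scanned_bytes / BYTES_PER_MB), min_mb)
    return PRICE_PER_TB_USD * scanned_mb / MB_PER_TB

print(spectrum_query_cost(10 * 1024 ** 3))   # 10 GB scanned -> ~$0.0488
print(spectrum_query_cost(1024 ** 4))        # 1 TB scanned  -> $5.00
```

Note that because of the per-query 10 MB minimum, a day’s bill is the sum of per-query costs, not the cost of the summed daily bytes in one shot.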
As a data engineer, one of the tasks you have to do almost daily is load huge amounts of data into your data warehouse or data lake. Sometimes, to benchmark load times or reproduce performance tuning issues in your test environment, you need test datasets. While there are plenty of very good, huge open datasets available on Kaggle and AWS, sometimes you do not need actual data at all, just a CSV file full of dummy data. Fear not: Python to the rescue. Python is the golden goose of the information age; not only can it help you sort through massive amounts of data, it can also help you generate data.
Faker is a Python package which can generate fake data for you. First, you need to pip install faker. For this exercise we are using Python 3.7.2:
$ python -m pip install faker
— Script to Generate a CSV file with Fake Data and 1 Billion Rows —
Caution: the file size will be about 1.3 GB, and generating it can really hammer your machine. I have an EC2 instance on which I generate this test data, leaving it running in the background. You could use multiprocessing in Python and hammer all the cores, but that is a discussion worthy of its own blog post.
from time import time
from decimal import Decimal
from faker import Faker