Stream Changes from PostgreSQL to Any Destination with Change Data Capture

February 20, 2023

Yevgeny Pats (@yevgenypats)

One of the most powerful features of PostgreSQL is its ability to replicate data, and in this article we'll take a deep dive into logical replication. (To run the CloudQuery PostgreSQL CDC source plugin, take a look at our docs.)

What is CDC?

CDC (Change Data Capture) is a set of design patterns for databases that lets you track changes to the data and act upon them. In PostgreSQL, CDC can be achieved using the logical replication feature.

What is Logical Replication?

PostgreSQL supports two types of replication: physical replication and logical replication. Physical replication creates a complete copy of the data, including the underlying storage structure, whereas logical replication only replicates changes to the data: insertions, updates, and deletions of rows in tables. This makes logical replication more flexible and efficient, since it ships only the changes rather than the entire dataset, minimizing replication overhead and reducing the disk space required to store the replicated data.

How Does Logical Replication Work?

In PostgreSQL, logical replication is achieved by replicating changes made to the data in the form of write-ahead log (WAL) records. These records contain all the necessary information to recreate the changes that were made to the data, including the type of change (insert, update, or delete), the time the change was made, and the actual data that was changed.

The WAL records are written to disk on the primary server, and a separate process, known as the WAL sender, sends the WAL records to the replica server. The replica server then applies the changes to its own copy of the data. This happens continuously, so the replica's copy of the data is kept up to date with the primary's.
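To make this concrete, here is a minimal Go sketch of opening a streaming replication connection and asking the server where its WAL currently ends. It assumes the jackc/pglogrepl and jackc/pgconn packages; the connection string is a placeholder.

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/jackc/pglogrepl"
	"github.com/jackc/pgx/v5/pgconn"
)

func main() {
	ctx := context.Background()
	// A walsender connection requires replication=database in the connection string.
	conn, err := pgconn.Connect(ctx, "postgres://postgres:secret@localhost:5432/mydb?replication=database")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close(ctx)

	// IDENTIFY_SYSTEM reports the system ID, timeline, and current WAL flush position.
	sysident, err := pglogrepl.IdentifySystem(ctx, conn)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("current WAL position:", sysident.XLogPos)
}
```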

How CloudQuery Uses Postgres Logical Replication to Sync Postgres to Any Other Destination

The CloudQuery PostgreSQL source plugin can sync data and changes in real time from a PostgreSQL database directly to any of the supported destinations, without the need for Kafka or any other intermediary. (You can still stream to Kafka or Google Pub/Sub if you want to.) This makes for a much simpler setup to operate: just run one CloudQuery binary!

In the following sections, we will go through how our CloudQuery PostgreSQL source plugin works internally. (For running and configuring it, refer to the docs.)

Set wal_level to logical

To use logical replication, you will need to make sure your database is running with wal_level set to logical.

See our documentation on how to set wal_level to logical in various environments.
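Once it's set, it's worth verifying before going further. A minimal sketch, assuming pgx v5 and a placeholder connection string:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/jackc/pgx/v5"
)

func main() {
	ctx := context.Background()
	conn, err := pgx.Connect(ctx, "postgres://postgres:secret@localhost:5432/mydb")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close(ctx)

	var level string
	// SHOW wal_level returns "logical" once the server has been restarted with the new setting.
	if err := conn.QueryRow(ctx, "SHOW wal_level").Scan(&level); err != nil {
		log.Fatal(err)
	}
	if level != "logical" {
		log.Fatalf("wal_level is %q; set it to logical and restart the server", level)
	}
	fmt.Println("wal_level is logical")
}
```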

Create a publication

CloudQuery creates a publication for all or a subset of the specified tables via the CREATE PUBLICATION command. This publication will then be used in the next step when we set up logical replication.
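As a hedged sketch of what this step can look like in Go with pgconn (the publication name cq_publication and the helper function are hypothetical, not CloudQuery's actual names):

```go
package replication

import (
	"context"

	"github.com/jackc/pgx/v5/pgconn"
)

// createPublication publishes all tables; for a subset, list them explicitly, e.g.
//   CREATE PUBLICATION cq_publication FOR TABLE public.users, public.orders;
func createPublication(ctx context.Context, conn *pgconn.PgConn) error {
	return conn.Exec(ctx, "CREATE PUBLICATION cq_publication FOR ALL TABLES").Close()
}
```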

Create a logical replication slot

Now we are ready to start replicating. CloudQuery creates a replication slot and exports a snapshot via CREATE_REPLICATION_SLOT slot_name LOGICAL pgoutput EXPORT_SNAPSHOT (sketched in code after the list below).

The above command returns two things:

  • Replication slot name: we will use it when we start replicating and subscribing to changes.
  • Snapshot name: we will use this snapshot to replicate the initial contents of the database at the exact point in time before the replication started.
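Here is a sketch of this step using the pglogrepl package; the slot name cq_slot and the helper are hypothetical, and the command must run on a replication connection (replication=database):

```go
package replication

import (
	"context"

	"github.com/jackc/pglogrepl"
	"github.com/jackc/pgx/v5/pgconn"
)

// createSlot creates a logical replication slot using the pgoutput plugin and
// exports a snapshot. The result carries both the slot's consistent point (an
// LSN to start streaming from) and the exported snapshot's name.
func createSlot(ctx context.Context, conn *pgconn.PgConn) (pglogrepl.CreateReplicationSlotResult, error) {
	return pglogrepl.CreateReplicationSlot(ctx, conn, "cq_slot", "pgoutput",
		pglogrepl.CreateReplicationSlotOptions{SnapshotAction: "EXPORT_SNAPSHOT"})
}
```

The returned SnapshotName is what the next step consumes, and ConsistentPoint is where streaming will later resume from.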

Replicate the initial snapshot

Using the snapshot exported in the previous step, CloudQuery extracts all rows from the specified tables, transforms them into CloudQuery's type system, and sends them to the specified CloudQuery destination.
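Under the hood this relies on Postgres snapshot import: a REPEATABLE READ transaction on a regular connection can adopt the exported snapshot via SET TRANSACTION SNAPSHOT, so every read sees the database exactly as it was when the slot was created. A sketch (the table and helper names are hypothetical):

```go
package replication

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

// syncSnapshot reads a table's initial contents from the exported snapshot.
func syncSnapshot(ctx context.Context, conn *pgx.Conn, snapshotName string) error {
	tx, err := conn.BeginTx(ctx, pgx.TxOptions{IsoLevel: pgx.RepeatableRead})
	if err != nil {
		return err
	}
	defer tx.Rollback(ctx)

	// SET TRANSACTION SNAPSHOT does not accept bind parameters; the name comes
	// from the server, so interpolating it here is safe.
	if _, err := tx.Exec(ctx, fmt.Sprintf("SET TRANSACTION SNAPSHOT '%s'", snapshotName)); err != nil {
		return err
	}

	rows, err := tx.Query(ctx, "SELECT * FROM public.users")
	if err != nil {
		return err
	}
	defer rows.Close()
	// ... transform each row into the destination's type system and send it ...
	return rows.Err()
}
```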

Listen and stream changes

Now that the initial snapshot of the database is synced, we can start sending the changes we receive from the logical replication slot.

We start listening for replication changes via the START_REPLICATION command. This is a low-level command of the streaming replication protocol, so it's not really possible to run it via psql; instead, we use the pglogrepl and pgx drivers.

For each received message, we parse it per the logical replication protocol spec and transform it into CloudQuery's type system to be sent to the destination.
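Putting this together, a hedged sketch of the receive-and-decode loop with pglogrepl (slot and publication names carried over from the hypothetical sketches above):

```go
package replication

import (
	"context"
	"log"

	"github.com/jackc/pglogrepl"
	"github.com/jackc/pgx/v5/pgconn"
	"github.com/jackc/pgx/v5/pgproto3"
)

// streamChanges starts replication on the slot and decodes each pgoutput message.
func streamChanges(ctx context.Context, conn *pgconn.PgConn, startLSN pglogrepl.LSN) error {
	err := pglogrepl.StartReplication(ctx, conn, "cq_slot", startLSN,
		pglogrepl.StartReplicationOptions{
			PluginArgs: []string{"proto_version '1'", "publication_names 'cq_publication'"},
		})
	if err != nil {
		return err
	}

	for {
		rawMsg, err := conn.ReceiveMessage(ctx)
		if err != nil {
			return err
		}
		copyData, ok := rawMsg.(*pgproto3.CopyData)
		if !ok {
			continue
		}
		switch copyData.Data[0] {
		case pglogrepl.XLogDataByteID: // 'w': a WAL data payload
			xld, err := pglogrepl.ParseXLogData(copyData.Data[1:])
			if err != nil {
				return err
			}
			// Decode the pgoutput message: Begin, Insert, Update, Delete, Commit, ...
			logicalMsg, err := pglogrepl.Parse(xld.WALData)
			if err != nil {
				return err
			}
			log.Printf("received %T at %s", logicalMsg, xld.WALStart)
			// ... transform into CloudQuery's type system and send to the destination ...
		case pglogrepl.PrimaryKeepaliveMessageByteID: // 'k': see the acknowledgement section below
		}
	}
}
```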

Message acknowledgement

Every X messages that are sent to the destination, we need to send back to Postgres the last position that was received, so it knows not to resend that data again. This is needed in case the process is stopped, so we can resume from the exact same location.

We send the acknowledgement position every X messages or after a Y timeout, batching our ack messages to avoid bombarding Postgres.

The ack command is sent via the Status Update Message.
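In pglogrepl terms, that is a single call; a sketch (the batching policy of every X messages or Y timeout lives in the caller):

```go
package replication

import (
	"context"

	"github.com/jackc/pglogrepl"
	"github.com/jackc/pgx/v5/pgconn"
)

// ackPosition reports the last WAL position safely written to the destination.
// Postgres can then discard older WAL for this slot and, after a restart,
// resume streaming from this point.
func ackPosition(ctx context.Context, conn *pgconn.PgConn, lsn pglogrepl.LSN) error {
	return pglogrepl.SendStandbyStatusUpdate(ctx, conn,
		pglogrepl.StandbyStatusUpdate{WALWritePosition: lsn})
}
```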

Summary

That's it! It's quite a lot of PostgreSQL internals, so we hope you enjoyed it. The good news is that, by using the CloudQuery source plugin, all those internals are pretty much abstracted away, so you can focus on just what you want to replicate and where to replicate it!