# pg2ch: data streaming from PostgreSQL to ClickHouse via logical replication
Continuous data transfer from PostgreSQL to ClickHouse using the logical replication mechanism.
Currently the pg2ch tool is in an active testing stage; it is not yet suitable for production use.
Get:

```bash
go get -u github.com/mkabilov/pg2ch
```
Run:

```bash
pg2ch --config {path to the config file (default config.yaml)}
```
Config file:

```yaml
tables:
    {postgresql table name}:
        main_table: {clickhouse table name}
        buffer_table: {clickhouse buffer table name} # optional; if not specified, insert directly into the main table
        buffer_row_id: {clickhouse buffer table column name for row id}
        init_sync_skip: {skip the initial copy of the data}
        init_sync_skip_buffer_table: {if true, bypass the buffer_table and write directly to the main_table during the initial sync copy} # makes sense in case of huge tables
        init_sync_skip_truncate: {skip the truncate of the main_table during the initial sync}
        engine: {clickhouse table engine: MergeTree, ReplacingMergeTree or CollapsingMergeTree}
        max_buffer_length: {number of DML (insert/update/delete) commands to store in memory before flushing to the buffer/main table}
        merge_threshold: {if a buffer table is specified, number of buffer flushes before moving the data from the buffer to the main table}
        columns: # postgres to clickhouse column name mapping;
                 # if not present, all the columns are expected to exist on the clickhouse side with exactly the same names
            {postgresql column name}: {clickhouse column name}
        is_deleted_column: # in case of ReplacingMergeTree, 1 will be stored in the {is_deleted_column} in order to mark deleted rows
        sign_column: {clickhouse sign column name, for the CollapsingMergeTree engine only, default "sign"}
        ver_column: {clickhouse version column name, for the ReplacingMergeTree engine only, default "ver"}

inactivity_merge_timeout: {interval, default 1 min} # merge the buffered data after that timeout

clickhouse: # clickhouse tcp protocol connection params
    host: {clickhouse host, default 127.0.0.1}
    port: {tcp port, default 9000}
    database: {database name}
    username: {username}
    password: {password}
    params:
        {extra param name}: {extra param value}
        ...

postgres: # postgresql connection params
    host: {host name, default 127.0.0.1}
    port: {port, default 5432}
    database: {database name}
    user: {user}
    replication_slot_name: {logical replication slot name}
    publication_name: {postgresql publication name}

db_path: {path to the persistent storage dir where the table lsn positions will be stored}
```
Example:

- run PostgreSQL on `localhost:5432`
- set `wal_level` in the postgresql config file to `logical`
- set `max_replication_slots` to at least `2`
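The two settings above go into `postgresql.conf` (note that changing `wal_level` requires a server restart); a minimal fragment might look like:

```ini
wal_level = logical          # enables logical decoding
max_replication_slots = 2    # one slot for pg2ch, plus headroom
```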
- run ClickHouse on `localhost:9000`, e.g. in Docker
- create database `pg2ch_test` in PostgreSQL:

```sql
CREATE DATABASE pg2ch_test;
```
- create and populate the test tables using `pgbench`:

```bash
pgbench -U postgres -d pg2ch_test -i
```
- change the replica identity of the `pgbench_accounts` table to FULL, so that we will receive the old values of the updated rows:

```sql
ALTER TABLE pgbench_accounts REPLICA IDENTITY FULL;
```
- create a PostgreSQL publication for the `pgbench_accounts` table:

```sql
CREATE PUBLICATION pg2ch_pub FOR TABLE pgbench_accounts;
```
- create a PostgreSQL logical replication slot:

```sql
SELECT * FROM pg_create_logical_replication_slot('pg2ch_slot', 'pgoutput');
```
- create the buffer table on the ClickHouse side:

```sql
CREATE TABLE pgbench_accounts_buf (aid Int32, abalance Int32, sign Int8, row_id UInt64) ENGINE = Memory() -- will be used as a buffer table
```
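The sample config below also points `main_table` at a ClickHouse table named `pgbench_accounts`, whose DDL is not shown above. A sketch that matches the buffer table's columns and the `CollapsingMergeTree` engine from the config (the `ORDER BY aid` sorting key is an assumption, not taken from the original) would be:

```sql
-- main table; columns and engine follow the sample config, ORDER BY is an assumption
CREATE TABLE pgbench_accounts (aid Int32, abalance Int32, sign Int8)
ENGINE = CollapsingMergeTree(sign) ORDER BY aid
```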
- create a `config.yaml` file with the following content:

```yaml
tables:
    pgbench_accounts:
        main_table: pgbench_accounts
        buffer_table: pgbench_accounts_buf
        buffer_row_id: row_id
        engine: CollapsingMergeTree
        max_buffer_length: 1000
        merge_threshold: 4
        columns:
            aid: aid
            abalance: abalance
        sign_column: sign

inactivity_merge_timeout: '10s'

clickhouse:
    host: localhost
    port: 9000
    database: default
    username: default
postgres:
    host: localhost
    port: 5432
    database: pg2ch_test
    user: postgres
    replication_slot_name: pg2ch_slot
    publication_name: pg2ch_pub

db_path: db
```
- run pg2ch to start the replication:

```bash
pg2ch --config config.yaml
```
- run `pgbench` to generate some test load:

```bash
pgbench -U postgres -d pg2ch_test --time 30 --client 10
```
- wait for the `inactivity_merge_timeout` period (10 seconds in our case) so that the data in memory gets flushed to the table in ClickHouse
- check the sums of the `abalance` column on both the ClickHouse and the PostgreSQL side:
    - on ClickHouse: `SELECT SUM(abalance * sign), SUM(sign) FROM pgbench_accounts` (we multiply by `sign` because, with the CollapsingMergeTree engine, an update is stored as a cancel row with sign -1 plus a state row with sign 1 until the parts are merged)
    - on PostgreSQL: `SELECT SUM(abalance), COUNT(*) FROM pgbench_accounts`
- the numbers must match; if they do not, please open an issue.
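Why the sign-weighted sums work can be sketched in isolation: assuming an UPDATE is replicated into CollapsingMergeTree as a cancel row (old value, sign = -1) followed by a state row (new value, sign = 1), multiplying by `sign` cancels out the superseded rows, so the totals match the plain PostgreSQL aggregates:

```python
# Model of CollapsingMergeTree rows as (abalance, sign) pairs.
# An INSERT produces (value, +1); an UPDATE produces a cancel row
# (old_value, -1) plus a state row (new_value, +1).
rows = [(100, 1)]                # initial insert: abalance = 100
rows += [(100, -1), (250, 1)]    # update: abalance 100 -> 250

sum_abalance = sum(abalance * sign for abalance, sign in rows)  # SUM(abalance * sign)
row_count = sum(sign for _, sign in rows)                       # SUM(sign)

print(sum_abalance, row_count)  # 250 1 -- matches SUM(abalance), COUNT(*) in PostgreSQL
```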