[PostgreSQL] Publication: WAL ๊ธฐ๋ฐ˜ ๋ณต์ œ/CDC

Publication์€ PostgreSQL์—์„œ ์ž์ฒด์ ์œผ๋กœ ์ œ๊ณตํ•˜๋Š” ๋ฐ์ดํ„ฐ ๋ณต์ œ์šฉ ๊ธฐ๋Šฅ์ด๋‹ค.
WAL์„ ๊ทธ๋Œ€๋กœ ์ŠคํŠธ๋ฆฌ๋ฐํ•  ์ˆ˜ ์žˆ๊ฒŒ ํ•ด์ฃผ๋ฉฐ, Replica ๊ตฌ์„ฑ์„ ์œ„ํ•œ ๋ณต์ œ๋‚˜ CDC ๊ธฐ๋Šฅ ๋“ฑ์— ํ™œ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.




์ œํ•œ

DML, INSERT/UPDATE/DELETE์— ๋Œ€ํ•ด์„œ๋งŒ ์œ ํšจํ•˜๊ฒŒ ๋™์ž‘ํ•œ๋‹ค. DDL ๋“ฑ์— ๋Œ€ํ•ด์„œ๋Š” ๋ฐ˜์‘ํ•  ์ˆ˜ ์—†๋‹ค.




Publication ๋งŒ๋“ค๊ธฐ

๋ณต์ œ ๊ฒฝ๋กœ๋ฅผ ๋šซ์œผ๋ ค๋ฉด ๋จผ์ ธ publication์ด๋ผ๋Š” ์ผ์ข…์˜ ์ฑ„๋„์„ ๋งŒ๋“ค์–ด์•ผ ํ•œ๋‹ค.
๋‹ค์Œ๊ณผ ๊ฐ™์ด ๋งŒ๋“ค ์ˆ˜ ์žˆ๋‹ค.

CREATE PUBLICATION ์ด๋ฆ„ FOR ALL TABLES -- ๋ชจ๋“  ํ…Œ์ด๋ธ”
CREATE PUBLICATION ์ด๋ฆ„ FOR ONLY TABLE ํ…Œ์ด๋ธ”, ํ…Œ์ด๋ธ”2 -- ํŠน์ • ํ…Œ์ด๋ธ”
CREATE PUBLICATION ์ด๋ฆ„ FOR TABLE ONLY ํ…Œ์ด๋ธ” WITH (publish = 'update'); -- ํŠน์ • ํ…Œ์ด๋ธ”์˜ update ์—ฐ์‚ฐ๋งŒ

์˜ˆ๋ฅผ ๋“ค์–ด global_state๋ผ๋Š” ํ…Œ์ด๋ธ”์— ๋Œ€ํ•ด update/insert ๊ธฐ๋ก์„ ์กฐํšŒํ•˜๊ณ  ์‹ถ๋‹ค๋ฉด, ์ด๋Ÿฐ ์‹์œผ๋กœ ๋งŒ๋“ค ์ˆ˜ ์žˆ๋‹ค.

publication ๋ชฉ๋ก์€ ์ด๊ฑธ๋กœ ์กฐํšŒํ•  ์ˆ˜ ์žˆ๊ณ 

SELECT * FROM pg_publication order by oid desc

publication์— ์†ํ•œ ํ…Œ์ด๋ธ” ๋ชฉ๋ก์€ ์ด๊ฑธ๋กœ ์กฐํšŒํ•  ์ˆ˜ ์žˆ๋‹ค.


SELECT * FROM pg_publication_tables where pubname = 'pub ์ด๋ฆ„'

์‚ญ์ œ๋Š” ๋ฐ˜๋Œ€๋กœ DROP ๋ช…๋ น์„ ํ†ตํ•ด์„œ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋‹ค.

์ƒ์„ฑ ์ดํ›„์— ํ…Œ์ด๋ธ”์„ ์ถ”๊ฐ€ํ•˜๊ฑฐ๋‚˜ ์ œ๊ฑฐํ•˜๊ณ  ์‹ถ๋‹ค๋ฉด, ALTER PUBLICATION์„ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.

ADD TABLE์ด๋‚˜ DROP TABLE์„ ์‚ฌ์šฉํ•˜๋ฉด ๋œ๋‹ค.




Slot ๋งŒ๋“ค๊ธฐ

๋ณ€๊ฒฝ๋‚ด์—ญ์„ ์กฐํšŒํ•˜๋ ค๋ฉด ์Šฌ๋กฏ์ด๋ž€๊ฑธ ๋˜ ๋งŒ๋“ค์–ด์•ผ ํ•œ๋‹ค.
์Šฌ๋กฏ์„ ๊ธฐ๋ฐ˜์œผ๋กœ ํ•ด์„œ ์ผ์ข…์˜ ํŽ˜์ด์ง€๋„ค์ด์…˜ ์กฐํšŒ๋ฅผ ํ•  ์ˆ˜ ์žˆ๋‹ค.

SELECT pg_create_logical_replication_slot(์Šฌ๋กฏ์ด๋ฆ„, '์ถœ๋ ฅ ํฌ๋งท');

์ถœ๋ ฅ ํฌ๋งท์€ ์‘๋‹ต์ด ์–ด๋–ค ํ˜•ํƒœ๋กœ ๋‚˜์˜ฌ์ง€๋ฅผ ์ •์˜ํ•œ๋‹ค.
pgoutput์ด ๊ฐ€์žฅ ์ผ๋ฐ˜์ ์ธ ์„ ํƒ์ด๊ณ , ๋ฐ”์ด๋„ˆ๋ฆฌ ํ˜•ํƒœ๋กœ ์‘๋‹ต์„ ๋ฐ˜ํ™˜ํ•œ๋‹ค. pgoutput์„ ์“ธ ๊ฒฝ์šฐ์—๋Š” ๋‹จ์ˆœํ•œ SQL ์ธํ„ฐํŽ˜์ด์Šค๋กœ๋Š” ์กฐํšŒ๋ฅผ ํ•  ์ˆ˜ ์—†๋‹ค.


์ด๋ ‡๊ฒŒ ๋งŒ๋“ค๊ณ  ๋‚˜๋ฉด

๋งŒ๋“ค์–ด์ง„ slot์€ ๋‹ค์Œ ์ฟผ๋ฆฌ๋กœ ์กฐํšŒํ•  ์ˆ˜ ์žˆ๊ณ 

select * from pg_replication_slots

์Šฌ๋กฏ์„ ์ง€์šฐ๊ณ ์‹ถ๋‹ค๋ฉด ์ด๋ ‡๊ฒŒ ์ง€์šธ ์ˆ˜ ์žˆ๋‹ค.

SELECT pg_drop_replication_slot('์Šฌ๋กฏ์ด๋ฆ„');




๋ณ€๊ฒฝ ๋ฐ์ดํ„ฐ ๊ตฌ๋…ํ•˜๊ธฐ

๋ณ€๊ฒฝ ๋ฐ์ดํ„ฐ ๊ตฌ๋…์€ pg_logical_slot_peek_binary_changes ํ˜น์€ pg_logical_slot_get_binary_changes์„ ์‚ฌ์šฉํ•ด์„œ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋‹ค.
peek์€ ๋ฐ์ดํ„ฐ๋ฅผ ์กฐํšŒ๋งŒ ํ•˜๊ณ , get์€ ์กฐํšŒ ํ›„ ์†Œ๋น„๊นŒ์ง€ ํ•œ๋‹ค.


SELECT *
FROM pg_logical_slot_peek_binary_changes('์Šฌ๋กฏ์ด๋ฆ„', NULL, NULL, 'proto_version', '1', 'publication_names', 'publication_์ด๋ฆ„')
SELECT *
FROM pg_logical_slot_peek_binary_changes('์Šฌ๋กฏ์ด๋ฆ„', NULL, NULL, 'proto_version', '1', 'publication_names', 'publication_์ด๋ฆ„')

๊ทธ๋Ÿผ ์ด๋Ÿฐ์‹์œผ๋กœ ์กฐํšŒ๊ฐ€ ๋˜๋Š”๋ฐ, ๋ฌธ์ œ๋Š” ์ € ๋ฐ์ดํ„ฐ๊ฐ€ ๋ฐ”์ด๋„ˆ๋ฆฌ ์ธ์ฝ”๋”ฉ์ด๋ผ์„œ ํ•ด์„ํ•˜๋ ค๋ฉด ๋””์ฝ”๋”ฉ์„ ํ•ด์•ผ ํ•œ๋‹ค๋Š” ๊ฒƒ์ด๋‹ค. ์ด๊ฑด SQL ๋‚ด์—์„œ ์ฒ˜๋ฆฌํ•˜๊ธด ์ข€ ๊นŒ๋‹ค๋กœ์šธ ์ˆ˜ ์žˆ๊ณ , ํ”„๋กœ๊ทธ๋žจ์œผ๋กœ ํ•ด๊ฒฐํ•ด์•ผ ํ•œ๋‹ค.





Consumer ์ฝ”๋“œ ์ž‘์„ฑ (Go)

์˜ˆ์‹œ ์ฝ”๋“œ๋Š” ๋‹ค์Œ ์œ„์น˜์— ์žˆ๋‹ค.
https://github.com/myyrakle/go_example/blob/master/database/postgresql/publication.go

๊ทธ๋ฆฌ๊ณ  ํ…Œ์ด๋ธ”์—์„œ ๊ฐ’์„ ๋ณ€๊ฒฝํ•ด๋ณด๊ณ 

Consumer๋ฅผ ์‹คํ–‰ํ•ด๋ณด๋ฉด

์ด๋Ÿฐ ์‹์œผ๋กœ ์ฐํž ๊ฒƒ์ด๋‹ค.
์ฐธ๊ณ ๋กœ, ์ €๋ ‡๊ฒŒ ์กฐํšŒ๋˜๋Š” ๋ฉ”์„ธ์ง€ ์ค‘์—๋Š” ์“ธ๋ชจ์—†๋Š” transaction/rollback ๋“ฑ์ด ์„ž์—ฌ์žˆ๋‹ค.
๊ทธ๋ž˜์„œ ์‹ค์ œ ๋ฉ”์„ธ์ง€๋Š” ๋ช‡๋งŒ๊ฐœ ๋‚จ์ง“์ธ๋ฐ ์‹ค์ œ ๋ณ€๊ฒฝ์— ๋Œ€ํ•œ ์ด๋ฒคํŠธ๋Š” ํ•˜๋‚˜์ธ ๊ฒƒ์ด๋‹ค. ์ด๊ฑด ๊ทธ๋ƒฅ ์ ๋‹นํžˆ ์•Œ์•„์„œ ๊ฑฐ๋ฅด๋Š”์ˆ˜๋ฐ–์— ์—†๋‹ค.

๊ทธ๋ฆฌ๊ณ  peek๋Š” ์ฝ๊ธฐ์ „์šฉ์ด๋ผ์„œ ์ฝ๋”๋ผ๋„ ์†Œ๋ชจ๊ฐ€ ๋˜์ง€ ์•Š๊ณ  ๊ณ„์† ๋…ธ์ถœ์ด ๋œ๋‹ค.

get ๋ฒ„์ „์„ ์‚ฌ์šฉํ•œ๋‹ค๋ฉด ์ฝ๊ธฐ ์ดํ›„์— ์†Œ๋ชจํ•ด์„œ


๋‹ค์‹œ ์กฐํšŒํ•˜๋ฉด ๊ทธ ์ดํ›„์˜ ๋ฐ์ดํ„ฐ๋งŒ ์กฐํšŒ๊ฐ€ ๋  ๊ฒƒ์ด๋‹ค.





์ˆ˜๋™ ์Šฌ๋กฏ ์œ„์น˜ ์กฐ์ ˆ

peek๋Š” ์Šฌ๋กฏ ์œ„์น˜๋ฅผ ์ด๋™ํ•˜์ง€ ์•Š๊ณ  ๋ฐ์ดํ„ฐ๋ฅผ ๊บผ๋‚ด์˜ค๊ณ , get์€ ์œ„์น˜๋ฅผ ์ด๋™์‹œํ‚ค๋ฉด์„œ ๊บผ๋‚ด์˜จ๋‹ค๊ณ  ํ–ˆ๋‹ค.

๊ทผ๋ฐ ๊ทธ๋Ÿฌ๋ฉด ๋ฐ์ดํ„ฐ ๊ฐ€๊ณต์„ ์™„๋ฃŒํ•œ ๋’ค์— ์œ„์น˜๋ฅผ ์ด๋™์‹œํ‚ค๋ ค๋ฉด ์–ด๋–ป๊ฒŒ ํ• ๊นŒ?
get์„ ์ผ๋Š”๋ฐ, ์˜ค๋ฅ˜๊ฐ€ ๋‚˜๋ฉด ๊ทธ๋Œ€๋กœ ๋ฐ์ดํ„ฐ๊ฐ€ ์ฆ๋ฐœํ•˜๋Š” ์…ˆ์ด๊ณ , peek๋ฅผ ์“ฐ๋ฉด ์œ„์น˜๊ฐ€ ์ด๋™๋˜์ง€ ์•Š๋Š”๋‹ค.

๊ทธ๋Ÿด๋•Œ๋Š” pg_replication_slot_advance๋ผ๋Š” ํ•จ์ˆ˜๋ฅผ ์“ธ ์ˆ˜ ์žˆ๋‹ค.

pg_replication_slot_advance("์Šฌ๋กฏ์ด๋ฆ„", "์ปค์„œ(lsn)");

๋งŒ์•ฝ ์ €๊ธฐ์„œ 6๋ฒˆ์งธ๊นŒ์ง€ ๋‹ค ์ฒ˜๋ฆฌํ–ˆ๋‹ค๊ณ  ํ•˜๋ฉด


์ด๋ ‡๊ฒŒ ์Šฌ๋กฏ ์œ„์น˜๋ฅผ ์ด๋™์‹œํ‚ฌ ์ˆ˜ ์žˆ๋‹ค.


๊ทธ๋Ÿฌ๋ฉด ์ดํ›„์— ์กฐํšŒํ•  ๋•Œ๋Š” ๊ทธ ๋กœ๊ทธ๋ถ€ํ„ฐ ์กฐํšŒ๊ฐ€ ๋  ๊ฒƒ์ด๋‹ค.




๋ฌธ์ œ: WAL ๋งŒ๋ฃŒ

์Šฌ๋กฏ์„ ์˜ค๋ž˜ ์“ฐ๋‹ค๋ณด๋ฉด WAL์ด ๋‚ ๋ผ๊ฐ€์„œ ์ฝ๊ธฐ๊ฐ€ ๋ถˆ๊ฐ€๋Šฅํ•œ ์ƒํƒœ๊ฐ€ ๋  ์ˆ˜ ์žˆ๋‹ค.

์ด ์ƒํƒœ์—์„œ ๋‚ด๊ฐ€ ์•Œ๊ธฐ๋กœ ์Šฌ๋กฏ์„ ์‚ด๋ฆฌ๋Š”๊ฑด ๋ถˆ๊ฐ€๋Šฅํ•˜๋‹ค.
์•„๋งˆ ์‚ญ์ œ ํ›„์— ๋‹ค์‹œ ๋งŒ๋“ค์–ด์•ผ ํ•  ๊ฒƒ์ด๋‹ค.

์ด๊ฑธ ์ข€ ๋œ ๋‚ ๋ผ๊ฐ€๊ฒŒ ๋งŒ๋“ค๋ ค๋ฉด, postgres์—์„œ wal ๋ณด์กด ๊ด€๋ จ๋œ ์˜ต์…˜์„ ์กฐ์ ˆํ•ด์•ผ ํ•œ๋‹ค.




์ฐธ์กฐ
https://www.postgresql.org/docs/9.4/catalog-pg-replication-slots.html
https://blog.ex-em.com/1809
https://postgresql.kr/docs/12/logicaldecoding-example.html
https://blog.ex-em.com/1817