[PostgreSQL] Publication: WAL ๊ธฐ๋ฐ ๋ณต์ /CDC
Publication์ PostgreSQL์์ ์์ฒด์ ์ผ๋ก ์ ๊ณตํ๋ ๋ฐ์ดํฐ ๋ณต์ ์ฉ ๊ธฐ๋ฅ์ด๋ค.
WAL์ ๊ทธ๋๋ก ์คํธ๋ฆฌ๋ฐํ ์ ์๊ฒ ํด์ฃผ๋ฉฐ, Replica ๊ตฌ์ฑ์ ์ํ ๋ณต์ ๋ CDC ๊ธฐ๋ฅ ๋ฑ์ ํ์ฉํ ์ ์๋ค.
์ ํ
DML, INSERT/UPDATE/DELETE์ ๋ํด์๋ง ์ ํจํ๊ฒ ๋์ํ๋ค. DDL ๋ฑ์ ๋ํด์๋ ๋ฐ์ํ ์ ์๋ค.
Publication ๋ง๋ค๊ธฐ
๋ณต์ ๊ฒฝ๋ก๋ฅผ ๋ซ์ผ๋ ค๋ฉด ๋จผ์ ธ publication์ด๋ผ๋ ์ผ์ข
์ ์ฑ๋์ ๋ง๋ค์ด์ผ ํ๋ค.
๋ค์๊ณผ ๊ฐ์ด ๋ง๋ค ์ ์๋ค.
CREATE PUBLICATION ์ด๋ฆ FOR ALL TABLES -- ๋ชจ๋ ํ
์ด๋ธCREATE PUBLICATION ์ด๋ฆ FOR ONLY TABLE ํ
์ด๋ธ, ํ
์ด๋ธ2 -- ํน์ ํ
์ด๋ธCREATE PUBLICATION ์ด๋ฆ FOR TABLE ONLY ํ
์ด๋ธ WITH (publish = 'update'); -- ํน์ ํ
์ด๋ธ์ update ์ฐ์ฐ๋ง
์๋ฅผ ๋ค์ด global_state๋ผ๋ ํ ์ด๋ธ์ ๋ํด update/insert ๊ธฐ๋ก์ ์กฐํํ๊ณ ์ถ๋ค๋ฉด, ์ด๋ฐ ์์ผ๋ก ๋ง๋ค ์ ์๋ค.

publication ๋ชฉ๋ก์ ์ด๊ฑธ๋ก ์กฐํํ ์ ์๊ณ
SELECT * FROM pg_publication order by oid desc

publication์ ์ํ ํ ์ด๋ธ ๋ชฉ๋ก์ ์ด๊ฑธ๋ก ์กฐํํ ์ ์๋ค.
SELECT * FROM pg_publication_tables where pubname = 'pub ์ด๋ฆ'


์ญ์ ๋ ๋ฐ๋๋ก DROP ๋ช ๋ น์ ํตํด์ ์ฒ๋ฆฌํ ์ ์๋ค.

์์ฑ ์ดํ์ ํ ์ด๋ธ์ ์ถ๊ฐํ๊ฑฐ๋ ์ ๊ฑฐํ๊ณ ์ถ๋ค๋ฉด, ALTER PUBLICATION์ ์ฌ์ฉํ ์ ์๋ค.
ADD TABLE์ด๋ DROP TABLE์ ์ฌ์ฉํ๋ฉด ๋๋ค.
Slot ๋ง๋ค๊ธฐ
๋ณ๊ฒฝ๋ด์ญ์ ์กฐํํ๋ ค๋ฉด ์ฌ๋กฏ์ด๋๊ฑธ ๋ ๋ง๋ค์ด์ผ ํ๋ค.
์ฌ๋กฏ์ ๊ธฐ๋ฐ์ผ๋ก ํด์ ์ผ์ข
์ ํ์ด์ง๋ค์ด์
์กฐํ๋ฅผ ํ ์ ์๋ค.
SELECT pg_create_logical_replication_slot(์ฌ๋กฏ์ด๋ฆ, '์ถ๋ ฅ ํฌ๋งท');
์ถ๋ ฅ ํฌ๋งท์ ์๋ต์ด ์ด๋ค ํํ๋ก ๋์ฌ์ง๋ฅผ ์ ์ํ๋ค.
pgoutput์ด ๊ฐ์ฅ ์ผ๋ฐ์ ์ธ ์ ํ์ด๊ณ , ๋ฐ์ด๋๋ฆฌ ํํ๋ก ์๋ต์ ๋ฐํํ๋ค. pgoutput์ ์ธ ๊ฒฝ์ฐ์๋ ๋จ์ํ SQL ์ธํฐํ์ด์ค๋ก๋ ์กฐํ๋ฅผ ํ ์ ์๋ค.
์ด๋ ๊ฒ ๋ง๋ค๊ณ ๋๋ฉด
๋ง๋ค์ด์ง slot์ ๋ค์ ์ฟผ๋ฆฌ๋ก ์กฐํํ ์ ์๊ณ
select * from pg_replication_slots

์ฌ๋กฏ์ ์ง์ฐ๊ณ ์ถ๋ค๋ฉด ์ด๋ ๊ฒ ์ง์ธ ์ ์๋ค.
SELECT pg_drop_replication_slot('์ฌ๋กฏ์ด๋ฆ');

๋ณ๊ฒฝ ๋ฐ์ดํฐ ๊ตฌ๋ ํ๊ธฐ
๋ณ๊ฒฝ ๋ฐ์ดํฐ ๊ตฌ๋
์ pg_logical_slot_peek_binary_changes ํน์ pg_logical_slot_get_binary_changes์ ์ฌ์ฉํด์ ์ฒ๋ฆฌํ ์ ์๋ค.
peek์ ๋ฐ์ดํฐ๋ฅผ ์กฐํ๋ง ํ๊ณ , get์ ์กฐํ ํ ์๋น๊น์ง ํ๋ค.
SELECT *
FROM pg_logical_slot_peek_binary_changes('์ฌ๋กฏ์ด๋ฆ', NULL, NULL, 'proto_version', '1', 'publication_names', 'publication_์ด๋ฆ')
SELECT *
FROM pg_logical_slot_peek_binary_changes('์ฌ๋กฏ์ด๋ฆ', NULL, NULL, 'proto_version', '1', 'publication_names', 'publication_์ด๋ฆ')
๊ทธ๋ผ ์ด๋ฐ์์ผ๋ก ์กฐํ๊ฐ ๋๋๋ฐ, ๋ฌธ์ ๋ ์ ๋ฐ์ดํฐ๊ฐ ๋ฐ์ด๋๋ฆฌ ์ธ์ฝ๋ฉ์ด๋ผ์ ํด์ํ๋ ค๋ฉด ๋์ฝ๋ฉ์ ํด์ผ ํ๋ค๋ ๊ฒ์ด๋ค. ์ด๊ฑด SQL ๋ด์์ ์ฒ๋ฆฌํ๊ธด ์ข ๊น๋ค๋ก์ธ ์ ์๊ณ , ํ๋ก๊ทธ๋จ์ผ๋ก ํด๊ฒฐํด์ผ ํ๋ค.
Consumer ์ฝ๋ ์์ฑ (Go)
์์ ์ฝ๋๋ ๋ค์ ์์น์ ์๋ค.
https://github.com/myyrakle/go_example/blob/master/database/postgresql/publication.go

๊ทธ๋ฆฌ๊ณ ํ ์ด๋ธ์์ ๊ฐ์ ๋ณ๊ฒฝํด๋ณด๊ณ

Consumer๋ฅผ ์คํํด๋ณด๋ฉด
์ด๋ฐ ์์ผ๋ก ์ฐํ ๊ฒ์ด๋ค.
์ฐธ๊ณ ๋ก, ์ ๋ ๊ฒ ์กฐํ๋๋ ๋ฉ์ธ์ง ์ค์๋ ์ธ๋ชจ์๋ transaction/rollback ๋ฑ์ด ์์ฌ์๋ค.
๊ทธ๋์ ์ค์ ๋ฉ์ธ์ง๋ ๋ช๋ง๊ฐ ๋จ์ง์ธ๋ฐ ์ค์ ๋ณ๊ฒฝ์ ๋ํ ์ด๋ฒคํธ๋ ํ๋์ธ ๊ฒ์ด๋ค. ์ด๊ฑด ๊ทธ๋ฅ ์ ๋นํ ์์์ ๊ฑฐ๋ฅด๋์๋ฐ์ ์๋ค.
๊ทธ๋ฆฌ๊ณ peek๋ ์ฝ๊ธฐ์ ์ฉ์ด๋ผ์ ์ฝ๋๋ผ๋ ์๋ชจ๊ฐ ๋์ง ์๊ณ ๊ณ์ ๋ ธ์ถ์ด ๋๋ค.
get ๋ฒ์ ์ ์ฌ์ฉํ๋ค๋ฉด ์ฝ๊ธฐ ์ดํ์ ์๋ชจํด์
๋ค์ ์กฐํํ๋ฉด ๊ทธ ์ดํ์ ๋ฐ์ดํฐ๋ง ์กฐํ๊ฐ ๋ ๊ฒ์ด๋ค.
์๋ ์ฌ๋กฏ ์์น ์กฐ์
peek๋ ์ฌ๋กฏ ์์น๋ฅผ ์ด๋ํ์ง ์๊ณ ๋ฐ์ดํฐ๋ฅผ ๊บผ๋ด์ค๊ณ , get์ ์์น๋ฅผ ์ด๋์ํค๋ฉด์ ๊บผ๋ด์จ๋ค๊ณ ํ๋ค.
๊ทผ๋ฐ ๊ทธ๋ฌ๋ฉด ๋ฐ์ดํฐ ๊ฐ๊ณต์ ์๋ฃํ ๋ค์ ์์น๋ฅผ ์ด๋์ํค๋ ค๋ฉด ์ด๋ป๊ฒ ํ ๊น?
get์ ์ผ๋๋ฐ, ์ค๋ฅ๊ฐ ๋๋ฉด ๊ทธ๋๋ก ๋ฐ์ดํฐ๊ฐ ์ฆ๋ฐํ๋ ์
์ด๊ณ , peek๋ฅผ ์ฐ๋ฉด ์์น๊ฐ ์ด๋๋์ง ์๋๋ค.
๊ทธ๋ด๋๋ pg_replication_slot_advance๋ผ๋ ํจ์๋ฅผ ์ธ ์ ์๋ค.
pg_replication_slot_advance("์ฌ๋กฏ์ด๋ฆ", "์ปค์(lsn)");
๋ง์ฝ ์ ๊ธฐ์ 6๋ฒ์งธ๊น์ง ๋ค ์ฒ๋ฆฌํ๋ค๊ณ ํ๋ฉด
์ด๋ ๊ฒ ์ฌ๋กฏ ์์น๋ฅผ ์ด๋์ํฌ ์ ์๋ค.
๊ทธ๋ฌ๋ฉด ์ดํ์ ์กฐํํ ๋๋ ๊ทธ ๋ก๊ทธ๋ถํฐ ์กฐํ๊ฐ ๋ ๊ฒ์ด๋ค.
๋ฌธ์ : WAL ๋ง๋ฃ
์ฌ๋กฏ์ ์ค๋ ์ฐ๋ค๋ณด๋ฉด WAL์ด ๋ ๋ผ๊ฐ์ ์ฝ๊ธฐ๊ฐ ๋ถ๊ฐ๋ฅํ ์ํ๊ฐ ๋ ์ ์๋ค.

์ด ์ํ์์ ๋ด๊ฐ ์๊ธฐ๋ก ์ฌ๋กฏ์ ์ด๋ฆฌ๋๊ฑด ๋ถ๊ฐ๋ฅํ๋ค.
์๋ง ์ญ์ ํ์ ๋ค์ ๋ง๋ค์ด์ผ ํ ๊ฒ์ด๋ค.
์ด๊ฑธ ์ข ๋ ๋ ๋ผ๊ฐ๊ฒ ๋ง๋ค๋ ค๋ฉด, postgres์์ wal ๋ณด์กด ๊ด๋ จ๋ ์ต์ ์ ์กฐ์ ํด์ผ ํ๋ค.

์ฐธ์กฐ
https://www.postgresql.org/docs/9.4/catalog-pg-replication-slots.html
https://blog.ex-em.com/1809
https://postgresql.kr/docs/12/logicaldecoding-example.html
https://blog.ex-em.com/1817