[AWS] RDS Aurora: Lambda๋ก CDC ๊ฑธ๊ธฐ
CDC(Change Data Capture:๋ณ๊ฒฝ ๋ฐ์ดํฐ ์บก์ณ)๋ DB ์์์ ๋ฐ์ดํฐ๊ฐ ๋ณ๊ฒฝ๋ ๋๋ง๋ค ๊ทธ๊ฑธ ๊ฐ์ ธ๋ค ๋ญ๊ฐ ์ก์
์ ์ทจํ๋ ๊ธฐ๋ฅ ์์๋ฅผ ๋งํ๋ค.
๋ณ๊ฒฝ์ ๊ธฐ๋ฐ์ผ๋ก ๋ค๋ฅธ ๋ฐ์ดํฐ์์ค์ ๋ฐ์ดํฐ๋ฅผ ๋๊ธฐํํ๊ฑฐ๋, ์นํ
์ ์๋ค๊ฑฐ๋ ํ ๋ ์์ฃผ ์ฌ์ฉ๋๋ ํจํด์ด๋ค.
์ด๊ฑธ ๊ตฌํํ ๋๋ ๋ณดํต Kafka ๊ฐ์๊ฑธ๋ก ๋ฐ์ดํฐ ํด๋งํ๋ฉด์ ์๊ธฐ๋ ํ๊ณ ๊ทธ๋ฌ๋๋ฐ, RDS AuroraDB๋ฅผ ์ฌ์ฉํ ๊ฒฝ์ฐ์๋ ์ด๋ฐ ๋ชฉํ๋ฅผ ๋์ฑ ๊ฐ๋จํ๊ฒ ๋ฌ์ฑํ ์ ์๋ค. MySQL, PostgreSQL ๋ ๋ค ๋๋ค.
RDS AuroraDB๋ ์์ฒด์ ์ผ๋ก ์์ ๊ทธ๋ฆผ๊ณผ ๊ฐ์ด DB์์ AWS Lambda๋ฅผ ๋ฐ๋ก ํธ์ถํ ์ ์๋ ๊ธฐ๋ฅ์ ์ ๊ณตํ๋ค. ์ด๊ฑธ ์์ฉํด์ ํ
์ด๋ธ์ update ํธ๋ฆฌ๊ฑฐ๋ฅผ ๊ฑธ๊ณ , ํธ๋ฆฌ๊ฑฐ ํ๋ก์์ ์์ Lambda๋ฅผ ํธ์ถํ๋ฉด Lambda๊ฐ ๊ทธ๊ฑธ ๋ฐ์์ ์ถ๊ฐ์ ์ธ ์ฒ๋ฆฌ๋ฅผ ํ๋๋ก ํ ์ ์๋ค.
Lambda ํ์ฅ ์ค์น
์ ์ฉ ํ์ฅ์ ์ค์นํด์ค์ผ ํ๋ค.
์๋ฌด๋ฐ๋ cli ๋ค์ด๊ฐ์ ์๋ ์ปค๋งจ๋๋ฅผ ์
๋ ฅํด์ค๋ค,
CREATE EXTENSION IF NOT EXISTS aws_lambda CASCADE;

Lambda ํจ์ ์์ฑ
ํจ์๋ฅผ ๋ง๋ค๋ VPC๋ฅผ ์ผ๊ณ , RDS์ ๋์ผํ VPC, ๋ณด์๊ทธ๋ฃน, AZ์ ๋์ด๋๋ก ์์ฑํ๋ค.

๋ฐํ์๊ฐ์๊ฑด ์๊ด์๋ค.
๊ทธ๋ฆฌ๊ณ ์ ๋นํ ๋ค์ด์ค๋ ๊ฐ๋ง ์ฐ์ด์ฃผ๋๋ก ์์ ํ๋ค.
IAM ๊ถํ ์ค์
๋จผ์ RDS๊ฐ Lambda์ ์ ๊ทผํ ์ ์๊ฒ ํด์ค ๊ถํ๊ณผ ์ ์ฑ ์ ์ค์ ํด์ค์ผ ํ๋ค.
๋จผ์ ์ญํ ์ ํ๋ ๋ง๋ ๋ค.

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "rds.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
๊ทธ๋ฆฌ๊ณ ์์ ์ ๋ขฐ ์ ์ฑ ์ ๋ณต๋ถํด ๋ฃ์ด์ค๋ค.
์ด๋ฆ์ ์ ๋นํ ์ง๊ณ
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "lambda:InvokeFunction",
"Resource": "*"
}
]
}
์ญํ ์ ๊ถํ์ ์ถ๊ฐํด์ค๋ค.
๋๋ ์ธ๋ผ์ธ ์ ์ฑ
์ผ๋ก ๋ฃ์๋ค.
RDS ์ถ๊ฐ์ค์
๋ฐฉ๊ธ ๋ง๋ IAM ์ญํ ์ RDS ํ์ด์ง์์ Lambda ํ๊ฒ์ผ๋ก ๋ฑ๋กํด์ค๋ค.


๊ทธ๋ฆฌ๊ณ ์ฐ๊ฒฐ๋ ์ปดํจํ ๋ฆฌ์์ค์ Lambda๋ฅผ ์ฐ๊ฒฐํด์ค๋ค.

ํ๋ก์๋ ํ์ํ๋ค๋ฉด ๋ฌ๋ฉด ๋๋ค. ๋น์ฐํ ์ถ๊ฐ๋น์ฉ์ด ๋์จ๋ค.
๊ทธ๋์ ์ด๋ ๊ฒ ๋จ๋ฉด ๋๋ค.
RDS์์ Lambda ํธ์ถํด๋ณด๊ธฐ
ํจ์์ ํํ๋ก ํธ์ถ์ด ๊ฐ๋ฅํ๋ค.
์๋๋ ๋ฐฉ๊ธ ๋ง๋ Lambda ํจ์๋ฅผ RDS์์ ํธ์ถํ๋ ๊ฐ๋จํ ์์ ๋ค.
SELECT * from aws_lambda.invoke(aws_commons.create_lambda_function_arn('cdc_function', 'ap-northeast-2'), '{"body": "Hello from Postgres!"}'::json );
๊ทธ๋ผ ์ ํ์ด๋ก๋๋ก ์์ฒญ์ ๋ ๋ ค์
์๋ต์ ๋ฐ์์ฌ ๊ฒ์ด๋ค.
Lambda ์ชฝ ๋ก๊ทธ์๋ ์ ์ฐํ๋ค.
ํธ๋ฆฌ๊ฑฐ๋ก CDC ๊ตฌ์ฑํ๊ธฐ
์ด๋ฒ์๋ ํธ๋ฆฌ๊ฑฐ๋ฅผ ํ์ฉํด์ Lambda๋ฅผ ํตํ CDC๋ฅผ ๊ตฌ์ฑํด๋ณด๊ฒ ๋ค.
๋จผ์ ๊ฐ๋จํ ํ ์ด๋ธ์ ํ๋ ์์ฑํ๋ค.
๋ณ๊ฑด ์๋ค.
๊ทธ๋ฆฌ๊ณ ์๋์ ๊ฐ์ ํํ๋ก ํจ์์ ํธ๋ฆฌ๊ฑฐ๋ฅผ ์์ฑํด์ค๋ค.
CREATE OR REPLACE FUNCTION public.trigger_test_table_function()
RETURNS trigger
LANGUAGE plpgsql
AS $function$
begin
IF (TG_OP = 'DELETE') THEN
perform aws_lambda.invoke(
aws_commons.create_lambda_function_arn('cdc_function', 'ap-northeast-2'),
json_build_object(
'type', 'delete',
'value', row_to_json(old)
)
);
ELSIF (TG_OP = 'UPDATE') THEN
perform aws_lambda.invoke(
aws_commons.create_lambda_function_arn('cdc_function', 'ap-northeast-2'),
json_build_object(
'type', 'update',
'value', row_to_json(new)
)
);
ELSIF (TG_OP = 'INSERT') THEN
perform aws_lambda.invoke(
aws_commons.create_lambda_function_arn('cdc_function', 'ap-northeast-2'),
json_build_object(
'type', 'insert',
'value', row_to_json(new)
)
);
END IF;
RETURN NULL;
END;
$function$;
CREATE TRIGGER trigger_test_table
AFTER INSERT OR update or delete
ON test
FOR EACH ROW
EXECUTE Function trigger_test_table_function();
์ด๋ค operation์ผ๋ก ๋ ๋ผ์จ๊ฑด์ง, ๊ทธ๋ฆฌ๊ณ ํ ์ด๋ธ row์ ์ ์ฒด ๊ฐ์ ๊ทธ๋๋ก ์ด์ฃผ๋๋ก ํ๋ค.
๊ทธ๋ฆฌ๊ณ insert๋ฅผ ๋ ๋ ค๋ณด๋ฉด
๋ ๋ฆฐ ๋๋ก ๋ค์ด์ฌ ๊ฒ์ด๊ณ
delete๋ฅผ ๋ ๋ ค๋ณด๋ฉด
๊ทธ๊ฒ๋ ๊ทธ๊ฒ๋๋ก ์ ๋ ๋ผ์ฌ ๊ฒ์ด๋ค.
์ด๋ฐ์์ผ๋ก ์จ๋จน์ผ๋ฉด ๋๋ค.
์ฐธ์กฐ
https://docs.aws.amazon.com/ko_kr/AmazonRDS/latest/AuroraUserGuide/PostgreSQL-Lambda.html
https://stackoverflow.com/questions/73474076/invoking-aws-lambda-with-postgresql
https://aws.amazon.com/ko/blogs/database/capturing-data-changes-in-amazon-aurora-using-aws-lambda/