[AWS] RDS Aurora: Lambda๋กœ CDC ๊ฑธ๊ธฐ

CDC(Change Data Capture:๋ณ€๊ฒฝ ๋ฐ์ดํ„ฐ ์บก์ณ)๋Š” DB ์ƒ์—์„œ ๋ฐ์ดํ„ฐ๊ฐ€ ๋ณ€๊ฒฝ๋ ๋•Œ๋งˆ๋‹ค ๊ทธ๊ฑธ ๊ฐ€์ ธ๋‹ค ๋ญ”๊ฐ€ ์•ก์…˜์„ ์ทจํ•˜๋Š” ๊ธฐ๋Šฅ ์š”์†Œ๋ฅผ ๋งํ•œ๋‹ค.
๋ณ€๊ฒฝ์„ ๊ธฐ๋ฐ˜์œผ๋กœ ๋‹ค๋ฅธ ๋ฐ์ดํ„ฐ์†Œ์Šค์— ๋ฐ์ดํ„ฐ๋ฅผ ๋™๊ธฐํ™”ํ•˜๊ฑฐ๋‚˜, ์›นํ›…์„ ์œ๋‹ค๊ฑฐ๋‚˜ ํ• ๋•Œ ์ž์ฃผ ์‚ฌ์šฉ๋˜๋Š” ํŒจํ„ด์ด๋‹ค.

์ด๊ฑธ ๊ตฌํ˜„ํ•  ๋•Œ๋Š” ๋ณดํ†ต Kafka ๊ฐ™์€๊ฑธ๋กœ ๋ฐ์ดํ„ฐ ํด๋งํ•˜๋ฉด์„œ ์˜๊ธฐ๋„ ํ•˜๊ณ  ๊ทธ๋Ÿฌ๋Š”๋ฐ, RDS AuroraDB๋ฅผ ์‚ฌ์šฉํ•  ๊ฒฝ์šฐ์—๋Š” ์ด๋Ÿฐ ๋ชฉํ‘œ๋ฅผ ๋”์šฑ ๊ฐ„๋‹จํ•˜๊ฒŒ ๋‹ฌ์„ฑํ•  ์ˆ˜ ์žˆ๋‹ค. MySQL, PostgreSQL ๋‘˜ ๋‹ค ๋œ๋‹ค.


RDS AuroraDB๋Š” ์ž์ฒด์ ์œผ๋กœ ์œ„์˜ ๊ทธ๋ฆผ๊ณผ ๊ฐ™์ด DB์—์„œ AWS Lambda๋ฅผ ๋ฐ”๋กœ ํ˜ธ์ถœํ•  ์ˆ˜ ์žˆ๋Š” ๊ธฐ๋Šฅ์„ ์ œ๊ณตํ•œ๋‹ค. ์ด๊ฑธ ์‘์šฉํ•ด์„œ ํ…Œ์ด๋ธ”์— update ํŠธ๋ฆฌ๊ฑฐ๋ฅผ ๊ฑธ๊ณ , ํŠธ๋ฆฌ๊ฑฐ ํ”„๋กœ์‹œ์ €์—์„œ Lambda๋ฅผ ํ˜ธ์ถœํ•˜๋ฉด Lambda๊ฐ€ ๊ทธ๊ฑธ ๋ฐ›์•„์„œ ์ถ”๊ฐ€์ ์ธ ์ฒ˜๋ฆฌ๋ฅผ ํ•˜๋„๋ก ํ•  ์ˆ˜ ์žˆ๋‹ค.




Lambda ํ™•์žฅ ์„ค์น˜

์ „์šฉ ํ™•์žฅ์„ ์„ค์น˜ํ•ด์ค˜์•ผ ํ•œ๋‹ค.
์•„๋ฌด๋ฐ๋‚˜ cli ๋“ค์–ด๊ฐ€์„œ ์•„๋ž˜ ์ปค๋งจ๋“œ๋ฅผ ์ž…๋ ฅํ•ด์ค€๋‹ค,

CREATE EXTENSION IF NOT EXISTS aws_lambda CASCADE;




Lambda ํ•จ์ˆ˜ ์ƒ์„ฑ

ํ•จ์ˆ˜๋ฅผ ๋งŒ๋“ค๋•Œ VPC๋ฅผ ์ผœ๊ณ , RDS์™€ ๋™์ผํ•œ VPC, ๋ณด์•ˆ๊ทธ๋ฃน, AZ์— ๋†“์ด๋„๋ก ์ƒ์„ฑํ•œ๋‹ค.

๋Ÿฐํƒ€์ž„๊ฐ™์€๊ฑด ์ƒ๊ด€์—†๋‹ค.


๊ทธ๋ฆฌ๊ณ  ์ ๋‹นํžˆ ๋“ค์–ด์˜ค๋Š” ๊ฐ’๋งŒ ์ฐ์–ด์ฃผ๋„๋ก ์ˆ˜์ •ํ–ˆ๋‹ค.




IAM ๊ถŒํ•œ ์„ค์ •

๋จผ์ € RDS๊ฐ€ Lambda์— ์ ‘๊ทผํ•  ์ˆ˜ ์žˆ๊ฒŒ ํ•ด์ค„ ๊ถŒํ•œ๊ณผ ์ •์ฑ…์„ ์„ค์ •ํ•ด์ค˜์•ผ ํ•œ๋‹ค.

๋จผ์ € ์—ญํ• ์„ ํ•˜๋‚˜ ๋งŒ๋“ ๋‹ค.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "rds.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

๊ทธ๋ฆฌ๊ณ  ์œ„์˜ ์‹ ๋ขฐ ์ •์ฑ…์„ ๋ณต๋ถ™ํ•ด ๋„ฃ์–ด์ค€๋‹ค.


์ด๋ฆ„์€ ์ ๋‹นํžˆ ์ง“๊ณ 


{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "lambda:InvokeFunction",
            "Resource": "*"
        }
    ]
}

์—ญํ• ์— ๊ถŒํ•œ์„ ์ถ”๊ฐ€ํ•ด์ค€๋‹ค.
๋‚˜๋Š” ์ธ๋ผ์ธ ์ •์ฑ…์œผ๋กœ ๋„ฃ์—ˆ๋‹ค.




RDS ์ถ”๊ฐ€์„ค์ •

๋ฐฉ๊ธˆ ๋งŒ๋“  IAM ์—ญํ• ์„ RDS ํŽ˜์ด์ง€์—์„œ Lambda ํƒ€๊ฒŸ์œผ๋กœ ๋“ฑ๋กํ•ด์ค€๋‹ค.

๊ทธ๋ฆฌ๊ณ  ์—ฐ๊ฒฐ๋œ ์ปดํ“จํŒ… ๋ฆฌ์†Œ์Šค์— Lambda๋ฅผ ์—ฐ๊ฒฐํ•ด์ค€๋‹ค.

ํ”„๋ก์‹œ๋Š” ํ•„์š”ํ•˜๋‹ค๋ฉด ๋‹ฌ๋ฉด ๋œ๋‹ค. ๋‹น์—ฐํžˆ ์ถ”๊ฐ€๋น„์šฉ์ด ๋‚˜์˜จ๋‹ค.


๊ทธ๋ž˜์„œ ์ด๋ ‡๊ฒŒ ๋œจ๋ฉด ๋œ๋‹ค.




RDS์—์„œ Lambda ํ˜ธ์ถœํ•ด๋ณด๊ธฐ

ํ•จ์ˆ˜์˜ ํ˜•ํƒœ๋กœ ํ˜ธ์ถœ์ด ๊ฐ€๋Šฅํ•˜๋‹ค.
์•„๋ž˜๋Š” ๋ฐฉ๊ธˆ ๋งŒ๋“  Lambda ํ•จ์ˆ˜๋ฅผ RDS์—์„œ ํ˜ธ์ถœํ•˜๋Š” ๊ฐ„๋‹จํ•œ ์˜ˆ์ œ๋‹ค.

SELECT * from aws_lambda.invoke(aws_commons.create_lambda_function_arn('cdc_function', 'ap-northeast-2'), '{"body": "Hello from Postgres!"}'::json );

๊ทธ๋Ÿผ ์ € ํŽ˜์ด๋กœ๋“œ๋กœ ์š”์ฒญ์„ ๋‚ ๋ ค์„œ


์‘๋‹ต์„ ๋ฐ›์•„์˜ฌ ๊ฒƒ์ด๋‹ค.


Lambda ์ชฝ ๋กœ๊ทธ์—๋„ ์ž˜ ์ฐํ˜”๋‹ค.





ํŠธ๋ฆฌ๊ฑฐ๋กœ CDC ๊ตฌ์„ฑํ•˜๊ธฐ

์ด๋ฒˆ์—๋Š” ํŠธ๋ฆฌ๊ฑฐ๋ฅผ ํ™œ์šฉํ•ด์„œ Lambda๋ฅผ ํ†ตํ•œ CDC๋ฅผ ๊ตฌ์„ฑํ•ด๋ณด๊ฒ ๋‹ค.

๋จผ์ € ๊ฐ„๋‹จํ•œ ํ…Œ์ด๋ธ”์„ ํ•˜๋‚˜ ์ƒ์„ฑํ–ˆ๋‹ค.

๋ณ„๊ฑด ์—†๋‹ค.

๊ทธ๋ฆฌ๊ณ  ์•„๋ž˜์™€ ๊ฐ™์€ ํ˜•ํƒœ๋กœ ํ•จ์ˆ˜์™€ ํŠธ๋ฆฌ๊ฑฐ๋ฅผ ์ƒ์„ฑํ•ด์ค€๋‹ค.

CREATE OR REPLACE FUNCTION public.trigger_test_table_function()
 RETURNS trigger
 LANGUAGE plpgsql
AS $function$
    begin
        IF (TG_OP = 'DELETE') THEN
            perform aws_lambda.invoke(
           		aws_commons.create_lambda_function_arn('cdc_function', 'ap-northeast-2'), 
           		json_build_object(
           			'type', 'delete',
           			'value', row_to_json(old)
           		)
           	);
        ELSIF (TG_OP = 'UPDATE') THEN
            perform aws_lambda.invoke(
           		aws_commons.create_lambda_function_arn('cdc_function', 'ap-northeast-2'), 
           		json_build_object(
           			'type', 'update',
           			'value', row_to_json(new)
           		)
           	);
        ELSIF (TG_OP = 'INSERT') THEN
            perform aws_lambda.invoke(
           		aws_commons.create_lambda_function_arn('cdc_function', 'ap-northeast-2'), 
           		json_build_object(
           			'type', 'insert',
           			'value', row_to_json(new)
           		)
           	);
        END IF;
        RETURN NULL; 
    END;
$function$;

CREATE TRIGGER trigger_test_table
AFTER INSERT OR update or delete
ON test
FOR EACH ROW
EXECUTE Function trigger_test_table_function();

์–ด๋–ค operation์œผ๋กœ ๋‚ ๋ผ์˜จ๊ฑด์ง€, ๊ทธ๋ฆฌ๊ณ  ํ…Œ์ด๋ธ” row์˜ ์ „์ฒด ๊ฐ’์„ ๊ทธ๋Œ€๋กœ ์ด์ฃผ๋„๋ก ํ–ˆ๋‹ค.


๊ทธ๋ฆฌ๊ณ  insert๋ฅผ ๋‚ ๋ ค๋ณด๋ฉด


๋‚ ๋ฆฐ ๋Œ€๋กœ ๋“ค์–ด์˜ฌ ๊ฒƒ์ด๊ณ 


delete๋ฅผ ๋‚ ๋ ค๋ณด๋ฉด


๊ทธ๊ฒƒ๋„ ๊ทธ๊ฒƒ๋Œ€๋กœ ์ž˜ ๋‚ ๋ผ์˜ฌ ๊ฒƒ์ด๋‹ค.

์ด๋Ÿฐ์‹์œผ๋กœ ์จ๋จน์œผ๋ฉด ๋œ๋‹ค.



์ฐธ์กฐ
https://docs.aws.amazon.com/ko_kr/AmazonRDS/latest/AuroraUserGuide/PostgreSQL-Lambda.html
https://stackoverflow.com/questions/73474076/invoking-aws-lambda-with-postgresql
https://aws.amazon.com/ko/blogs/database/capturing-data-changes-in-amazon-aurora-using-aws-lambda/