[AWS] Athena: DynamoDB ํตํฉ
DynamoDB๋ ๋ ๊ฐ๋๋ค ๊ฐ๋ํ AWS์์ ๊ฐ์ฅ ๊ฐ๋ ฅํ๊ณ ๋น์ฉ ํจ์จ์ ์ธ DB ์๋น์ค๋ค.
RDB๋งํผ ์ ์ฐํ ๊ฒ์๊ธฐ๋ฅ์ ์ ๊ณตํ์ง ์์ง๋ง, ๋ฌด์ ํ ์ฐ๊ธฐ ํ์ฅ์ด ๊ฐ๋ฅํ๊ณ , ๋น์ฉ๋ ์ฑ๋ฅ ๋๋น ์ ๋ ดํ ํธ์ด๊ธฐ ๋๋ฌธ์ด๋ค. ๊ทธ๋์ ๋ก๊ทธ ๊ฐ์ APPEND Only ๋ฐ์ดํฐ ์์ง ์ฐฝ๊ณ ๋ก ์ฐ๊ธฐ ์ข๋ค.
๊ทธ๋ฆฌ๊ณ DynamoDB์ ๋ ํ๋์ ์ฅ์ ์, Athena & S3 ํ์ดํ๋ผ์ธ๊ณผ๋ ์์ฃผ ์ ํตํฉ๋๋ค๋ ๊ฒ์ด๋ค.
์๋น์ค๊ฐ DynamoDB์ ๋ฐ์ดํฐ๋ฅผ ์์ผ๋ฉด, ๊ทธ๊ฑธ S3์ parquet์ผ๋ก ๋ง์์ ์ฌ๋ฆฌ๊ณ , Athena๋ก ๋๋ ์ง๊ณ๋ฅผ ๋ ๋ฆฌ๋ ์ผ๋ จ์ ํจํด์ ํธ๋ฆฌํ๊ฒ ๊ตฌ์ฑํ ์ ์๋ค.
์ํคํ ์ณ ์ ํ์ง
Athena์์ DynamoDB๋ฅผ ์์ค๋ก ์ฌ์ฉํ๋ ๋ฐฉ๋ฒ์ ์ฌ๋ฌ๊ฐ์ง๊ฐ ์๋ค.
1. ์ง๊ตฌ๋ก ๊ฝ๊ธฐ
๊ฐ์ฅ ๋จ์ํ ์ฌ์ฉ ๋ฐฉ์์ ๋ค์๊ณผ ๊ฐ๋ค.
DynamoDB Connector๋ฅผ ์ฌ์ฉํด์ ์งํต์ผ๋ก ์ฐ๊ฒฐํ๋ ๊ฒ์ด๋ค.
์ด๊ฑด ์ฑ๋ฅ์ด๋ ๋น์ฉ์์ ์ด์ ์ด ๊ฑฐ์ ์๋ค.
๊ทธ๋ฅ DynamoDB์ ๋์ READ ๋ ๋ ค์ SQL Query์ ๋ง์ถฐ์ ๊ฐ๊ณต์ ํ๋ ๊ฒ์ ๋ถ๊ณผํ๊ณ , DynamoDB์ ์ฑ๋ฅ ํ๊ณ๋ฅผ ๊ทธ๋๋ก ๋ฐ๋๋ค. ๋น์ฉ๋ DynamoDB RCU ๊ธฐ๋ฐ์ผ๋ก ๋ฏ๋๋ค.
์ด๊ฑด ์์ ๋ฐ์ดํฐ์ ๋ํด์ JOIN ํ๋ ์ฉ๋๋ก๋ง ์ฐ๊ฑฐ๋ ํด์ผํ๋ค.
2. S3 Export Only
๊ทธ ๋ค์์ผ๋ก ๋์ ๋ฐฉ๋ฒ์ ๋ค์๊ณผ ๊ฐ๋ค.
DynamoDB๋ PITR ๋ฐฑ์
๊ธฐ๋ฐ์ผ๋ก S3์ exportํ๋ ๊ธฐ๋ฅ์ ์ ๊ณตํ๋๋ฐ, ๊ทธ๋ฌ๊ณ ๋๋ฉด ์ํ
๋์์ ๋ฐ๋ก ๊ฝ์์ ์ฝ์ ์ ์๋ค.
๊ทผ๋ฐ ์ด๊ฒ๋ ์ฌ์ค ์ต์ ์ด๋ผ๊ณ ํ ์๋ ์๋ค.
์ํ
๋ ์กฐํ ํจํด์ ์ต์ ํ๋์ด์์ง๋ ์๊ณ , ๊ณต๊ฐ๋ ๋ถํ์ํ๊ฒ ํฌ๊ฒ ๋จน๊ธฐ ๋๋ฌธ์ด๋ค.
์ฑ๋ฅ๊ณผ ๋น์ฉํจ์จ์ฑ์ ์ํ๋ค๋ฉด parquet์ผ๋ก ๋ง์๋๋ ๊ตฌ์ฑ์ด ํ์ํ์ง๋ง, ์ด๊ฑด ๊ทธ๋ฅ flatํ๊ฒ ํผ์ณ๋ฒ๋ฆฐ๋ค.
3. S3 Export & CTAS (parquet)
Athena CTAS๋ผ๋ ๊ธฐ๋ฅ์ ์ฐ๋ฉด Athena๋ง์ ์ํ ์ต์ ํ๋ ๊ตฌ์กฐ๋ก ์ฌ์์ฑ์ํฌ ์ ์๋ค.

4. Best Practice?
Best Practice๋ก ๊ฐ์ฃผ๋๋ ๋ฐฉ๋ฒ์ ๋ค์๊ณผ ๊ฐ๋ค.
์ฆ๋ถํ S3 export๋ฅผ ํตํด์ S3์ ๋ถ๊ณ , ๊ทธ๊ฑธ Athena๋ฅผ ์ฌ์ฉํด์ ์ฌ์์ฑํ๋ ๊ฒ์ด๋ค.
ETL ์ ์ด์๋ Step Function์ด ๊ฐ์ฅ ๋จ์ํ๊ณ ํธ๋ฆฌํ ํธ์ด๋ค.
๋น์ฉ
https://aws.amazon.com/ko/dynamodb/pricing/on-demand/
๋น์ฉ ๋ฐ์ ์์๋ ๋ช๊ฐ์ง๊ฐ ์๋ค.
PITR์ด๋ผ๋ ๋ณต๊ตฌ ๋ฐ์ดํฐ๋ฅผ ์ ์งํด์ผ ๊ทธ๊ฑธ๋ก S3์ ๋ณด๋ด๋๊ฑด๋ฐ, ๊ทธ๊ฒ๋ ๋ฐฑ์
๋ณธ ์ ์ฒด ์ฉ๋ ๊ธฐ์ค์ผ๋ก ์๊ธ์ ๋ฏ๋๋ค.
์ ์ฒด ๋ฐฑ์
์ด 200GB๋ฉด 40๋ฌ๋ฌ ์ ๋ ๋ฏ๋ ์
์ด๋ค.
๊ทผ๋ฐ ์ต์ํ๋์ธ 1์ผ๋ก ์ก์ผ๋ฉด ๊ทธ ๋ ์ ๋ณ๊ฒฝ๋ถ๋ง ๋ณด์กด๋ ํ
๋, ๊ทธ๋ ๊ฒ ํฌ๊ฒ ๋ค์ง ์์ ๊ฒ์ด๋ค.
๋ด๋ณด๋ด๋ ํ์ ์์ฒด์๋ ํฌ๊ธฐ๋น ๋น์ฉ์ด ๋ฐ์ํ๋ค.

100GB ์ ๋ ๋๋ค๋ฉด ๋ง์์ฏค ๋์ค๋ ์
์ด๋ค.
๊ทธ๋ฆฌ๊ณ ๋น์ฐํ S3 ๋น์ฉ๋ ๋ฐ์ํ๋ค.
์ ์ฅ ๋น์ฉ์ ๋น์ฐํ ๋ฐ๋ก๊ณ , parquet์ผ๋ก ์ฌ์์ฑํ๋ ๊ณผ์ ์์ S3 ํ์ค ์ฝ๊ธฐ/์ฐ๊ธฐ ๋น์ฉ์ด ๋ฐ์ํ๋ค.
์ฌ์ฉํด๋ณด๊ธฐ
DynamoDB ํ ์ด๋ธ์ ์ง์ ๋ง๋ค๊ณ , ๊ณผ์ ์ ๊ฑฐ์น๋ฉด์ ์ฌ์ฉ๋ฒ์ ์ตํ๋ณด์.
์ ๋นํ ํ
์ด๋ธ ํ๋ ๋ง๋ค๊ณ
๋ฐฑ์
์ต์
์ธ PITR์ ์ผ์ผ ํ๋ค.
์ด๊ฑด ๋ณธ์ง์ ์ผ๋ก ๋ฐฑ์
๋ฐ์ดํฐ๋ผ์, ๋ฐ๋ก ๋ฐฑ์
์ ์ฑ
์ด ํ์์๊ณ S3 Export์ฉ์ผ๋ก๋ง ์ธ๊ฑฐ๋ผ๋ฉด 1์ผ๋ก ํด๋ ๋๋ค.
๊ทธ๋ฆฌ๊ณ ์ ๋นํ ๋ฐ์ดํฐ๋ฅผ ๋ฃ๋๋ค.
S3 Export
์ด์ S3๋ก ๋ถ์ ์ค๋น๋ ๋๋ค.
Dynamo์ ๋ณด๋ฉด ๋ด๋ณด๋ด๊ธฐ ํญ์ด ์๋๋ฐ,

์ฌ๊ธฐ์ ๋ด๋ณด๋ผ S3 ๋ฒํท ๊ณ ๋ฅด๊ณ ํธ๋ฆฌ๊ฑฐํ๋ฉด ๋๋ค.
์ ์ฒด ๋ด๋ณด๋ด๊ธฐ์ ์ฆ๋ถ(๋ถ๋ถ) ๋ด๋ณด๋ด๊ธฐ๊ฐ ์๋๋ฐ, ์ผ๋จ์ ์ ์ฒด๋ก ๋จผ์ ํด๋ณธ๋ค.
๊ทธ๋ฆฌ๊ณ ์ด ๋ด๋ณด๋ด๊ธฐ ์์ฒด๋ ๋น๋๊ธฐ API๋ผ์ ์ฆ์ ๋์ง ์๋๋ค.
์ข ๊ธฐ๋ค๋ฆฌ๋ฉด ์ฑ๊ณต์ผ๋ก ๋๋ ํ ๋ฐ
๊ทธ๋ผ ๋ชฉ์ ์ง์ ์ด๋ฐ ํ์ผ๋ค์ด ์ง์ ๋ถํ๊ฒ ๋ง ๋จ์ด์ ธ์์ ๊ฒ์ด๋ค.
์ด๊ฒ ํ์ํ ๋ฐ์ดํฐ๋ค์ด๋ค.
๋งคํ์ฉ External ํ ์ด๋ธ ๊ตฌ์ฑํ๊ธฐ
์. ์ด์ ์ค๋น๋ ๊ฑฐ์ ๋๋ค.
Athena๋ก ์ด๋ํด์ ๋งคํ ํ
์ด๋ธ์ ๊ตฌ์ฑํด์ค๋ค.
๋ฐฉ๊ธ Export๋ json.gz๋ค์ด ์๋ ๊ฒฝ๋ก๋ฅผ ๋ฃ๊ณ , ์ด๋ค ํ๋๋ค์ด ์๋์ง ์ฒดํฌํด์ ํค ๋ชฉ๋ก์ ์ธํ
ํด์ฃผ๋ฉด ๋๋ค.
CREATE EXTERNAL TABLE ์ธ๋ถํ
์ด๋ธ๊ตฌ์ฑ (
Item struct<
ํค: struct<S:ํ์
>,
...
>
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://๋ฒํท/๊ฒฝ๋ก/dynamo/AWSDynamoDB/?/data/';
์ด๊ฒ ์์ฒด๋ ๊ธฐ์กด ๋ฐ์ดํฐ์ ๋ฉํ๋ฐ์ดํฐ๋ง ์ถ๊ฐํด์ฃผ๋๊ฑฐ๋ผ์, ๋น์ฉ์ด๋ ์ฑ๋ฅ์ ๋จน์ง๋ ์๋๋ค.
๊ทธ๋ผ ์ผ๋จ ์กฐํ๋ฅผ ํด๋ณผ ์๋ ์์ ๊ฒ์ด๋ค.
์ฐ๋ฆฐ ์ฌ๊ธฐ์ ํ๋ฐ ๋ ๋์๊ฐ์ผ ํ๋ค.
athena์ CTAS ๊ธฐ๋ฅ์ ์ฌ์ฉํด์ parquet๋ก ์ฌ์์ฑํด์ค๋ค.
CREATE TABLE access_logs_parquet
WITH (
format = 'PARQUET',
parquet_compression = 'SNAPPY',
external_location = 's3://์ ์ฅ๊ฒฝ๋ก/'
) AS
SELECT
Item.id.S AS id,
Item.ip.S AS ip,
Item.path.S AS path
FROM ์์ํ
์ด๋ธ;

๊ทธ๋ผ ์ค์ ๋ก ์ต์ข ๋ชฉ์ ์ง์ ์์ถ๋ parquet ์ํธ๋ฆฌ๊ฐ ์์ฑ๋ ๊ฒ์ด๊ณ

์ฟผ๋ฆฌ๋ ๋ ๋ ค๋ณผ ์ ์์ ๊ฒ์ด๋ค.
์ ๋ค์ด๊ฐ ๊ฒ์ ๋ณผ ์ ์๋ค.
์ฟผ๋ฆฌ ๋ ๋ฆฌ๋๊ฑด ๊ทธ๋ฅ ์ด๋ ๊ฒ ํ๋ฉด ๋๋ค.
DynamoDB์ ์ฆ๋ถ Export (Incremental Export)
๊ทผ๋ฐ ๋ฐฉ๊ธ ์ฌ์ฉํ ๋ฐฉ์์ ์ ๋์ํ๊ธด ํ์ง๋ง, ํ์ฅ์ฑ์ด ์ข์ ๋ฐฉ์์ ์๋๋ค.
๋งค๋ฒ ์ ์ฒด ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์์ ๋ฐ์ด๋ฃ๋ ๊ฒ์ ๋๋ฌด ์ค๋๊ฑธ๋ฆฌ๊ณ , ๋น์ฉ๋ ๋น์ฉ๋๋ก ๋จน๋๋ค.
๋คํํ DynamoDB๋ ์ด๋ฐ ๋๋ ๋ง๋ฅผ ์ ํด์ํด์ค๋ค.
DynamoDB์ ๋ฐฑ์
์์คํ
์์ฒด๊ฐ ํ์์คํฌํ ๊ธฐ๋ฐ์ผ๋ก ํํฐ๋ง์ด ๋๋ ๊ตฌ์กฐ๋ก ๋์ด์์ด์, "๋ช์๊ฐ ์ ๋ถํฐ์ ๋ฐ์ดํฐ"๋ "์ด์ ๋ถํฐ์ ๋ฐ์ดํฐ" ๊ฐ์ ์์ผ๋ก ๋ถ๋ถ์ EXPORT๊ฐ ์ ๋๊ธฐ ๋๋ฌธ์ด๋ค.
์ด๋ฒ์๋ ํ์ ํ๋ ์ถ๊ฐํ ๋ค์์ ๋๊ธฐํ๋ฅผ ๊ตฌ์ฑํด๋ณด์.
id=4444๋ฅผ ์ถ๊ฐํ๊ณ
์ฆ๋ถ ๋ด๋ณด๋ด๊ธฐ๋ฅผ ์ ํํด์ ํธ๋ฆฌ๊ฑฐํ๋ค.
๊ทธ๋ผ ๋ง์ฐฌ๊ฐ์ง๋ก json.gz๊ฐ ์๋ฉ ๋จ์ด์ง๋ค.
๊ทผ๋ฐ ์ฌ๊ธฐ์๋ถํด ์ข ๋ค๋ฅด๋ค.
์ ์ฒด ๋ด๋ณด๋ด๊ธฐ๋ ์ง์ง ์ ์ฒด ๋ฐ์ดํฐ๋ง ๊ฐ์ ธ์ค๋ ๊ฑฐ๋ผ์ ๊ตฌ์กฐ๊ฐ ๋จ์ํ๋ฐ, ์ฆ๋ถ ๋ด๋ณด๋ด๊ธฐ๋ UPDATE/DELETE ๊ฐ์ ์ํ ์ ๋ณด๋ ํจ๊ป ํฌํจ๋๊ธฐ ๋๋ฌธ์ด๋ค. ๊ทธ ์๊ธฐ์ "๋ชจ๋ ๋ณ๊ฒฝ"์ ๊ฐ์ ธ์จ๋ค.
CREATE EXTERNAL TABLE access_log_incremental (
Metadata struct<
WriteTimestampMicros: string
>,
Keys struct<
id: struct<S:string>
>,
NewImage struct<
id: struct<S:string>,
ip: struct<S:string>,
path: struct<S:string>
>,
OldImage struct<
id: struct<S:string>,
ip: struct<S:string>,
path: struct<S:string>
>
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://๋ฒํท/๊ฒฝ๋ก/dynamo/AWSDynamoDB/?/data/';

๊ทธ๋์ ์ด๋ฐ ์์ผ๋ก ๋ญ๊ฐ ์ข ๋ง๋ค.
INSERT/UPDATE๋ NewImage๋ฅผ ์ฝ์ด์ ๋ฃ์ด์ผ ํ๊ณ , DELETE๋ Keys๋ง ์ ๋ฌ๋๋ค.
์ฌ๊ธฐ์๋ถํฐ๋ ๊ธฐ๋ฅ ๊ตฌ์กฐ์ ๋ฐ๋ผ์ ์ ์ฝ์กฐ๊ฑด์ด ์ข ๋ฌ๋ผ์ง๋ค.
APPEND-ONLY์ ๋จ์ํ ์์คํ
์ด๋ผ๋ฉด, ๊ทธ๋ฅ NewImage๋ง ๊ฑธ๋ฌ์ ๊ธฐ์กด CTAS ํ
์ด๋ธ์ insertํ๋ฉด ๋๋ค.

๊ทธ๋ผ ์๋์ผ๋ก parquet ๋ฉ์ด๋ฆฌ๊ฐ ์ถ๊ฐ๋ ๊ฒ์ด๊ณ
์กฐํ๋ ์ ๋ ๊ฒ์ด๋ค.
๊ทผ๋ฐ UPDATE/DELETE๋ฅผ ๊ณ ๋ คํด์ผ ํ๋ค๋ฉด ์ข ๋ณต์กํด์ง๋ค.
์ฌ์ค ์ง๊ธ ์์ ์ฟผ๋ฆฌ๋, ๊ธฐ์กด parquet์ ๋ค์ด์๋ ID์ ๋์ผํ INSERT๊ฐ ๋ค์ด์จ๋ค๋ฉด ์ค๋ณต ID๊ฐ ๋ฐ์ํ ์ ์๋ค๋ ๋จ์ ์ด ์๋ค. ์๋ณธ ๋ฐ์ดํฐ๋ถํฐ๊ฐ NEW INSERT ONLY๋ผ๋ ์ ์ ํ์์๋ง ์ ๋์ํ๋ ๊ตฌ์กฐ๋ค.
IceBerg๋ฅผ ์ฌ์ฉํ UPDATE/DELETE
์ฌ๊ธฐ์ ๋ฌธ์ ๊ฐ ๋๋ ๊ฒ์ parquet ์์ฒด๊ฐ ์์ ์ ์ํ ๋ฐ์ดํฐ ๊ท๊ฒฉ์ด ์๋๊ธฐ ๋๋ฌธ์ด๋ค.
๊ทธ์ ๋ฐ์ดํฐ๋ฅผ ์์ถํด์ ํจ์จ์ ์ผ๋ก ๊ด๋ฆฌํ๊ธฐ ์ํ ํฌ๋งท์ ๋ถ๊ณผํ๋ค.
๊ทธ๋์ ๋ณ๊ฒฝ๋๋ ๋ฐ์ดํฐ์
์ ๋ํด ์์ ๊ณผ ์ญ์ ๋ฅผ ๊ณ ๋ คํด์ parquet์ ๊ด๋ฆฌํด์ผ ํ๋ค๋ฉด, iceberg ์์ง์ ์ฌ์ฉํ๋ ํธ์ด ๊ถ์ฅ๋๋ค.
๋ค์๊ณผ ๊ฐ์ ์์ผ๋ก ํ
์ด๋ธ์ ๋ง๋ค๋ฉด iceberg์ ํํฐ์
๊ด๋ฆฌ ๋ฅ๋ ฅ์ ๊ฐ์ ธ๊ฐ ์ ์๋ค.
CREATE TABLE access_log_iceberg
WITH (
table_type = 'ICEBERG',
format = 'PARQUET',
location = 's3://.../iceberg/access_log/',
is_external = false
) AS
SELECT id, ip, path FROM access_log_parquet;
๊ทธ๋ฌ๋ฉด ์ด์ MERGE ๊ตฌ๋ฌธ์ ํตํด์ ํํฐ์
์ ์ฌ๊ตฌ์ฑํ๋ ๊ฒ์ด ๊ฐ๋ฅํด์ง๋ค.
DELETE๋ UPDATE๋ฅผ ๋ฐ์ํ ์ ์๋ค.
MERGE INTO access_log_iceberg t
USING (
SELECT
COALESCE(NewImage.id.S, Keys.id.S) AS id,
NewImage.ip.S AS ip,
NewImage.path.S AS path,
NewImage IS NULL AS is_deleted,
CAST(Metadata.WriteTimestampMicros AS bigint) AS change_ts,
ROW_NUMBER() OVER (
PARTITION BY COALESCE(NewImage.id.S, Keys.id.S)
ORDER BY CAST(Metadata.WriteTimestampMicros AS bigint) DESC
) AS rn
FROM access_log_incremental
) s
ON t.id = s.id AND s.rn = 1
WHEN MATCHED AND s.is_deleted THEN DELETE
WHEN MATCHED THEN UPDATE SET ip = s.ip, path = s.path
WHEN NOT MATCHED AND NOT s.is_deleted THEN INSERT (id, ip, path) VALUES (s.id, s.ip, s.path);ํํฐ์ ์ ๋ต
Athena๋ฅผ ๋น์ฉํจ์จ์ ์ผ๋ก, ๊ทธ๋ฆฌ๊ณ ๋ ๋น ๋ฅด๊ฒ ์ฌ์ฉํ๊ธฐ ์ํด์๋ ํํฐ์
์ ๋ต์ ์ ์ก์์ผ ํ๋ค.
์ด๊ฑด ๋ณ๋ ํฌ์คํธ๋ก ๊ฐ์ํ๋ค.
https://blog.naver.com/sssang97/223716323937
์ฐธ์กฐ
https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/S3DataExport.HowItWorks.html
https://docs.aws.amazon.com/athena/latest/ug/connectors-dynamodb.html