[AWS] Kinesis
Kinesis๋ Kafka์ aws ๋ฒ์ ์ด๋ผ๊ณ ํ ์ ์๋ค.
์ฉ๋ ์์ฒด๋ ์นดํ์นด๋ฅผ ๊ทธ๋๋ก ์ฌ๋ ค์ฃผ๋ AWS MSK์ ๋์ผํ๋ค.
๋ค๋ง AWS์์ ์์ฒด์ ์ผ๋ก ๊ด๋ฆฌํด์ฃผ๊ณ ์ถ์ํ๋ ๋ถ๋ถ์ด ๋ง๊ธฐ ๋๋ฌธ์ ๋ ์ฝ๊ณ ํธํ๊ฒ ์ฌ์ฉํ ์ ์๋ค. ๊ฒ๋ค๊ฐ ๋ค๋ฅธ AWS ๋ฆฌ์์ค๋ค๊ณผ์ ์ฐ๊ณ๋ ์ ์ง์๋๋ค.
๊ธฐ๋ณธ์ ์ผ๋ก ํด๋ฌ์คํฐ์ ๋ธ๋ก์ปค๋ฅผ ๊ด๋ฆฌํ ํ์๊ฐ ์๋๋ฐ๋ค, ๊ฒ๋ค๊ฐ ์คํ ์ค์ผ์ผ๋ง ์ฆ๊ฐ/๊ฐ์๊ฐ ๋๋ค ์๋์ผ๋ก ๋๋ค. MSK๋ ์ฆ๊ฐ๋ง ๋๋ค.
์จ๋๋งจ๋ ์๊ธ์ด ์๋จ๊ฒ๋ ์ฅ์ ์ด๋ค.
๊ตฌ์กฐ ๋ฐ ์ ํ
stream์ kafka์ topic์ ๋์๋๋ ์ฉ์ด๊ณ , shard๋ kafak์ partition์ ๋์๋๋ค๊ณ ๋ณด๋ฉด ๋๋ค.
ํ๋ก๋น์ ๋ ๋ชจ๋์ ๊ฒฝ์ฐ์๋ ์ค๋ ๋จ์๋ก ์ค์ผ์ผ๋ง์ด ๊ฐ๋ฅํ๋ค. ์ฌ์ฉ๋์ด ๋๋ฉด ์ค๋๋ฅผ ๋ ๋์์ ๋ถํ ๋ถ์ฐ์ ํ๋ ๊ฒ์ด๋ค.
์ค๋๋ ๊ณ์ ๋น 200๊ฐ๊ฐ ์ง์๋๋๋ฐ, ์์ฒญํ๋ฉด ๋๋ฆด ์ ์๊ณ . ์ค๋๋น 1000TPS๋ก ๋ฐ์ดํฐ๋ฅผ ๋๋ ค๋ฐ์ ์ ์๋ค.
์ค๋๋ฅผ 5๊ฐ ๋์๋๋ฉด 1์ด์ 5000๊ฐ๊น์ง ๋ฐ์ ์ ์๋ ๊ฒ์ด๋ค.
์จ๋๋งจ๋ ๋ชจ๋๋ 200000TPS๋ก ๋ฐ์ดํฐ๋ฅผ ๋ฐ์ ์ ์๋ค. ๊ทธ ์ด์ ๊ท๋ชจ๋ก ๋์ด๋ ์ ์๋ค๊ณ ๊ฐ์ ํ๋ค๋ฉด ํ๋ก๋น์ ๋ ๋ชจ๋๋ฅผ ์ฌ์ฉํ๋ ํธ์ด ์ข์ ๊ฒ์ด๋ค.
๋น์ฉ
์์ธํ๊ฑด ์๋ ํ์ด์ง๋ฅผ ์ฐธ์กฐํ๋ค.
https://aws.amazon.com/ko/kinesis/data-streams/pricing/
ํ๋ก๋น์ ๋ ๋ชจ๋๋ ์ด๋ฐ์์ผ๋ก ์ค๋๋น ์๊ธ๊ณผ ๋ฐ์ดํฐ๋ฅผ ๋ช๊ฐ๋ ๋ฃ์๋์ง๋ฅผ ๊ธฐ์ค์ผ๋ก ์ธก์ ํ๋ค.
์๊ฐ๋ณด๋ค ๊ทธ๋ ๊ฒ ๋น์ธ์ง๋ ์์๋ค. ์ค๋๊ฐ ํ๋๋ง ๋ ์๋ค๋ฉด ์ค๋ ๋น์ฉ์ ํ๋ฌ์ 14๋ฌ๋ฌ ์ ๋๋ง ๋์จ๋ค.
์จ๋๋งจ๋๋ ์คํธ๋ฆผ ๋จ์, ๊ทธ๋ฆฌ๊ณ ์ผ๋ง๋ ๋ฐ์ดํฐ๋ฅผ ๋ฃ๊ณ ์ฝ์ด์๋์ง๋ก ๋น์ฉ์ ๋ถ๊ณผํ๋ค.
์ด๊ฑด ๊ท๋ชจ๊ฐ ๋งค์ฐ ์์๋๋ ์จ๋๋งจ๋๊ฐ ์คํ๋ ค ๋น์ ๊ฒ ๊ฐ๋ค.
์คํธ๋ฆผ ๋ง๋ค๊ธฐ
๊ธฐ๋ณธ ๋ฐ์ดํฐ ์คํธ๋ฆผ์ผ๋ก ๋จผ์ ๋ง๋ค์ด๋ณด๊ฒ ๋ค.

์ด๋ฆ ์ ์ง๊ณ , ์จ๋๋งจ๋๋ก ํ ์ง ํ๋ก๋น์ ๋์ผ๋ก ํ ์ง๋ง ์ ํ๋ฉด ๋๋ค.

๋๋ ํ๋ก๋น์ ๋์ผ๋ก ํ๊ฒ ๋ค.
๊ทธ๋์ ์ด๋ฐ ๋ชจ์์ผ๋ก ๋ง๋ค์ด์ง๋ฉด ์ผ๋จ ์์ฑ์ ๋ ๊ฒ์ด๋ค.

๋ฐ์ดํฐ ์์๋ณด๊ธฐ (Producer)
aws-sdk์ putRecord๋ฅผ ์ด์ฉํ๋ฉด ๋ณ๋ก ์ด๋ ค์ธ ๊ฒ ์์ด ๋ฐ์ดํฐ๋ฅผ ์์ ์ ์๋ค.
๋๋ Lambda๋ก ์์ฑํด์ ๋๋ ธ๋ค.
const aws = require('aws-sdk');
const kinesis = new aws.Kinesis();
async function putEvent(event) {
await kinesis.putRecord({
Data: JSON.stringify(event),
PartitionKey: '1',
StreamName: 'test_queue',
}).promise();
}
exports.handler = async (event) => {
await putEvent({
user_id: 1,
product_id: 1,
});
await putEvent({
user_id: 2,
product_id: 1,
});
await putEvent({
user_id: 3,
product_id: 1,
});
await putEvent({
user_id: 1,
product_id: 2,
});
const response = {
statusCode: 200,
body: JSON.stringify('Hello from Lambda!'),
};
return response;
};
ํํฐ์
ํค๋ ํด์ฑํด์ ์ด๋ค ์ค๋์ ๋ฃ์์ง๋ฅผ ๊ฒฐ์ ํ ๋, ๊ทธ ํด์ ํค๊ฐ์ผ๋ก ์ฌ์ฉ๋๋ ๊ฐ์ด๋ค.
์ค๋ณต๋์ง๋ง ์๊ฒ ๋ฃ์ผ๋ฉด ๋๊ณ , ๊ทธ๋๊ทธ๋ ๋ง๋ requestId๊ฐ์๊ฑธ ๋ฃ์ด๋ ๋๋ค.
๊ทธ๋ฌ๋ฉด ์ด๋ ๊ฒ
์ฝ์์์๋ ํ์ธ์ ํ ์ ์๋ค. "์์ ์์น"๋ ํธ๋ฆผ ํธ๋ผ์ด์ฆ์ผ๋ก ํด์ผ ์ ์ฒด๋ชฉ๋ก์ ๋ณผ ์ ์๋ค.
๋ฐ์ดํฐ ์ฝ์ด๋ณด๊ธฐ (Consumer)
์ด๋ฒ์ Lambda๋ฅผ ์ด์ฉํด์ ์์๋ ๊ฒ๋ค์ ์ฝ์ด๋ณด๊ฒ ๋ค. ์ฐ๋๊ฒ๋ณด๋ค ์ฝ์ด๋ค์ด๋๊ฒ ์ข๋ ๋ณต์กํ๋ค.
const aws = require('aws-sdk');
const kinesis = new aws.Kinesis();
async function getShardIterator(ShardId) {
return (await kinesis.getShardIterator({
ShardId,
ShardIteratorType: 'TRIM_HORIZON',
StreamName: 'test_queue',
}).promise())?.ShardIterator;
}
async function getRecords(shardIterator) {
return (await kinesis.getRecords({
Limit: 10,
ShardIterator: shardIterator,
}).promise())?.Records;
}
async function getShards() {
return (await kinesis.listShards({
StreamName: 'test_queue',
}).promise()).Shards;
}
exports.handler = async (event) => {
const shards = await getShards();
console.log(shards);
const shardIterator = await getShardIterator(shards[0].ShardId);
const records = await getRecords(shardIterator);
for(const record of records) {
let data = Buffer.from(record.Data).toString();
console.log(data);
}
const response = {
statusCode: 200,
body: JSON.stringify('Hello from Lambda!'),
};
return response;
};
๋จผ์ ์ฝ์ Shard๋ฅผ ๊ฒฐ์ ํ๊ณ , ShardIterator๋ฅผ ๋ง๋ ๋ค์์ getRecords๋ฅผ ํตํด์ ๊ฐ์ ธ์ฌ ์ ์๋ค.
๊ทธ๋์ consumer๋ shard๋ฅผ ๊ฐ์ ํ๋์ฉ๋ง ๋ด๋นํด์ ์ฒ๋ฆฌํ๋๊ฒ ์ผ๋ฐ์ ์ด๋ค.
๊ทผ๋ฐ shard์ ๋ง์ถฐ์ consumer๋ ์คํ ์ค์ผ์ผ๋ง์ ํ๋ ค๋ฉด.. ๊ทธ๊ฒ๋ ๊ฝค ์์ด ๋ง์ด ๊ฐ๋ค. ๊ทธ๋์ ์ผ๋ฐ์ ์ผ๋ก Lambda๋ก ํธ๋ฆฌ๊ฑฐ๋ฅผ ๊ฑธ์ด์ ์ฒ๋ฆฌํ๋๊ฒ ์ผ๋ฐ์ ์ธ๋ฏํ๋ค.
ShardIteratorType ๋ช๊ฐ์ง ํ์ ์ด ์๋๋ฐ, ๋์ถฉ ์ด๋ ๋ค.
- AT_SEQUENCE_NUMBER: ์ ๋ฌ๋ ์ํ์ค ๋ฒํธ๋ถํฐ ์์ฐจ์ ์ผ๋ก ์ฝ์ด์ด.
- AFTER_SEQUENCE_NUMBER: ์ ๋ฌ๋ ์ํ์ค ๋ฒํธ ์ดํ๋ถํฐ ์์ฐจ์ ์ผ๋ก ์ฝ์ด์ด.
- AT_TIMESTAMP: ์ ๋ฌ๋ ํ์์คํฌํ ์์ ๋ถํฐ ์คํธ๋ฆฌ๋ฐ์ ์์๋๋ก ์ฝ์ด์ด
- TRIM_HORIZON: ๊ฐ์ฅ ์ค๋๋ ๋ฐ์ดํฐ๋ถํฐ ์ฝ์ด์ด
- LATEST: ์ค๋์ ๊ฐ์ฅ ์ต๊ทผ ๋ ์ฝ๋ ๋ฐ๋ก ๋ค์๋ถํฐ ์ฝ์ด์ด. (Iterator๋ฅผ ๋ง๋ ์์ ๋ถํฐ ์๋ก ๋ค์ด์จ ๊ฒ๋ง ๋ฐ์)
๋ ์ฝ๋๋ ์์๋ก ์ญ์ ํ ์๊ฐ ์๋ค. ์ค์ ๋ ๋ณด์กด ๊ธฐํ์ด ์ง๋๋ฉด ์๋์ผ๋ก ์ญ์ ๋ ๋ฟ์ด๋ค.
๊ทธ๋์ ํ์ค์ ์ผ๋ก TRIM_HORIZON ๊ฐ์๊ฑด ์ธ์ผ์ด ๋ฑํ ์์ ๊ฒ์ด๋ค.
์ด์จ๋ ์ ๋๋ก ์คํํ๋ฉด
๊ฐ์ ธ์ค๊ธด ์ ๊ฐ์ ธ์จ๋ค.
Lambda๋ก Consumer ๊ตฌ์ถ
AWS Lambda๋ฅผ ์ฌ์ฉํ๋ฉด ๋ฐ์ดํฐ consume ์ฒ๋ฆฌ๋ฅผ ๊ฝค๋ ๊ฐํธํ๊ณ ์ฝ๊ฒ ์ฒ๋ฆฌํ ์ ์๋ค.
consumer์ฉ ํจ์๋ฅผ ๋จผ์ ๋ง๋ค๊ณ ,
ํจ์ ์ญํ ๊ถํ์ ํค๋ค์์ค ์ ๊ทผ๊ถํ์ ๋ฃ์ด์ค๋ค.
๊ทธ๋ฆฌ๊ณ ํธ๋ฆฌ๊ฑฐ๋ก ํค๋ค์์ค๋ฅผ ์ถ๊ฐํด์ฃผ๋ฉด ๋๋ค.
์คํธ๋ฆผ ๊ณ ๋ฅด๊ณ , ๋ฐฐ์น ํฌ๊ธฐ๋ฅผ ์ ํ๋ค. ์ ๋ฌ๋ฉด ํ๋ฒ์ 100๊ฐ์ฉ ๋ชจ์์ ๋ค์ด์จ๋ค.
๊ทธ๋ฆฌ๊ณ ์ต์ ํญ๋ชฉ๋ค๋ง ์์๋๋ก ๋ค์ด์ค๋๋ก ํ๋ค.
๊ทธ๋ผ ์ด๋ ๊ฒ ์ถ๊ฐ๋ ๊ฒ์ด๋ค.

๋ฐ์๋จน์ consumer ์ฝ๋๋ฅผ ์์ฑํด๋ณด์.
์ผ๋จ ๋ก๊ทธ๋ง ์ฐ๊ฒ ํ๋ค.
Records๋ผ๋ ํญ๋ชฉ์ผ๋ก ๋ค์ด์ค๊ณ , data๋ base64๋ก ๋ค์ด์์ ๋์ฝ๋ฉ์ ํด์ค์ผ ํ๋ค.
exports.handler = async (event) => {
const records = event.Records;
console.log('๋ ์ฝ๋ ๋ค์ด์ด!!');
for(const record of records) {
record.kinesis.data = Buffer.from(record.kinesis.data, "base64").toString('utf8');
console.log(record);
}
// TODO implement
const response = {
statusCode: 200,
body: JSON.stringify('Hello from Lambda!'),
};
return response;
};
๊ทธ๋ฆฌ๊ณ producer๋ก ๋ ์ฝ๋๋ฅผ ๋ฃ์ด์ฃผ๋ฉด

ํธ๋ฆฌ๊ฑฐ๋ก ๋๋ค๊ฐ ํธ์ถ๋๋ฉด์ ์คํ์ด ๋ ๊ฒ์ด๋ค.
์ ๊ฑธ ํ์์ ๋ฐ๋ผ ์ ์ ํ ์ฌ์ฉํ๋ฉด ๋๋ค.
๋ค๋ฅธ ๋ฐฉ๋ฒ: KCL
KCL์ Kinesis Client Library์ ์ถ์ฝ์ธ๋ฐ, ์ด๋ฐ shard๋ณ consumer์ ๊ด๋ฆฌ ๊ฐ์ ์ ํํ๋ ๊ตฌ์กฐ๋ค์ ์ด๋์ ๋ ์ ๊ณตํด์ฃผ๋ ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ค.
์ ์ง์๋๋๊ฑด Java, Python ์ ๋์ธ ๊ฒ ๊ฐ๋ค.
์ด๋ฐ๊ฑด ์ถํ ๋ณ๋ ํฌ์คํธ๋ก ๋ค๋ค๋ณด๊ฒ ๋ค.
์ฐธ์กฐ
https://jaemunbro.medium.com/aws-%EB%A9%94%EC%8B%9C%EC%A7%95%EC%84%9C%EB%B9%84%EC%8A%A4-%EB%B9%84%EA%B5%90-kinesis-sqs-sns-ab397a07cb1d
https://docs.aws.amazon.com/ko_kr/streams/latest/dev/service-sizes-and-limits.html