[AWS] Kinesis

Kinesis๋Š” Kafka์˜ aws ๋ฒ„์ „์ด๋ผ๊ณ  ํ•  ์ˆ˜ ์žˆ๋‹ค.

์šฉ๋„ ์ž์ฒด๋Š” ์นดํ”„์นด๋ฅผ ๊ทธ๋Œ€๋กœ ์˜ฌ๋ ค์ฃผ๋Š” AWS MSK์™€ ๋™์ผํ•˜๋‹ค.
๋‹ค๋งŒ AWS์—์„œ ์ž์ฒด์ ์œผ๋กœ ๊ด€๋ฆฌํ•ด์ฃผ๊ณ  ์ถ”์ƒํ™”๋œ ๋ถ€๋ถ„์ด ๋งŽ๊ธฐ ๋•Œ๋ฌธ์— ๋” ์‰ฝ๊ณ  ํŽธํ•˜๊ฒŒ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค. ๊ฒŒ๋‹ค๊ฐ€ ๋‹ค๋ฅธ AWS ๋ฆฌ์†Œ์Šค๋“ค๊ณผ์˜ ์—ฐ๊ณ„๋„ ์ž˜ ์ง€์›๋œ๋‹ค.

๊ธฐ๋ณธ์ ์œผ๋กœ ํด๋Ÿฌ์Šคํ„ฐ์™€ ๋ธŒ๋กœ์ปค๋ฅผ ๊ด€๋ฆฌํ•  ํ•„์š”๊ฐ€ ์—†๋Š”๋ฐ๋‹ค, ๊ฒŒ๋‹ค๊ฐ€ ์˜คํ† ์Šค์ผ€์ผ๋ง ์ฆ๊ฐ€/๊ฐ์†Œ๊ฐ€ ๋‘˜๋‹ค ์ž๋™์œผ๋กœ ๋œ๋‹ค. MSK๋Š” ์ฆ๊ฐ€๋งŒ ๋œ๋‹ค.

์˜จ๋””๋งจ๋“œ ์š”๊ธˆ์ด ์žˆ๋‹จ๊ฒƒ๋„ ์žฅ์ ์ด๋‹ค.




๊ตฌ์กฐ ๋ฐ ์ œํ•œ

stream์€ kafka์˜ topic์— ๋Œ€์‘๋˜๋Š” ์šฉ์–ด๊ณ , shard๋Š” kafak์˜ partition์— ๋Œ€์‘๋œ๋‹ค๊ณ  ๋ณด๋ฉด ๋œ๋‹ค.

ํ”„๋กœ๋น„์ €๋‹ ๋ชจ๋“œ์˜ ๊ฒฝ์šฐ์—๋Š” ์ƒค๋“œ ๋‹จ์œ„๋กœ ์Šค์ผ€์ผ๋ง์ด ๊ฐ€๋Šฅํ•˜๋‹ค. ์‚ฌ์šฉ๋Ÿ‰์ด ๋Š˜๋ฉด ์ƒค๋“œ๋ฅผ ๋” ๋„์›Œ์„œ ๋ถ€ํ•˜ ๋ถ„์‚ฐ์„ ํ•˜๋Š” ๊ฒƒ์ด๋‹ค.
์ƒค๋“œ๋Š” ๊ณ„์ •๋‹น 200๊ฐœ๊ฐ€ ์ง€์›๋˜๋Š”๋ฐ, ์š”์ฒญํ•˜๋ฉด ๋Š˜๋ฆด ์ˆ˜ ์žˆ๊ณ . ์ƒค๋“œ๋‹น 1000TPS๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ๋•Œ๋ ค๋ฐ•์„ ์ˆ˜ ์žˆ๋‹ค.
์ƒค๋“œ๋ฅผ 5๊ฐœ ๋„์›Œ๋‘๋ฉด 1์ดˆ์— 5000๊ฐœ๊นŒ์ง€ ๋ฐ•์„ ์ˆ˜ ์žˆ๋Š” ๊ฒƒ์ด๋‹ค.

์˜จ๋””๋งจ๋“œ ๋ชจ๋“œ๋Š” 200000TPS๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ•์„ ์ˆ˜ ์žˆ๋‹ค. ๊ทธ ์ด์ƒ ๊ทœ๋ชจ๋กœ ๋Š˜์–ด๋‚  ์ˆ˜ ์žˆ๋‹ค๊ณ  ๊ฐ€์ •ํ•œ๋‹ค๋ฉด ํ”„๋กœ๋น„์ €๋‹ ๋ชจ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ํŽธ์ด ์ข‹์„ ๊ฒƒ์ด๋‹ค.




๋น„์šฉ

์ž์„ธํ•œ๊ฑด ์•„๋ž˜ ํŽ˜์ด์ง€๋ฅผ ์ฐธ์กฐํ•œ๋‹ค.
https://aws.amazon.com/ko/kinesis/data-streams/pricing/

ํ”„๋กœ๋น„์ €๋‹ ๋ชจ๋“œ๋Š” ์ด๋Ÿฐ์‹์œผ๋กœ ์ƒค๋“œ๋‹น ์š”๊ธˆ๊ณผ ๋ฐ์ดํ„ฐ๋ฅผ ๋ช‡๊ฐœ๋‚˜ ๋„ฃ์—ˆ๋Š”์ง€๋ฅผ ๊ธฐ์ค€์œผ๋กœ ์ธก์ •ํ•œ๋‹ค.

์ƒ๊ฐ๋ณด๋‹ค ๊ทธ๋ ‡๊ฒŒ ๋น„์‹ธ์ง€๋Š” ์•Š์•˜๋‹ค. ์ƒค๋“œ๊ฐ€ ํ•˜๋‚˜๋งŒ ๋– ์žˆ๋‹ค๋ฉด ์ƒค๋“œ ๋น„์šฉ์€ ํ•œ๋‹ฌ์— 14๋‹ฌ๋Ÿฌ ์ •๋„๋งŒ ๋‚˜์˜จ๋‹ค.

์˜จ๋””๋งจ๋“œ๋Š” ์ŠคํŠธ๋ฆผ ๋‹จ์œ„, ๊ทธ๋ฆฌ๊ณ  ์–ผ๋งˆ๋‚˜ ๋ฐ์ดํ„ฐ๋ฅผ ๋„ฃ๊ณ  ์ฝ์–ด์™”๋Š”์ง€๋กœ ๋น„์šฉ์„ ๋ถ€๊ณผํ•œ๋‹ค.

์ด๊ฑด ๊ทœ๋ชจ๊ฐ€ ๋งค์šฐ ์ž‘์„๋•Œ๋Š” ์˜จ๋””๋งจ๋“œ๊ฐ€ ์˜คํžˆ๋ ค ๋น„์Œ€ ๊ฒƒ ๊ฐ™๋‹ค.




์ŠคํŠธ๋ฆผ ๋งŒ๋“ค๊ธฐ

๊ธฐ๋ณธ ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ์œผ๋กœ ๋จผ์ € ๋งŒ๋“ค์–ด๋ณด๊ฒ ๋‹ค.

์ด๋ฆ„ ์ž˜ ์ง“๊ณ , ์˜จ๋””๋งจ๋“œ๋กœ ํ• ์ง€ ํ”„๋กœ๋น„์ €๋‹์œผ๋กœ ํ• ์ง€๋งŒ ์ •ํ•˜๋ฉด ๋œ๋‹ค.

๋‚˜๋Š” ํ”„๋กœ๋น„์ €๋‹์œผ๋กœ ํ•˜๊ฒ ๋‹ค.

๊ทธ๋ž˜์„œ ์ด๋Ÿฐ ๋ชจ์–‘์œผ๋กœ ๋งŒ๋“ค์–ด์ง€๋ฉด ์ผ๋‹จ ์ƒ์„ฑ์€ ๋œ ๊ฒƒ์ด๋‹ค.




๋ฐ์ดํ„ฐ ์Œ“์•„๋ณด๊ธฐ (Producer)

aws-sdk์˜ putRecord๋ฅผ ์ด์šฉํ•˜๋ฉด ๋ณ„๋กœ ์–ด๋ ค์šธ ๊ฒƒ ์—†์ด ๋ฐ์ดํ„ฐ๋ฅผ ์Œ“์„ ์ˆ˜ ์žˆ๋‹ค.
๋‚˜๋Š” Lambda๋กœ ์ž‘์„ฑํ•ด์„œ ๋Œ๋ ธ๋‹ค.

const aws = require('aws-sdk');

const kinesis = new aws.Kinesis();

async function putEvent(event) {
    await kinesis.putRecord({
      Data: JSON.stringify(event),
      PartitionKey: '1', 
      StreamName: 'test_queue',
    }).promise();
}

exports.handler = async (event) => {
    await putEvent({
      user_id: 1,
      product_id: 1,
    });

    await putEvent({
      user_id: 2,
      product_id: 1,
    });

    await putEvent({
      user_id: 3,
      product_id: 1,
    });

    await putEvent({
      user_id: 1,
      product_id: 2,
    });

    const response = {
        statusCode: 200,
        body: JSON.stringify('Hello from Lambda!'),
    };
    return response;
};

ํŒŒํ‹ฐ์…˜ ํ‚ค๋Š” ํ•ด์‹ฑํ•ด์„œ ์–ด๋–ค ์ƒค๋“œ์— ๋„ฃ์„์ง€๋ฅผ ๊ฒฐ์ •ํ• ๋•Œ, ๊ทธ ํ•ด์‹œ ํ‚ค๊ฐ’์œผ๋กœ ์‚ฌ์šฉ๋˜๋Š” ๊ฐ’์ด๋‹ค.
์ค‘๋ณต๋˜์ง€๋งŒ ์•Š๊ฒŒ ๋„ฃ์œผ๋ฉด ๋˜๊ณ , ๊ทธ๋•Œ๊ทธ๋•Œ ๋งŒ๋“  requestId๊ฐ™์€๊ฑธ ๋„ฃ์–ด๋„ ๋œ๋‹ค.

๊ทธ๋Ÿฌ๋ฉด ์ด๋ ‡๊ฒŒ

์ฝ˜์†”์—์„œ๋„ ํ™•์ธ์„ ํ•  ์ˆ˜ ์žˆ๋‹ค. "์‹œ์ž‘ ์œ„์น˜"๋Š” ํŠธ๋ฆผ ํ˜ธ๋ผ์ด์ฆŒ์œผ๋กœ ํ•ด์•ผ ์ „์ฒด๋ชฉ๋ก์„ ๋ณผ ์ˆ˜ ์žˆ๋‹ค.




๋ฐ์ดํ„ฐ ์ฝ์–ด๋ณด๊ธฐ (Consumer)

์ด๋ฒˆ์—” Lambda๋ฅผ ์ด์šฉํ•ด์„œ ์Œ“์•„๋‘” ๊ฒƒ๋“ค์„ ์ฝ์–ด๋ณด๊ฒ ๋‹ค. ์“ฐ๋Š”๊ฒƒ๋ณด๋‹ค ์ฝ์–ด๋“ค์ด๋Š”๊ฒŒ ์ข€๋” ๋ณต์žกํ•˜๋‹ค.

const aws = require('aws-sdk');

const kinesis = new aws.Kinesis();

async function getShardIterator(ShardId) {
    return (await kinesis.getShardIterator({
        ShardId, 
        ShardIteratorType: 'TRIM_HORIZON',
        StreamName: 'test_queue',
    }).promise())?.ShardIterator;
}

async function getRecords(shardIterator) {
    return (await kinesis.getRecords({
      Limit: 10,
      ShardIterator: shardIterator,
    }).promise())?.Records;
}

async function getShards() {
    return (await kinesis.listShards({
        StreamName: 'test_queue',
    }).promise()).Shards;
}

exports.handler = async (event) => {
    const shards = await getShards();
    console.log(shards);

    const shardIterator = await getShardIterator(shards[0].ShardId);
    const records = await getRecords(shardIterator);

    for(const record of records) {
        let data = Buffer.from(record.Data).toString();
        console.log(data);
    }

    const response = {
        statusCode: 200,
        body: JSON.stringify('Hello from Lambda!'),
    };
    return response;
};

๋จผ์ € ์ฝ์„ Shard๋ฅผ ๊ฒฐ์ •ํ•˜๊ณ , ShardIterator๋ฅผ ๋งŒ๋“  ๋‹ค์Œ์— getRecords๋ฅผ ํ†ตํ•ด์„œ ๊ฐ€์ ธ์˜ฌ ์ˆ˜ ์žˆ๋‹ค.

๊ทธ๋ž˜์„œ consumer๋Š” shard๋ฅผ ๊ฐ์ž ํ•˜๋‚˜์”ฉ๋งŒ ๋‹ด๋‹นํ•ด์„œ ์ฒ˜๋ฆฌํ•˜๋Š”๊ฒŒ ์ผ๋ฐ˜์ ์ด๋‹ค.
๊ทผ๋ฐ shard์— ๋งž์ถฐ์„œ consumer๋„ ์˜คํ† ์Šค์ผ€์ผ๋ง์„ ํ•˜๋ ค๋ฉด.. ๊ทธ๊ฒƒ๋„ ๊ฝค ์†์ด ๋งŽ์ด ๊ฐ„๋‹ค. ๊ทธ๋ž˜์„œ ์ผ๋ฐ˜์ ์œผ๋กœ Lambda๋กœ ํŠธ๋ฆฌ๊ฑฐ๋ฅผ ๊ฑธ์–ด์„œ ์ฒ˜๋ฆฌํ•˜๋Š”๊ฒŒ ์ผ๋ฐ˜์ ์ธ๋“ฏํ•˜๋‹ค.

ShardIteratorType ๋ช‡๊ฐ€์ง€ ํƒ€์ž…์ด ์žˆ๋Š”๋ฐ, ๋Œ€์ถฉ ์ด๋ ‡๋‹ค.

  1. AT_SEQUENCE_NUMBER: ์ „๋‹ฌ๋œ ์‹œํ€€์Šค ๋ฒˆํ˜ธ๋ถ€ํ„ฐ ์ˆœ์ฐจ์ ์œผ๋กœ ์ฝ์–ด์˜ด.
  2. AFTER_SEQUENCE_NUMBER: ์ „๋‹ฌ๋œ ์‹œํ€€์Šค ๋ฒˆํ˜ธ ์ดํ›„๋ถ€ํ„ฐ ์ˆœ์ฐจ์ ์œผ๋กœ ์ฝ์–ด์˜ด.
  3. AT_TIMESTAMP: ์ „๋‹ฌ๋œ ํƒ€์ž„์Šคํƒฌํ”„ ์‹œ์ ๋ถ€ํ„ฐ ์ŠคํŠธ๋ฆฌ๋ฐ์„ ์ˆœ์„œ๋Œ€๋กœ ์ฝ์–ด์˜ด
  4. TRIM_HORIZON: ๊ฐ€์žฅ ์˜ค๋ž˜๋œ ๋ฐ์ดํ„ฐ๋ถ€ํ„ฐ ์ฝ์–ด์˜ด
  5. LATEST: ์ƒค๋“œ์˜ ๊ฐ€์žฅ ์ตœ๊ทผ ๋ ˆ์ฝ”๋“œ ๋ฐ”๋กœ ๋‹ค์Œ๋ถ€ํ„ฐ ์ฝ์–ด์˜ด. (Iterator๋ฅผ ๋งŒ๋“  ์‹œ์ ๋ถ€ํ„ฐ ์ƒˆ๋กœ ๋“ค์–ด์˜จ ๊ฒƒ๋งŒ ๋ฐ›์Œ)

๋ ˆ์ฝ”๋“œ๋Š” ์ž„์˜๋กœ ์‚ญ์ œํ•  ์ˆ˜๊ฐ€ ์—†๋‹ค. ์„ค์ •๋œ ๋ณด์กด ๊ธฐํ•œ์ด ์ง€๋‚˜๋ฉด ์ž๋™์œผ๋กœ ์‚ญ์ œ๋  ๋ฟ์ด๋‹ค.
๊ทธ๋ž˜์„œ ํ˜„์‹ค์ ์œผ๋กœ TRIM_HORIZON ๊ฐ™์€๊ฑด ์“ธ์ผ์ด ๋”ฑํžˆ ์—†์„ ๊ฒƒ์ด๋‹ค.

์–ด์จŒ๋“  ์ €๋Œ€๋กœ ์‹คํ–‰ํ•˜๋ฉด

๊ฐ€์ ธ์˜ค๊ธด ์ž˜ ๊ฐ€์ ธ์˜จ๋‹ค.




Lambda๋กœ Consumer ๊ตฌ์ถ•

AWS Lambda๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ๋ฐ์ดํ„ฐ consume ์ฒ˜๋ฆฌ๋ฅผ ๊ฝค๋‚˜ ๊ฐ„ํŽธํ•˜๊ณ  ์‰ฝ๊ฒŒ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋‹ค.
consumer์šฉ ํ•จ์ˆ˜๋ฅผ ๋จผ์ € ๋งŒ๋“ค๊ณ ,

ํ•จ์ˆ˜ ์—ญํ•  ๊ถŒํ•œ์— ํ‚ค๋„ค์‹œ์Šค ์ ‘๊ทผ๊ถŒํ•œ์„ ๋„ฃ์–ด์ค€๋‹ค.

๊ทธ๋ฆฌ๊ณ  ํŠธ๋ฆฌ๊ฑฐ๋กœ ํ‚ค๋„ค์‹œ์Šค๋ฅผ ์ถ”๊ฐ€ํ•ด์ฃผ๋ฉด ๋œ๋‹ค.

์ŠคํŠธ๋ฆผ ๊ณ ๋ฅด๊ณ , ๋ฐฐ์น˜ ํฌ๊ธฐ๋ฅผ ์ •ํ•œ๋‹ค. ์ €๋Ÿฌ๋ฉด ํ•œ๋ฒˆ์— 100๊ฐœ์”ฉ ๋ชจ์•„์„œ ๋“ค์–ด์˜จ๋‹ค.
๊ทธ๋ฆฌ๊ณ  ์ตœ์‹  ํ•ญ๋ชฉ๋“ค๋งŒ ์ˆœ์„œ๋Œ€๋กœ ๋“ค์–ด์˜ค๋„๋ก ํ–ˆ๋‹ค.

๊ทธ๋Ÿผ ์ด๋ ‡๊ฒŒ ์ถ”๊ฐ€๋  ๊ฒƒ์ด๋‹ค.

๋ฐ›์•„๋จน์„ consumer ์ฝ”๋“œ๋ฅผ ์ž‘์„ฑํ•ด๋ณด์ž.
์ผ๋‹จ ๋กœ๊ทธ๋งŒ ์ฐ๊ฒŒ ํ–ˆ๋‹ค.
Records๋ผ๋Š” ํ•ญ๋ชฉ์œผ๋กœ ๋“ค์–ด์˜ค๊ณ , data๋Š” base64๋กœ ๋“ค์–ด์™€์„œ ๋””์ฝ”๋”ฉ์„ ํ•ด์ค˜์•ผ ํ•œ๋‹ค.

exports.handler = async (event) => {
    const records = event.Records;

    console.log('๋ ˆ์ฝ”๋“œ ๋“ค์–ด์˜ด!!');

    for(const record of records) {
       record.kinesis.data = Buffer.from(record.kinesis.data, "base64").toString('utf8');

       console.log(record);   
    }

    // TODO implement
    const response = {
        statusCode: 200,
        body: JSON.stringify('Hello from Lambda!'),
    };
    return response;
};

๊ทธ๋ฆฌ๊ณ  producer๋กœ ๋ ˆ์ฝ”๋“œ๋ฅผ ๋„ฃ์–ด์ฃผ๋ฉด

ํŠธ๋ฆฌ๊ฑฐ๋กœ ๋žŒ๋‹ค๊ฐ€ ํ˜ธ์ถœ๋˜๋ฉด์„œ ์‹คํ–‰์ด ๋  ๊ฒƒ์ด๋‹ค.

์ €๊ฑธ ํ•„์š”์— ๋”ฐ๋ผ ์ ์ ˆํžˆ ์‚ฌ์šฉํ•˜๋ฉด ๋œ๋‹ค.




๋‹ค๋ฅธ ๋ฐฉ๋ฒ•: KCL

KCL์€ Kinesis Client Library์˜ ์ถ•์•ฝ์ธ๋ฐ, ์ด๋Ÿฐ shard๋ณ„ consumer์˜ ๊ด€๋ฆฌ ๊ฐ™์€ ์ •ํ˜•ํ™”๋œ ๊ตฌ์กฐ๋“ค์„ ์–ด๋А์ •๋„ ์ œ๊ณตํ•ด์ฃผ๋Š” ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋‹ค.
์ž˜ ์ง€์›๋˜๋Š”๊ฑด Java, Python ์ •๋„์ธ ๊ฒƒ ๊ฐ™๋‹ค.

์ด๋Ÿฐ๊ฑด ์ถ”ํ›„ ๋ณ„๋„ ํฌ์ŠคํŠธ๋กœ ๋‹ค๋ค„๋ณด๊ฒ ๋‹ค.



์ฐธ์กฐ
https://jaemunbro.medium.com/aws-%EB%A9%94%EC%8B%9C%EC%A7%95%EC%84%9C%EB%B9%84%EC%8A%A4-%EB%B9%84%EA%B5%90-kinesis-sqs-sns-ab397a07cb1d
https://docs.aws.amazon.com/ko_kr/streams/latest/dev/service-sizes-and-limits.html