[PostgreSQL] queue 시스템 구현해보기 (with Rust)
요즈음에는 큐잉 시스템을 만든다고 하면 kafka니 nats니 하는 전용 시스템을 꺼내들 때가 많은데, 사실 부하가 아주 많은 것이 아니라면 RDB만으로도 꽤 넓은 범위의 요구사항을 만족시킬 수 있다.
특히나 PostgreSQL은 이런 처리에 있어서도 꽤 꽨찮은 입지를 가지고 있다. 유연한 Lock 처리나, 자체 구독 모델도 제공하기 때문이다.
여기다가 트리거도 버무리고 하면 그럭저럭 쓸만하다.
이번 포스트에서는 Rust로 작성된 예제를 통해 간단한 큐 Consume 시스템을 만드는 법을 다뤄보겠다.
언어는 Rust지만 rust를 잘 몰라도 맥락 이해는 그렇게 어렵지 않을 것이다. 핵심은 SQL의 응용이다.
프로젝트 기본 구성
필요한 종속성은 아래 4개 정도다.
[dependencies]
sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "chrono" ] }
tokio = { version = "1", features = [ "full" ] }
anyhow = "1.0.95"
chrono = "0.4.39"
그리고 데이터베이스 계정도 적당히 하나 만들어준다.
CREATE USER admin WITH PASSWORD 'q1w2e3r4';
ALTER USER admin WITH SUPERUSER;
데이터베이스 연결까지만 확인하는 기본 골격이다.
use sqlx::{postgres::PgPoolOptions, Pool};
const CONNECTION_URL: &str = "postgres://admin:q1w2e3r4@localhost:5432/postgres";
pub async fn get_connection_pool(connection_url: &str) -> anyhow::Result<Pool<sqlx::Postgres>> {
let pool = PgPoolOptions::new()
.max_connections(5)
.connect(connection_url)
.await?;
Ok(pool)
}
#[tokio::main]
async fn main() {
let connection_pool = get_connection_pool(CONNECTION_URL).await.unwrap();
let result = sqlx::query_as::<_, (i32,)>("SELECT 1")
.fetch_all(&connection_pool)
.await
.unwrap();
println!("{:?}", result);
}
잘 돌아가는지 보고, 다음으로 넘어가자.
Queue 테이블 설계해보기
Queueing에 사용할 테이블부터 설계해보자.
CREATE TABLE queue (
id VARCHAR(50) PRIMARY KEY,
topic VARCHAR(50),
payload TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
테이블 구조는 사용사례에 따라 달라질 수 있겠지만, 대충 이런 식으로 만들 수 있을 것이다.
- id는 uuid 같은것을 넣을 것을 고려해서 문자열 타입으로 했는데, 분산시스템을 고려하지 않는다면 bigint 같은걸로 하고 sequence 박아도 된다.
- topic은 각 이벤트에 대한 주제를 정의한다.
- payload가 각 이벤트에 대한 부가 데이터다. 타입은 jsonb 같은걸 박아도 된다.
- created_at이 생성 시점이다. 아마 대부분은 이걸 기준으로 정렬해서 가져올 것이다.
CREATE INDEX queue_topic_created_at ON queue (topic, created_at);
인덱스도 아마 이정도 걸면 될 것 같다.
where topic = '?'
order by created_at asc
limit N
사용 패턴도 일반적으로 이 정도 될 거다.
그리고 만들었다.
방금 정의한 테이블에 맞춰서 구조체 타입도 대충 정의했다.
Consume Loop 구현
이제 본격적으로 이벤트 루프를 만들어보자.
가장 먼저 할건 메세지를 받아오는 것이다.
그냥 특정 토픽에 대해서 오래된 것부터(FIFO) 하나씩 가져오도록 했다.
이벤트를 성공적으로 처리했다면 삭제도 해야 한다.
적당히 만들었다.
각 개별 이벤트를 처리하는 부분도 필요할 것이다.
테스트니까 그냥 로그 정도만 찍었다.
그리고 조합.
이벤트를 밀어넣으면
잘 읽고 삭제하는 것을 볼 수 있을 것이다.
다음은 전체 코드다.
use sqlx::{postgres::PgPoolOptions, Pool};
const CONNECTION_URL: &str = "postgres://admin:q1w2e3r4@localhost:5432/postgres";
pub async fn get_connection_pool(connection_url: &str) -> anyhow::Result<Pool<sqlx::Postgres>> {
let pool = PgPoolOptions::new()
.max_connections(5)
.connect(connection_url)
.await?;
Ok(pool)
}
#[derive(Debug)]
pub struct Message {
pub id: String,
pub topic: String,
pub payload: String,
pub created_at: chrono::NaiveDateTime,
}
async fn receive_messages(pool: &Pool<sqlx::Postgres>) -> anyhow::Result<Vec<Message>> {
let list = sqlx::query_as::<_, (String, String, String, chrono::NaiveDateTime)>(
r#"
SELECT id, topic, payload, created_at
FROM queue
WHERE topic = 'topic1'
ORDER BY created_at ASC
LIMIT 1
"#,
)
.fetch_all(pool)
.await?;
let messages = list
.into_iter()
.map(|(id, topic, payload, created_at)| Message {
id,
topic,
payload,
created_at,
})
.collect();
Ok(messages)
}
async fn delete_messages(pool: &Pool<sqlx::Postgres>, ids: Vec<String>) -> anyhow::Result<()> {
let ids = ids
.iter()
.map(|id| id.as_str())
.collect::<Vec<&str>>()
.join("', '");
sqlx::query(
r#"
DELETE FROM queue
WHERE id IN ($1)
"#,
)
.bind(ids)
.execute(pool)
.await?;
Ok(())
}
async fn process_message(message: Message) -> anyhow::Result<()> {
println!("Processing message: {:?}", message);
Ok(())
}
#[tokio::main]
async fn main() {
let connection_pool = get_connection_pool(CONNECTION_URL).await.unwrap();
// 1. Consume 루프
loop {
// 2. 메세지를 받아옴
let messages = receive_messages(&connection_pool).await.unwrap();
let mut message_ids = vec![];
for message in messages {
message_ids.push(message.id.clone());
// 3. 메세지를 하나씩 처리
process_message(message).await.unwrap();
}
// 4. 다 처리됐다면 지움
delete_messages(&connection_pool, message_ids)
.await
.unwrap();
}
}배타적 Read 처리
단일 Consumer만 쓴다면 괜찮지만, 동일 주제에 대해서 Consumer를 병렬로 처리해야한다면 위의 구조로는 문제가 있을 수 있다.
그러니까, 여러개의 consumer가 같은 이벤트를 들고서 중복처리를 할 수도 있단 것이다.
그래서 지금 consumer 2개 띄워서 날려보면

양쪽으로 다 들어오는 것을 볼 수 있다.
이건 상황에 따라서 꽤 큰 문제가 될 수도 있다.
Row level Lock과 Skip Locked를 사용하면 읽기에 대해서 락을 걸고, 다른 읽기는 락이 걸린 이벤트는 무시하고 조회하는 식으로 중복 처리 문제를 없앨 수 있다.
자세한 것은 별도 포스트를 참조한다.
https://blog.naver.com/sssang97/222995953151
아무튼, 쿼리에 SKIP LOCKED를 걸고

읽기 자체도 트랜잭션 내에서 실행하도록 바꾼다.

그리고 매 루프 반복마다 트랜잭션을 열고, 그 안에서 consume을 처리하도록 하면 된다.

그러면 이제 단일 이벤트에 대해서는 한번씩만 처리가 될 것이다.

다음은 전체 코드다.
use sqlx::{postgres::PgPoolOptions, Pool};
const CONNECTION_URL: &str = "postgres://admin:q1w2e3r4@localhost:5432/postgres";
pub async fn get_connection_pool(connection_url: &str) -> anyhow::Result<Pool<sqlx::Postgres>> {
let pool = PgPoolOptions::new()
.max_connections(5)
.connect(connection_url)
.await?;
Ok(pool)
}
#[derive(Debug)]
pub struct Message {
pub id: String,
pub topic: String,
pub payload: String,
pub created_at: chrono::NaiveDateTime,
}
async fn begin_transaction(
pool: &Pool<sqlx::Postgres>,
) -> anyhow::Result<sqlx::Transaction<'static, sqlx::Postgres>> {
let transaction = pool.begin().await?;
Ok(transaction)
}
async fn receive_messages(
transaction: &mut sqlx::Transaction<'static, sqlx::Postgres>,
) -> anyhow::Result<Vec<Message>> {
let list = sqlx::query_as::<_, (String, String, String, chrono::NaiveDateTime)>(
r#"
SELECT id, topic, payload, created_at
FROM queue
FOR UPDATE SKIP LOCKED
LIMIT 1
"#,
)
.fetch_all(&mut **transaction)
.await?;
let messages = list
.into_iter()
.map(|(id, topic, payload, created_at)| Message {
id,
topic,
payload,
created_at,
})
.collect();
Ok(messages)
}
async fn delete_messages(
transaction: &mut sqlx::Transaction<'static, sqlx::Postgres>,
ids: Vec<String>,
) -> anyhow::Result<()> {
let ids = ids
.iter()
.map(|id| id.as_str())
.collect::<Vec<&str>>()
.join("', '");
sqlx::query(
r#"
DELETE FROM queue
WHERE id IN ($1)
"#,
)
.bind(ids)
.execute(&mut **transaction)
.await?;
Ok(())
}
async fn process_message(message: Message) -> anyhow::Result<()> {
println!("Processing message: {:?}", message);
use std::fs::OpenOptions;
use std::io::Write;
let mut file = OpenOptions::new()
.create(true) // Create the file if it doesn't exist
.append(true) // Append to the file if it exists
.open("output.txt")?;
// Write the content to the file
file.write_all(format!("{}-{}-{}\n", message.id, message.topic, message.payload).as_bytes())?;
Ok(())
}
#[tokio::main]
async fn main() {
let connection_pool = get_connection_pool(CONNECTION_URL).await.unwrap();
// 1. Consume 루프
loop {
// 2. 트랜잭션을 시작
let mut transaction = begin_transaction(&connection_pool).await.unwrap();
// 3. 메세지를 받아옴
let messages = receive_messages(&mut transaction).await.unwrap();
let mut message_ids = vec![];
for message in messages {
message_ids.push(message.id.clone());
// 4. 메세지를 하나씩 처리
process_message(message).await.unwrap();
}
// 5. 다 처리됐다면 지움
delete_messages(&mut transaction, message_ids)
.await
.unwrap();
// 6. 트랜잭션 해제
transaction.commit().await.unwrap();
}
}Long Polling
위의 Consume 루프는 항상 이벤트가 들어온다는 가정 하에서는 괜찮지만, 이벤트가 없는 상황에서는 지나친 비효율을 초래할 수 있다.
큐에 이벤트가 없어도 그냥 계속 찌르면서 폴링하기 때문이다. 이런 현상을 보통 spinning이나 busy waiting으로 부르곤 한다.
그냥 느슨한 방법으로 이걸 완화한다고 하면, 적당히 대기를 주는 식으로 처리를 할 수도 있을 것이다.

좀 미묘하지만, 좀 나아진것처럼 보이기도 한다.
더 좋은 방법은 없을까? 있다!
Postgresql에서 제공하는 구독 모델을 이용하면 그럴듯한 롱 폴링을 구현할 수 있다.
https://blog.naver.com/sssang97/223712066608
이벤트가 없으면 Listen으로 대기하고.
이벤트가 들어오면 Notify로 깨워주도록 하는 것이다.
이건 데이터베이스 라이브러리 수준에서 listen 후 폴링 기능을 제공하는지를 확인해봐야한다.
내가 여기서 사용한 sqlx은 지원하는데, 어정쩡한 라이브러리들은 지원하지 않는 경우도 많다.
다음은 큐에 이벤트가 삽입될 때 notify로 깨워주는 트리거를 구현한 간단한 예제다.
-- 프로시저 생성
CREATE OR REPLACE FUNCTION trigger_queue_function()
returns trigger
AS $$
DECLARE
BEGIN
notify queue;
return NULL;
END; $$
LANGUAGE 'plpgsql';
-- 트리거 생성
CREATE TRIGGER trigger_when_queue_insert
AFTER INSERT ON queue
FOR EACH ROW
EXECUTE FUNCTION trigger_queue_function();

그리고 저 채널을 대기하는 함수를 하나 구현했다.

메세지가 없을떄 sleep을 거는게 아니라, listen 기반으로 기다리도록 했다.
이러면 실제로 메세지가 쌓였을 때만 wait이 끝나고 다음 루프에서 새 이벤트를 가져올 것이다.
그러면 메세지가 없을 때는 조용히 기다리다가

들어오면 그때 바로 재시도하는 Long Polling이 완성된다.
다음은 전체 코드다.
use sqlx::{
postgres::{PgListener, PgPoolOptions},
Pool,
};
const CONNECTION_URL: &str = "postgres://admin:q1w2e3r4@localhost:5432/postgres";
pub async fn get_connection_pool(connection_url: &str) -> anyhow::Result<Pool<sqlx::Postgres>> {
let pool = PgPoolOptions::new()
.max_connections(5)
.connect(connection_url)
.await?;
Ok(pool)
}
#[derive(Debug)]
pub struct Message {
pub id: String,
pub topic: String,
pub payload: String,
pub created_at: chrono::NaiveDateTime,
}
async fn begin_transaction(
pool: &Pool<sqlx::Postgres>,
) -> anyhow::Result<sqlx::Transaction<'static, sqlx::Postgres>> {
let transaction = pool.begin().await?;
Ok(transaction)
}
async fn receive_messages(
transaction: &mut sqlx::Transaction<'static, sqlx::Postgres>,
) -> anyhow::Result<Vec<Message>> {
let list = sqlx::query_as::<_, (String, String, String, chrono::NaiveDateTime)>(
r#"
SELECT id, topic, payload, created_at
FROM queue
WHERE topic = 'topic1'
ORDER BY created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
"#,
)
.fetch_all(&mut **transaction)
.await?;
let messages = list
.into_iter()
.map(|(id, topic, payload, created_at)| Message {
id,
topic,
payload,
created_at,
})
.collect();
Ok(messages)
}
async fn delete_messages(
transaction: &mut sqlx::Transaction<'static, sqlx::Postgres>,
ids: Vec<String>,
) -> anyhow::Result<()> {
let ids = ids
.iter()
.map(|id| id.as_str())
.collect::<Vec<&str>>()
.join("', '");
sqlx::query(
r#"
DELETE FROM queue
WHERE id IN ($1)
"#,
)
.bind(ids)
.execute(&mut **transaction)
.await?;
Ok(())
}
async fn process_message(message: Message) -> anyhow::Result<()> {
println!("Processing message: {:?}", message);
Ok(())
}
async fn listen_and_wait(pool: &Pool<sqlx::Postgres>) -> anyhow::Result<()> {
let mut listener = PgListener::connect_with(pool).await?;
listener.listen("queue").await?;
let notifications = listener.recv().await?;
println!("Received notification: {:?}", notifications);
Ok(())
}
#[tokio::main]
async fn main() {
let connection_pool = get_connection_pool(CONNECTION_URL).await.unwrap();
// 1. Consume 루프
loop {
// 2. 트랜잭션을 시작
let mut transaction = begin_transaction(&connection_pool).await.unwrap();
// 3. 메세지를 받아옴
let messages = receive_messages(&mut transaction).await.unwrap();
if messages.is_empty() {
println!("No messages to process, waiting for new messages...");
transaction.commit().await.unwrap();
listen_and_wait(&connection_pool).await.unwrap();
continue;
}
let mut message_ids = vec![];
for message in messages {
message_ids.push(message.id.clone());
// 4. 메세지를 하나씩 처리
process_message(message).await.unwrap();
}
// 5. 다 처리됐다면 지움
delete_messages(&mut transaction, message_ids)
.await
.unwrap();
// 6. 트랜잭션 해제
transaction.commit().await.unwrap();
}
}기타 응용
CDC
테이블에서 값이 바뀌었을때 CDC 기반으로 동기화 프로세스를 구현하고 싶다면, 그냥 DB 트리거를 달아서 queue 테이블에 이벤트를 집어넣으면 된다.
Kafka로 bin log 커넥터 달고 뭐 자꾸 늘리는 것보다 훨씬 간단하다.
group 처리
consumer를 그룹으로 나누고 그룹별로 메세지를 분산처리하고 싶다면, queue 테이블에 group_id 같은거 넣고 적당히 정의해서 파티션을 나누도록 하면 된다.
메세지 보존 처리
이미 처리한 메세지에 대해서도 보존을 좀 하고 싶다면, delete할때 다른 보관용 테이블로 옮기거나 delete 플래그를 다는 식으로 할 수 있을 것이다.