[RabbitMQ] 기본 구조 및 사용법 (with Go)

[원본 링크]

RabbitMQ를 제대로 사용하기 위해서는 그 구조를 개략적으로나마 알아두는 것이 좋다.
일반적인 큐들과 마찬자기로 Pub/Sub 형태로 데이터를 주고받고, 주고받는 데이터는 Queue 내부에 디스크로 저장한다.

조금 특이한 점은 큐에 바로 꽂는게 아니라 Exchange라는 중간 레이어를 통해서 메시지가 큐에 전송된다는 점이다. Exchange를 통해서 추가적인 라우팅을 수행하도록 할 수 있다.
하나의 메시지가 여러개의 큐에 fanout 전송이 되도록 할 수도 있고, 그냥 하나의 큐에 가게 할 수도 있다.

그리고 consumer가 여러개라도 하나의 메시지는 하나의 consumer에게만 전송되는 것이 보장된다.




기본 구성 (with Go)

Go에서 공식 모듈을 사용해서 큐를 생성하고 메세지를 전송해보도록 하겠다.

우선 커넥션을 맺어야 한다.
다음과 같이 URL을 정의하고, 패스워드까지 넣어서 커넥션을 생성해준다.

그다음에는 채널을 생성해준다.
실제로 브로커 수준에서 제공하는 채널은 아니고, I/O를 효율적으로 받아오기 위한 중간 레이어다.

그 다음에는 실제로 사용할 큐를 정의한다.
이름은 대충 hello로 했다.

여러가지 옵션이 있는데, 일단은 대충 무시하고 넘어가도 된다. 기본값으로 하겠다.




Consumer 구현하기

큐에서 메세지를 읽어오는 간단한 프로그램을 먼저 작성해봤다.

채널로 뱅뱅 돌면서 값을 로그로 찍도록 했다.

아래는 전체 코드다.

package main

import (
	"fmt"
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

const USER = "tester"
const PASSWORD = "q1w2e3r4"

func main() {
	url := fmt.Sprintf("amqp://%s:%s@localhost:5672/", USER, PASSWORD)

	// RabbitMQ에 연결한다.
	var connection *amqp.Connection

	if conn, err := amqp.Dial(url); err != nil {
		log.Panicf("Failed to connect to RabbitMQ: %s", err)
	} else {
		connection = conn
	}

	defer connection.Close()

	// 채널을 열어서 메시지를 받아올 준비를 한다.
	var channel *amqp.Channel

	if ch, err := connection.Channel(); err != nil {
		log.Panicf("Failed to open a channel: %s", err)
	} else {
		channel = ch
	}
	defer channel.Close()

	// 큐를 선언한다.
	var queue amqp.Queue

	if q, err := channel.QueueDeclare(
		"hello", // name
		false,   // durable
		false,   // delete when unused
		false,   // exclusive
		false,   // no-wait
		nil,     // arguments
	); err != nil {
		log.Panicf("Failed to declare a queue: %s", err)
	} else {
		queue = q
	}

	// 큐로부터 메시지를 받아온다.
	if messages, err := channel.Consume(
		queue.Name, // queue
		"",         // consumer
		true,       // auto-ack
		false,      // exclusive
		false,      // no-local
		false,      // no-wait
		nil,        // args
	); err != nil {
		log.Panicf("Failed to publish a message: %s", err)
	} else {
		// 채널을 range로 돌면서 메시지를 받아온다.
		for message := range messages {
			log.Printf("Received message with body: %s", message.Body)
		}
	}
}

이대로 실행해도 당장 뭐가 로그에 찍히지는 않을 것이다.
메세지를 보내지 않았으니까!

저대로 조회하면 아마 어드민 페이지에 커넥션이 추가될 것이고

큐도 생성된 것을 볼 수 있을 것이다.





producer 만들기

이번에는 메세지를 전송해주도록 하겠다.
그냥 텍스트를 적절히 직렬화해서 보내도록 하면 된다.

package main

import (
	"fmt"
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

const USER = "tester"
const PASSWORD = "q1w2e3r4"

func main() {
	url := fmt.Sprintf("amqp://%s:%s@localhost:5672/", USER, PASSWORD)

	// RabbitMQ에 연결한다.
	var connection *amqp.Connection

	if conn, err := amqp.Dial(url); err != nil {
		log.Panicf("Failed to connect to RabbitMQ: %s", err)
	} else {
		connection = conn
	}

	defer connection.Close()

	// 채널을 열어서 메시지를 보낼 준비를 한다.
	var channel *amqp.Channel

	if ch, err := connection.Channel(); err != nil {
		log.Panicf("Failed to open a channel: %s", err)
	} else {
		channel = ch
	}
	defer channel.Close()

	// 큐를 선언한다.
	var queue amqp.Queue

	if q, err := channel.QueueDeclare(
		"hello", // name
		false,   // durable
		false,   // delete when unused
		false,   // exclusive
		false,   // no-wait
		nil,     // arguments
	); err != nil {
		log.Panicf("Failed to declare a queue: %s", err)
	} else {
		queue = q
	}

	// 메시지를 보낸다.
	body := "Hello World!"
	if err := channel.Publish(
		"",         // exchange
		queue.Name, // routing key
		false,      // mandatory
		false,      // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		}); err != nil {
		log.Panicf("Failed to publish a message: %s", err)
	}

	log.Printf(" [x] Sent %s", body)
}

저렇게 실행해보면

컨슈머에도 메세지가 날라올 것이다.