Skip to main content

RabbitMQ 기초

·537 words·3 mins
Table of Contents

이 글은 RabbitMQ의 기본적인 개념과 spring에서 연동 방법을 기초 수준에서 설명한다.

RabbitMQ란?
#

플랫폼 중립적인 메시징 및 스트리밍 브로커로,AMQP,MQTT 등의 여러 개방형 표준 프로토콜을 지원해 다양한 언어와 플랫폼 간 메시지 송수신을 처리한다.

오픈소스이며 MPL2.0 라이센스를 따른다.

설치
#

다음 docker-compose.yml 파일로 연습용 RabbitMQ 서버를 간단히 띄울 수 있다. 클러스터 구성 등의 정보도 구글링하면 쉽게찾을 수 있음!!

version: '3.7'
services:
  rabbitmq:
    image: rabbitmq:4.0-management
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: admin
#관리 UI: http://localhost:15672, 기본 계정: admin/admin

용어정리
#

  • Producer: 메시지를 생성하고 Exchange로 발행하는 애플리케이션
  • Exchange: Producer로부터 받은 메시지를 Binding 규칙과 Routing Key에 따라 적절한 Queue로 라우팅
  • Queue: Exchange가 라우팅한 메시지를 저장하고 대기시키는 버퍼 역할
  • Routing Key: 메시지를 발행할 때 Producer가 Exchange에 전달하는 키로, Exchange의 라우팅 규칙에 사용
  • Binding: Exchange와 큐 사이의 규칙.(필요하면 binding key를 함께 설정하여 어떤 메시지를 어떤 큐로 보낼지 결정)
  • Consumer: 큐에서 메시지를 가져와 처리하는 애플리케이션

메시지 전송 프로세스
#

Producer → Broker(Exchange → Queue) → Consumer

  1. 메시지 발행
    Producer가 메시지를 발행한다.
  2. 퍼블리셔 컨펌
    브로커가 메시지를 Queue에 저장한 뒤 Producer에 Publisher Ack/Nack을 응답한다. (비동기로 확인을 생략할 수 있다.)
    • Ack: 정상 저장
    • Nack: 저장 실패 → 재전송 로직 필요
  3. 라우팅
    Exchange가 Binding 규칙과 Routing Key를 기준으로 메시지를 적절한 Queue로 라우팅한다.

  4. Queue가 메시지를 내부에 저장하고, Consumer의 요청을 대기한다.
  5. 메시지 전달
    브로커가 Queue에서 메시지를 꺼내 Consumer로 전송한다.
  6. 컨슈머 컨펌
    Consumer가 처리 결과를 Consumer Ack/Nack으로 브로커에 알린다.
    • Ack: 메시지가 큐에서 완전 제거된다.
    • Nack: 설정에 따라 재큐잉(requeue) 또는 DLQ(Dead-Letter Queue)로 이동한다.

스프링 코드
#

org.springframework.boot:spring-boot-starter-amqp 의존성이 필요하다.

빈 등록
#

참고: 예제에 등장하는 Queue, DirectExchange, Binding 등 타입은 모두 org.springframework.amqp.core 패키지이다.

/**
 * Queue Bean 등록 
 *
 * @return Queue
 *   - name: 큐 이름 
 *   - durable: false일 경우 브로커 재시작 시 삭제되는 휘발성 큐
 */
@Bean
fun queue(): Queue {
    return Queue("QUEUE_NAME", false)
}

/**
 * DirectExchange Bean 등록 
 *
 * @return DirectExchange
 *   - name: 익스체인지 이름 ("EXCHANGE_NAME")
 *   - durable: true (브로커 재시작 시에도 유지)
 */
@Bean
fun directExchange(): DirectExchange {
    return DirectExchange("EXCHANGE_NAME")
}

/**
 * Queue와 Exchange를 바인딩
 *
 * @param queue 생성된 Queue 빈
 * @param directExchange 생성된 DirectExchange 빈
 * @return Binding
 *   - routingKey: "ROUTING_KEY"와 일치하는 메시지를 해당 큐로 전달하도록 설정
 */
@Bean
fun binding(queue: Queue, directExchange: DirectExchange): Binding {
    return BindingBuilder
        .bind(queue)
        .to(directExchange)
        .with("ROUTING_KEY")
}


/**
 * JSON 메시지 컨버터 Bean 등록
 *
 * - Producer가 전송하는 객체를 JSON 문자열로 직렬화하고,
 *   Consumer가 수신한 JSON 문자열을 객체로 역직렬화하는 데 사용
 *
 * @return MessageConverter 인스턴스
 */
@Bean
fun messageConverter(): MessageConverter {
    return Jackson2JsonMessageConverter()
}

메시지 발행, 소비
#

// 지정된 토픽 익스체인지로 메시지를 발행
rabbitTemplate.convertAndSend(
   RabbitMQConfig.TOPIC_EXCHANGE, // 전송 대상 익스체인지 이름
   routingKey,                    // 토픽 패턴 기반의 라우팅 키
   message                        // 전송할 페이로드 object (MessageConverter를 통해 직렬화)
)
/**
* 큐에 바인딩된 메시지를 수신하는 Listener 메서드
*
* @param message SomeObject 타입의 메시지 페이로드
*   - 메시지 컨버터에 의해 객체로 역직렬화된 값
*/
@RabbitListener(queues = ["QUEUE_NAME"])
fun consumeWarn(message: SomeObject) {
   logger.info { "message = $message" }
}

추가:Exchange 종류
#

  • DirectExchange
    • 라우팅 키가 바인딩 키와 정확히 일치할 때만 전달
  • TopicExchange
    • 와일드카드(*, #) 패턴 매칭을 통해 복수 큐로 전달
  • FanoutExchange
    • Binding Key를 사용하지 않고, 바인딩된 모든 큐로 브로드캐스팅

브로커는 이름이 빈 문자열인 기본 익스체인지를(Direct) 미리 선언해 두고, 큐를 선언할 때 큐 이름과 같은 라우팅 키로 자동으로 바인딩한다. 따라서 익스체인지를 별도로 등록하지 않아도 기본 익스체인지("")와 큐 이름만으로 메시지를 보낼 수 있다.