RabbitMQ - 메시지 브로커 원리와 Spring 연동
RabbitMQ의 동작 원리, Exchange/Queue/Binding 구조, 그리고 Spring Boot에서 비동기 메시지 처리를 구현하는 방법을 정리한다.
왜 메시지 브로커가 필요한가?
서비스 A가 서비스 B를 직접 호출하면 어떤 문제가 생기는지부터 보자.
직접 호출 (동기)
주문 서비스 → 이메일 서비스.sendConfirmation()
→ 재고 서비스.decreaseStock()
→ 포인트 서비스.addPoints()
문제가 여럿 있다.
의존성: 이메일 서비스가 죽으면 주문도 실패한다. 주문 완료와 이메일 발송은 관계가 없는데 묶여버린다.
속도: 이메일, 재고, 포인트 처리가 끝날 때까지 사용자가 기다려야 한다.
확장: 새로운 후처리(쿠폰 발급 등)가 생기면 주문 서비스 코드를 수정해야 한다.
메시지 브로커를 쓰면:
비동기 메시지
주문 서비스 → [RabbitMQ] ← 이메일 서비스 (구독)
← 재고 서비스 (구독)
← 포인트 서비스 (구독)
주문 서비스는 메시지만 던지고 끝난다. 각 서비스는 자기 속도로 처리한다. 이메일 서비스가 죽어도 주문은 성공하고, 살아나면 밀린 메시지를 처리한다.
RabbitMQ 구조
핵심 구성요소
Producer
│ 메시지 발행
▼
Exchange ←── Binding (라우팅 규칙)
│ 라우팅 키 기반으로 Queue 결정
▼
Queue (메시지 저장)
│
▼
Consumer
│ 메시지 소비
- Producer: 메시지를 보내는 쪽. Exchange로 발행한다.
- Exchange: 메시지를 받아 어느 Queue로 보낼지 결정한다. 실제 라우팅 담당.
- Queue: 메시지가 쌓이는 버퍼. Consumer가 가져갈 때까지 보관한다.
- Binding: Exchange와 Queue를 연결하는 규칙.
- Consumer: Queue에서 메시지를 꺼내 처리하는 쪽.
Producer는 Queue에 직접 보내지 않는다. 반드시 Exchange를 통한다.
Exchange 종류
Exchange 타입에 따라 라우팅 방식이 달라진다.
Direct Exchange: 라우팅 키가 정확히 일치하는 Queue로만 전달한다.
Producer → Exchange (routing key: "order.created")
│
└─→ Queue "order-queue" (binding key: "order.created") ✅
└─→ Queue "email-queue" (binding key: "email.send") ❌
Fanout Exchange: 라우팅 키 무시. 연결된 모든 Queue에 브로드캐스트한다.
Producer → Exchange
│
├─→ Queue "email-queue" ✅
├─→ Queue "sms-queue" ✅
└─→ Queue "push-queue" ✅
알림처럼 여러 채널에 동시에 보낼 때 쓴다.
Topic Exchange: 라우팅 키를 패턴으로 매칭한다. *는 단어 하나, #는 0개 이상의 단어.
"order.created" → "order.*" ✅, "order.#" ✅, "*.created" ✅
"order.item.added" → "order.*" ❌, "order.#" ✅, "*.created" ❌
MSA에서 가장 많이 쓰는 타입이다.
메시지 보장: Acknowledgment
Consumer가 메시지를 받아서 처리하다 갑자기 죽으면 메시지는 어떻게 되는가?
Auto Ack (기본값)
RabbitMQ → Consumer에게 전달 → 즉시 Queue에서 삭제
Consumer가 받는 순간 삭제된다. 처리 도중 죽으면 메시지 유실이다.
Manual Ack
RabbitMQ → Consumer에게 전달 (Queue에서는 아직 안 지움)
│
├─ 처리 성공 → basicAck() → Queue에서 삭제
└─ 처리 실패 → basicNack() → Queue에 다시 넣기 (재시도)
처리가 완전히 끝난 후에 "잘 받았어" 신호를 보낸다. 신호가 없으면 RabbitMQ가 다른 Consumer에게 재전달한다.
@RabbitListener(queues = ["order.queue"], ackMode = "MANUAL")
fun handleOrder(
order: Order,
channel: Channel,
@Header(AmqpHeaders.DELIVERY_TAG) tag: Long
) {
try {
processOrder(order)
channel.basicAck(tag, false) // 성공: 삭제
} catch (e: Exception) {
channel.basicNack(tag, false, true) // 실패: 재큐잉
}
}
Dead Letter Queue (DLQ)
재시도를 반복해도 계속 실패하는 메시지를 무한 루프에 빠뜨리면 안 된다. 일정 횟수 실패하면 DLQ로 보낸다.
Normal Queue
│ 처리 실패 (3회 이상)
▼
Dead Letter Exchange
│
▼
Dead Letter Queue ← 실패한 메시지 보관
│
└─ 나중에 수동 확인 / 알림 발송 / 재처리
@Bean
fun orderQueue(): Queue =
QueueBuilder.durable("order.queue")
.withArgument("x-dead-letter-exchange", "order.dlx")
.withArgument("x-dead-letter-routing-key", "order.dead")
.withArgument("x-message-ttl", 300000) // 5분 후 만료 시 DLQ로
.build()
@Bean
fun deadLetterQueue(): Queue = QueueBuilder.durable("order.dead.queue").build()
Spring Boot 연동
설정 (build.gradle.kts)
dependencies {
implementation("org.springframework.boot:spring-boot-starter-amqp")
}
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
Exchange / Queue / Binding 선언
@Configuration
class RabbitConfig {
// Exchange
@Bean
fun orderExchange(): TopicExchange =
ExchangeBuilder.topicExchange("order.exchange").durable(true).build()
// Queue
@Bean
fun orderCreatedQueue(): Queue =
QueueBuilder.durable("order.created.queue").build()
// Binding: Exchange → Queue (라우팅 키 패턴)
@Bean
fun orderCreatedBinding(): Binding =
BindingBuilder
.bind(orderCreatedQueue())
.to(orderExchange())
.with("order.created")
// JSON 직렬화 설정
@Bean
fun messageConverter(): Jackson2JsonMessageConverter =
Jackson2JsonMessageConverter()
@Bean
fun rabbitTemplate(connectionFactory: ConnectionFactory): RabbitTemplate =
RabbitTemplate(connectionFactory).apply {
messageConverter = Jackson2JsonMessageConverter()
}
}
Producer (메시지 발행)
@Service
class OrderService(
private val orderRepository: OrderRepository,
private val rabbitTemplate: RabbitTemplate
) {
@Transactional
fun createOrder(request: CreateOrderRequest): Order {
val order = orderRepository.save(Order(request))
// 메시지 발행 - 기다리지 않고 바로 반환
rabbitTemplate.convertAndSend(
"order.exchange",
"order.created",
OrderCreatedEvent(order.id, order.userId, order.totalAmount)
)
return order
}
}
data class OrderCreatedEvent(
val orderId: Long,
val userId: Long,
val totalAmount: BigDecimal
)
Consumer (메시지 소비)
@Component
class OrderEventConsumer {
// 이메일 서비스
@RabbitListener(queues = ["order.created.queue"])
fun sendConfirmationEmail(event: OrderCreatedEvent) {
emailService.sendOrderConfirmation(event.userId, event.orderId)
}
}
@Component
class StockEventConsumer {
// 재고 서비스
@RabbitListener(queues = ["order.created.queue"])
fun decreaseStock(event: OrderCreatedEvent) {
stockService.decrease(event.orderId)
}
}
같은 Queue를 여러 Consumer가 구독하면 경쟁 소비다. 메시지 하나를 한 Consumer만 가져간다. 부하 분산이 필요하면 이 방식을 쓴다.
Fanout으로 이메일·재고·포인트 서비스 각각 다른 Queue를 만들면 메시지 하나를 전부 받는다.
트랜잭션과 메시지 발행 순서
한 가지 주의할 점이 있다.
@Transactional
fun createOrder(request: CreateOrderRequest): Order {
val order = orderRepository.save(order) // DB 저장
rabbitTemplate.convertAndSend("order.exchange", "order.created", event) // 메시지 발행
throw RuntimeException("오류!") // 트랜잭션 롤백
}
DB는 롤백되지만 RabbitMQ 메시지는 이미 나갔다. Consumer가 존재하지 않는 주문을 처리하려다 오류가 난다.
해결 방법으로 Transactional Outbox 패턴을 쓴다.
1. DB 트랜잭션 안에서 주문 저장 + outbox 테이블에 메시지 기록 (같은 트랜잭션)
2. 별도 스케줄러가 outbox 테이블을 읽어서 RabbitMQ로 발행
3. 발행 성공 시 outbox 레코드 삭제
DB 저장과 메시지 발행의 원자성을 보장한다.
정리
| 개념 | 설명 |
|---|---|
| Exchange | 메시지를 어느 Queue로 보낼지 결정하는 라우터 |
| Queue | 메시지가 쌓이는 버퍼, Consumer가 가져갈 때까지 보관 |
| Binding | Exchange와 Queue를 연결하는 라우팅 규칙 |
| Manual Ack | 처리 완료 후 신호 전송 → 메시지 유실 방지 |
| DLQ | 반복 실패 메시지를 별도 Queue로 격리 |
시리즈: Spring Boot 심화
- RabbitMQ - 메시지 브로커 ← 현재 글
- Spring Cloud Gateway - API Gateway