Neste cenário, configuramos dois filas de negócios no RabbitMQ. Quando o consumo de uma mensagem falha após 3 tentativas de reprocessamento, ela é automaticamente redirecionada para uma fila especial de mensagens mortas, onde pode ser tratada de forma centralizada — por exemplo, persistida no banco de dados ou gerando alertas operacionais.
Pré-requisitos
- Sistema operacional Windows ou Linux
- Java 11+ e Maven
- Servidor RabbitMQ instalado e em execução
- IDE de desenvolvimento (IntelliJ IDEA, Eclipse, VS Code)
Após instalar o RabbitMQ, inicie o serviço e acesse o painel de gerenciamento em http://localhost:15672/ com as credenciais padrão (guest/guest). Crie um virtual host chamado app_vhost e um usuário dedicado com permissões adequadas.
Dependência Maven
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.7.0</version>
</dependency>
Parâmetros de Conexão
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: admin
virtual-host: app_vhost
publisher-confirm-type: correlated
publisher-returns: true
listener:
simple:
acknowledge-mode: manual
retry:
enabled: true
max-attempts: 3
initial-interval: 1000
Declaração de Topologia
A classe abaixo define toda a topologia do broker: exchanges normais e de dead letter, filas de negócio com parâmetros de descarte, filas de dead letter e as ligações entre elas.
package br.com.demo.amqp.infra;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Map;
@Configuration
public class TopologySetup {
// Exchange principal para mensagens de negócio
public static final String BIZ_EXCHANGE = "business.events";
// Exchange dedicada a mensagens que falharam
public static final String DLX_EXCHANGE = "dead.events";
// Filas de consumo normal
public static final String Q_CUSTOMERS = "q.customers";
public static final String Q_ORDERS = "q.orders";
// Filas de dead letter
public static final String DLQ_CUSTOMERS = "dlq.customers";
public static final String DLQ_ORDERS = "dlq.orders";
// Routing keys para exchange principal
public static final String RK_CUSTOMERS = "evt.customer";
public static final String RK_ORDERS = "evt.order";
// Routing keys para dead letter exchange
public static final String DLK_CUSTOMERS = "dl.customer";
public static final String DLK_ORDERS = "dl.order";
@Bean("bizExchange")
public DirectExchange bizExchange() {
return new DirectExchange(BIZ_EXCHANGE, true, false);
}
@Bean("dlxExchange")
public DirectExchange dlxExchange() {
return new DirectExchange(DLX_EXCHANGE, true, false);
}
@Bean("customersQueue")
public Queue customersQueue() {
return QueueBuilder.durable(Q_CUSTOMERS)
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE)
.withArgument("x-dead-letter-routing-key", DLK_CUSTOMERS)
.build();
}
@Bean("ordersQueue")
public Queue ordersQueue() {
return QueueBuilder.durable(Q_ORDERS)
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE)
.withArgument("x-dead-letter-routing-key", DLK_ORDERS)
.build();
}
@Bean("dlqCustomers")
public Queue dlqCustomers() {
return QueueBuilder.durable(DLQ_CUSTOMERS).build();
}
@Bean("dlqOrders")
public Queue dlqOrders() {
return QueueBuilder.durable(DLQ_ORDERS).build();
}
@Bean
public Binding bindCustomers(@Qualifier("customersQueue") Queue q,
@Qualifier("bizExchange") DirectExchange ex) {
return BindingBuilder.bind(q).to(ex).with(RK_CUSTOMERS);
}
@Bean
public Binding bindOrders(@Qualifier("ordersQueue") Queue q,
@Qualifier("bizExchange") DirectExchange ex) {
return BindingBuilder.bind(q).to(ex).with(RK_ORDERS);
}
@Bean
public Binding bindDlqCustomers(@Qualifier("dlqCustomers") Queue q,
@Qualifier("dlxExchange") DirectExchange ex) {
return BindingBuilder.bind(q).to(ex).with(DLK_CUSTOMERS);
}
@Bean
public Binding bindDlqOrders(@Qualifier("dlqOrders") Queue q,
@Qualifier("dlxExchange") DirectExchange ex) {
return BindingBuilder.bind(q).to(ex).with(DLK_ORDERS);
}
}
Componente de Publicação
O produtor encapsula o envio de mensagens e implementa callbacks de confirmação e retorno para garantir rastreabilidade.
package br.com.demo.amqp.producer;
import br.com.demo.amqp.infra.TopologySetup;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.UUID;
@Service
@Slf4j
public class MessagePublisher implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
private final RabbitTemplate mqTemplate;
public MessagePublisher(RabbitTemplate mqTemplate) {
this.mqTemplate = mqTemplate;
}
@PostConstruct
private void setupCallbacks() {
mqTemplate.setConfirmCallback(this);
mqTemplate.setReturnsCallback(this);
mqTemplate.setMandatory(true);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String reason) {
if (ack) {
log.info("Confirmação recebida - ID: {}", correlationData != null ? correlationData.getId() : "n/a");
} else {
log.warn("Falha na confirmação - Motivo: {}", reason);
}
}
@Override
public void returnedMessage(org.springframework.amqp.core.ReturnedMessage returned) {
log.error("Mensagem devolvida pelo broker: {}", new String(returned.getMessage().getBody()));
}
public void publishToCustomers(String payload) {
String msgId = UUID.randomUUID().toString();
CorrelationData cd = new CorrelationData(msgId);
mqTemplate.convertAndSend(
TopologySetup.BIZ_EXCHANGE,
TopologySetup.RK_CUSTOMERS,
payload,
cd
);
}
public void publishToOrders(String payload) {
String msgId = UUID.randomUUID().toString();
CorrelationData cd = new CorrelationData(msgId);
mqTemplate.convertAndSend(
TopologySetup.BIZ_EXCHANGE,
TopologySetup.RK_ORDERS,
payload,
cd
);
}
}
Consumidores de Negócio
Cada consumidor processa mensagens com reconhecimento manual. Em caso de falha, a mensagem é rejeitada ou devolvida à fila para nova tentativa, dependendo se já foi reentregue anteriormente.
package br.com.demo.amqp.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
@Slf4j
public class BusinessConsumers {
@RabbitListener(queues = "q.customers")
public void handleCustomerEvent(Message msg, Channel ch) throws IOException {
long deliveryTag = msg.getMessageProperties().getDeliveryTag();
try {
ch.basicQos(1);
String content = new String(msg.getBody());
log.info("Processando evento de cliente: {}", content);
// Simulação de falha para testes:
// throw new RuntimeException("Erro simulado");
ch.basicAck(deliveryTag, false);
} catch (Exception ex) {
log.error("Erro ao processar mensagem de cliente: {}", ex.getMessage());
boolean alreadyRetried = msg.getMessageProperties().getRedelivered();
if (alreadyRetried) {
log.warn("Tentativas esgotadas - encaminhando para DLQ");
ch.basicReject(deliveryTag, false);
} else {
log.info("Recolocando na fila para nova tentativa");
ch.basicNack(deliveryTag, false, true);
}
}
}
@RabbitListener(queues = "q.orders")
public void handleOrderEvent(Message msg, Channel ch) throws IOException {
long deliveryTag = msg.getMessageProperties().getDeliveryTag();
try {
ch.basicQos(1);
String content = new String(msg.getBody());
log.info("Processando evento de pedido: {}", content);
ch.basicAck(deliveryTag, false);
} catch (Exception ex) {
log.error("Erro ao processar mensagem de pedido: {}", ex.getMessage());
boolean alreadyRetried = msg.getMessageProperties().getRedelivered();
if (alreadyRetried) {
log.warn("Tentativas esgotadas - encaminhando para DLQ");
ch.basicReject(deliveryTag, false);
} else {
log.info("Recolocando na fila para nova tentativa");
ch.basicNack(deliveryTag, false, true);
}
}
}
}
Consumidores de Dead Letter
Estes consumidores tratam as mensagens que não puderam ser processadas após todas as tentativas. Aqui é o ponto ideal para registrar logs detalhados, persistir em banco de dados ou disparar notificações.
package br.com.demo.amqp.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
@Slf4j
public class DeadLetterConsumers {
@RabbitListener(queues = "dlq.customers")
public void processCustomerDLQ(Message msg, Channel ch) throws IOException {
try {
ch.basicQos(1);
String payload = new String(msg.getBody());
log.warn("[DLQ] Mensagem de cliente rejeitada: {}", payload);
// Ações: salvar no banco, enviar alerta, etc.
ch.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
} catch (Exception ex) {
log.error("[DLQ] Falha ao processar dead letter de cliente: {}", ex.getMessage());
}
}
@RabbitListener(queues = "dlq.orders")
public void processOrderDLQ(Message msg, Channel ch) throws IOException {
try {
ch.basicQos(1);
String payload = new String(msg.getBody());
log.warn("[DLQ] Mensagem de pedido rejeitada: {}", payload);
ch.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
} catch (Exception ex) {
log.error("[DLQ] Falha ao processar dead letter de pedido: {}", ex.getMessage());
}
}
}
Endpoint de Teste
Um controlador simples para disparar mensagens via HTTP e validar o fluxo completo.
package br.com.demo.amqp.api;
import br.com.demo.amqp.producer.MessagePublisher;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/api/messages")
public class MessageController {
private final MessagePublisher publisher;
public MessageController(MessagePublisher publisher) {
this.publisher = publisher;
}
@PostMapping("/customers")
public ResponseEntity<String> sendCustomer(@RequestBody String body) {
publisher.publishToCustomers(body);
return ResponseEntity.ok("Mensagem enviada para fila de clientes");
}
@PostMapping("/orders")
public ResponseEntity<String> sendOrder(@RequestBody String body) {
publisher.publishToOrders(body);
return ResponseEntity.ok("Mensagem enviada para fila de pedidos");
}
}
Fluxo de Execução
O mecanismo de dead letter funciona da seguinte forma: quando uma mensagem é rejeitada com basicReject ou basicNack (sem requeue), o RabbitMQ verifica se a filla possui configuração de dead letter exchange. Se existir, a mensagem é automaticamente roteada para a exchange de dead letter usando a routing key especificada no argumento x-dead-letter-routing-key.
No cenário apresentado, após 3 tentativas fracassadas (contabilizadas pelo mecenismo de retry do Spring e pelo flag redelivered), a mensagem é rejeitada definitivamente e direcionada à fila correspondente de dead letter, onde um consumidor dedicado realiza o tratamento adequado.