Integração Spring Boot com RabbitMQ Utilizando Dead Letter Exchange

Neste cenário, configuramos dois filas de negócios no RabbitMQ. Quando o consumo de uma mensagem falha após 3 tentativas de reprocessamento, ela é automaticamente redirecionada para uma fila especial de mensagens mortas, onde pode ser tratada de forma centralizada — por exemplo, persistida no banco de dados ou gerando alertas operacionais.

Pré-requisitos

  • Sistema operacional Windows ou Linux
  • Java 11+ e Maven
  • Servidor RabbitMQ instalado e em execução
  • IDE de desenvolvimento (IntelliJ IDEA, Eclipse, VS Code)

Após instalar o RabbitMQ, inicie o serviço e acesse o painel de gerenciamento em http://localhost:15672/ com as credenciais padrão (guest/guest). Crie um virtual host chamado app_vhost e um usuário dedicado com permissões adequadas.

Dependência Maven

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.7.0</version>
</dependency>

Parâmetros de Conexão

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin
    virtual-host: app_vhost
    publisher-confirm-type: correlated
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: manual
        retry:
          enabled: true
          max-attempts: 3
          initial-interval: 1000

Declaração de Topologia

A classe abaixo define toda a topologia do broker: exchanges normais e de dead letter, filas de negócio com parâmetros de descarte, filas de dead letter e as ligações entre elas.

package br.com.demo.amqp.infra;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Map;

@Configuration
public class TopologySetup {

    // Exchange principal para mensagens de negócio
    public static final String BIZ_EXCHANGE = "business.events";
    // Exchange dedicada a mensagens que falharam
    public static final String DLX_EXCHANGE = "dead.events";

    // Filas de consumo normal
    public static final String Q_CUSTOMERS = "q.customers";
    public static final String Q_ORDERS = "q.orders";

    // Filas de dead letter
    public static final String DLQ_CUSTOMERS = "dlq.customers";
    public static final String DLQ_ORDERS = "dlq.orders";

    // Routing keys para exchange principal
    public static final String RK_CUSTOMERS = "evt.customer";
    public static final String RK_ORDERS = "evt.order";

    // Routing keys para dead letter exchange
    public static final String DLK_CUSTOMERS = "dl.customer";
    public static final String DLK_ORDERS = "dl.order";

    @Bean("bizExchange")
    public DirectExchange bizExchange() {
        return new DirectExchange(BIZ_EXCHANGE, true, false);
    }

    @Bean("dlxExchange")
    public DirectExchange dlxExchange() {
        return new DirectExchange(DLX_EXCHANGE, true, false);
    }

    @Bean("customersQueue")
    public Queue customersQueue() {
        return QueueBuilder.durable(Q_CUSTOMERS)
                .withArgument("x-dead-letter-exchange", DLX_EXCHANGE)
                .withArgument("x-dead-letter-routing-key", DLK_CUSTOMERS)
                .build();
    }

    @Bean("ordersQueue")
    public Queue ordersQueue() {
        return QueueBuilder.durable(Q_ORDERS)
                .withArgument("x-dead-letter-exchange", DLX_EXCHANGE)
                .withArgument("x-dead-letter-routing-key", DLK_ORDERS)
                .build();
    }

    @Bean("dlqCustomers")
    public Queue dlqCustomers() {
        return QueueBuilder.durable(DLQ_CUSTOMERS).build();
    }

    @Bean("dlqOrders")
    public Queue dlqOrders() {
        return QueueBuilder.durable(DLQ_ORDERS).build();
    }

    @Bean
    public Binding bindCustomers(@Qualifier("customersQueue") Queue q,
                                 @Qualifier("bizExchange") DirectExchange ex) {
        return BindingBuilder.bind(q).to(ex).with(RK_CUSTOMERS);
    }

    @Bean
    public Binding bindOrders(@Qualifier("ordersQueue") Queue q,
                              @Qualifier("bizExchange") DirectExchange ex) {
        return BindingBuilder.bind(q).to(ex).with(RK_ORDERS);
    }

    @Bean
    public Binding bindDlqCustomers(@Qualifier("dlqCustomers") Queue q,
                                    @Qualifier("dlxExchange") DirectExchange ex) {
        return BindingBuilder.bind(q).to(ex).with(DLK_CUSTOMERS);
    }

    @Bean
    public Binding bindDlqOrders(@Qualifier("dlqOrders") Queue q,
                                 @Qualifier("dlxExchange") DirectExchange ex) {
        return BindingBuilder.bind(q).to(ex).with(DLK_ORDERS);
    }
}

Componente de Publicação

O produtor encapsula o envio de mensagens e implementa callbacks de confirmação e retorno para garantir rastreabilidade.

package br.com.demo.amqp.producer;

import br.com.demo.amqp.infra.TopologySetup;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.UUID;

@Service
@Slf4j
public class MessagePublisher implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    private final RabbitTemplate mqTemplate;

    public MessagePublisher(RabbitTemplate mqTemplate) {
        this.mqTemplate = mqTemplate;
    }

    @PostConstruct
    private void setupCallbacks() {
        mqTemplate.setConfirmCallback(this);
        mqTemplate.setReturnsCallback(this);
        mqTemplate.setMandatory(true);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String reason) {
        if (ack) {
            log.info("Confirmação recebida - ID: {}", correlationData != null ? correlationData.getId() : "n/a");
        } else {
            log.warn("Falha na confirmação - Motivo: {}", reason);
        }
    }

    @Override
    public void returnedMessage(org.springframework.amqp.core.ReturnedMessage returned) {
        log.error("Mensagem devolvida pelo broker: {}", new String(returned.getMessage().getBody()));
    }

    public void publishToCustomers(String payload) {
        String msgId = UUID.randomUUID().toString();
        CorrelationData cd = new CorrelationData(msgId);
        mqTemplate.convertAndSend(
                TopologySetup.BIZ_EXCHANGE,
                TopologySetup.RK_CUSTOMERS,
                payload,
                cd
        );
    }

    public void publishToOrders(String payload) {
        String msgId = UUID.randomUUID().toString();
        CorrelationData cd = new CorrelationData(msgId);
        mqTemplate.convertAndSend(
                TopologySetup.BIZ_EXCHANGE,
                TopologySetup.RK_ORDERS,
                payload,
                cd
        );
    }
}

Consumidores de Negócio

Cada consumidor processa mensagens com reconhecimento manual. Em caso de falha, a mensagem é rejeitada ou devolvida à fila para nova tentativa, dependendo se já foi reentregue anteriormente.

package br.com.demo.amqp.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Component
@Slf4j
public class BusinessConsumers {

    @RabbitListener(queues = "q.customers")
    public void handleCustomerEvent(Message msg, Channel ch) throws IOException {
        long deliveryTag = msg.getMessageProperties().getDeliveryTag();
        try {
            ch.basicQos(1);
            String content = new String(msg.getBody());
            log.info("Processando evento de cliente: {}", content);

            // Simulação de falha para testes:
            // throw new RuntimeException("Erro simulado");

            ch.basicAck(deliveryTag, false);
        } catch (Exception ex) {
            log.error("Erro ao processar mensagem de cliente: {}", ex.getMessage());
            boolean alreadyRetried = msg.getMessageProperties().getRedelivered();
            if (alreadyRetried) {
                log.warn("Tentativas esgotadas - encaminhando para DLQ");
                ch.basicReject(deliveryTag, false);
            } else {
                log.info("Recolocando na fila para nova tentativa");
                ch.basicNack(deliveryTag, false, true);
            }
        }
    }

    @RabbitListener(queues = "q.orders")
    public void handleOrderEvent(Message msg, Channel ch) throws IOException {
        long deliveryTag = msg.getMessageProperties().getDeliveryTag();
        try {
            ch.basicQos(1);
            String content = new String(msg.getBody());
            log.info("Processando evento de pedido: {}", content);
            ch.basicAck(deliveryTag, false);
        } catch (Exception ex) {
            log.error("Erro ao processar mensagem de pedido: {}", ex.getMessage());
            boolean alreadyRetried = msg.getMessageProperties().getRedelivered();
            if (alreadyRetried) {
                log.warn("Tentativas esgotadas - encaminhando para DLQ");
                ch.basicReject(deliveryTag, false);
            } else {
                log.info("Recolocando na fila para nova tentativa");
                ch.basicNack(deliveryTag, false, true);
            }
        }
    }
}

Consumidores de Dead Letter

Estes consumidores tratam as mensagens que não puderam ser processadas após todas as tentativas. Aqui é o ponto ideal para registrar logs detalhados, persistir em banco de dados ou disparar notificações.

package br.com.demo.amqp.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Component
@Slf4j
public class DeadLetterConsumers {

    @RabbitListener(queues = "dlq.customers")
    public void processCustomerDLQ(Message msg, Channel ch) throws IOException {
        try {
            ch.basicQos(1);
            String payload = new String(msg.getBody());
            log.warn("[DLQ] Mensagem de cliente rejeitada: {}", payload);
            // Ações: salvar no banco, enviar alerta, etc.
            ch.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception ex) {
            log.error("[DLQ] Falha ao processar dead letter de cliente: {}", ex.getMessage());
        }
    }

    @RabbitListener(queues = "dlq.orders")
    public void processOrderDLQ(Message msg, Channel ch) throws IOException {
        try {
            ch.basicQos(1);
            String payload = new String(msg.getBody());
            log.warn("[DLQ] Mensagem de pedido rejeitada: {}", payload);
            ch.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception ex) {
            log.error("[DLQ] Falha ao processar dead letter de pedido: {}", ex.getMessage());
        }
    }
}

Endpoint de Teste

Um controlador simples para disparar mensagens via HTTP e validar o fluxo completo.

package br.com.demo.amqp.api;

import br.com.demo.amqp.producer.MessagePublisher;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api/messages")
public class MessageController {

    private final MessagePublisher publisher;

    public MessageController(MessagePublisher publisher) {
        this.publisher = publisher;
    }

    @PostMapping("/customers")
    public ResponseEntity<String> sendCustomer(@RequestBody String body) {
        publisher.publishToCustomers(body);
        return ResponseEntity.ok("Mensagem enviada para fila de clientes");
    }

    @PostMapping("/orders")
    public ResponseEntity<String> sendOrder(@RequestBody String body) {
        publisher.publishToOrders(body);
        return ResponseEntity.ok("Mensagem enviada para fila de pedidos");
    }
}

Fluxo de Execução

O mecanismo de dead letter funciona da seguinte forma: quando uma mensagem é rejeitada com basicReject ou basicNack (sem requeue), o RabbitMQ verifica se a filla possui configuração de dead letter exchange. Se existir, a mensagem é automaticamente roteada para a exchange de dead letter usando a routing key especificada no argumento x-dead-letter-routing-key.

No cenário apresentado, após 3 tentativas fracassadas (contabilizadas pelo mecenismo de retry do Spring e pelo flag redelivered), a mensagem é rejeitada definitivamente e direcionada à fila correspondente de dead letter, onde um consumidor dedicado realiza o tratamento adequado.

Tags: Spring Boot RabbitMQ Dead Letter Exchange AMQP Mensagem Assíncrona

Publicado em 6-9 20:09 por Thomas