Implementando o Topic Exchange com RabbitMQ e Spring Boot

Visão geral do Topic Exchange

O Topic Exchange é um dos tipos de exchange disponíveis no RabbitMQ e permite rotear mensaegns com base em padrões flexíveis de routing key. Diferentemente do Direct Exchange, que exige correspondência exata, aqui é possível utilizar curingas para capturar múltiplas variações de chave de roteamento.

Curingas de roteamento

As routing keys são formadas por palavras separadas por pontos, e o Topic Exchange reconhece dois curingas:

  • *: corresponde a exatamente uma palavra.
  • #: corresponde a zero, uma ou várias palavras.

Por exemplo, o padrão pedidos.*.confirmado reconhece pedidos.eletronico.confirmado, mas não reconhece pedidos.eletronico.pago.confirmado. Já pedidos.# reconhece ambas as variações.

Configuração do exchange, fila e binding

A configuração no Spring Boot é feita por uma classe anotada com @Configuration. No exemplo abaixo, criamos um Topic Exchange durável, uma fila durável e um binding com o padrão estoque.#.

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfiguracaoRabbit {

    @Bean
    public TopicExchange exchangeDeEstoque() {
        return ExchangeBuilder
                .topicExchange("exchange.estoque")
                .durable(true)
                .build();
    }

    @Bean
    public Queue filaDeAtualizacoes() {
        return QueueBuilder
                .durable("fila.atualizacoes")
                .build();
    }

    @Bean
    public Binding vinculoEstoque(TopicExchange exchangeDeEstoque,
                                  Queue filaDeAtualizacoes) {
        return BindingBuilder
                .bind(filaDeAtualizacoes)
                .to(exchangeDeEstoque)
                .with("estoque.#");
    }
}

Consumidor da fila

O consumidor é implementado com a anotação @RabbitListener, apontando para a fila configurada.

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ConsumidorDeEstoque {

    @RabbitListener(queues = "fila.atualizacoes")
    public void processar(String payload) {
        System.out.println("Atualização recebida: " + payload);
    }
}

Produtor de mensagens

O produtor injeta o RabbitTemplate e envia mensagens indicando o exchange e a routing key. Mensagens cuja chave não atenda ao padrão estoque.# não serão entregues à fila.

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class TestesRabbitMq {

    @Autowired
    private RabbitTemplate template;

    @Test
    public void enviarAtualizacoes() {
        template.convertAndSend("exchange.estoque", "estoque.adicionado", "Novo item inserido");
        template.convertAndSend("exchange.estoque", "estoque.removido.lote", "Lote removido");
        template.convertAndSend("exchange.estoque", "pagamento.aprovado", "Pagamento confirmado");
    }
}

Resultado esperado

Ao executar o teste do produtor, apenas as duas primeiras mensagens chegarão à fila fila.atualizacoes, pois suas routing keys iniciam com estoque. e satisfazem o padrão estoque.#. A mensagem com chave pagamento.aprovado não é roteada para essa fila.

Exchange interno de rastreamento

O RabbitMQ também disponibiliza o exchange interno amq.rabbitmq.trace, utilizado para registrar e rastrear eventos do broker. Por se tratar de um recurso interno de monitoramento, não faz parte do fluxo comum de aplicações produtoras e consumidoras.

Tags: RabbitMQ Topic Exchange Spring AMQP RabbitTemplate RabbitListener

Publicado em 6-17 22:13