Integração do Spring Boot 2.X com RabbitMQ: Modo Topic

O modo topic do RabbitMQ é um padrão de roteamento flexível que permite associar chaves de roteamento (routing keys) com padrões wildcard. A chave para entender este modo está na relação entre três elementos: exchange (troca), routing key (chave de roteamento) e queue name (nome da fila).

  • Produtor (sender): envia mensagens especificando exchange, routing key e o conteúdo da mensagem.

  • Consumidor (receiver): escuta filas específicas que foram configuradas com binding para uma exchange topic.

  • Configuração de binding: define como uma fila está vinculada a uma exchange usando um padrão de routing key.

  • Regras de corespondência: a routing key é dividida por pontos (.). Dois curingas são suportados:

    • * (asterisco) – corresponde a exatamente uma palavra (segmento entre pontos).
    • # (cerquilha) – corresponde a zero ou mais palavras.

    Exemplo: exchange chamada meu.topic.exchange, filas:
    minha.fila.alpha, minha.fila.beta, exemplo.fila.gama.

    • Para enviar a todas as filas que começam com minha: routing key minha.#.
    • Para enviar apenas a filas minha.fila.* (uma palavra após minha.fila): routing key minha.fila.*.
    • O # equivale a qualquer sequência; o * equivale a um único segmento.

Produtor (Sender)

package com.exemplo.remetente;

import com.exemplo.config.RabbitConst;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TopicoProdutor {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void enviar() {
        String mensagem1 = "Mensagem de email ####";
        System.out.println("Produtor envia 1: " + mensagem1);
        rabbitTemplate.convertAndSend(RabbitConst.EXCHANGE_TOPICO, RabbitConst.FILA_EMAIL, mensagem1);

        String mensagem2 = "Mensagem de usuario #####";
        System.out.println("Produtor envia 2: " + mensagem2);
        rabbitTemplate.convertAndSend(RabbitConst.EXCHANGE_TOPICO, RabbitConst.FILA_USUARIO, mensagem2);

        String mensagem3 = "Mensagem de erro ###";
        System.out.println("Produtor envia 3: " + mensagem3);
        rabbitTemplate.convertAndSend(RabbitConst.EXCHANGE_TOPICO, "erro.chave", mensagem3);
    }
}

Configuração (Binding)

@Bean
public TopicExchange topicExchange() {
    return new TopicExchange(RabbitConst.EXCHANGE_TOPICO);
}

@Bean
public Queue filaEmail() {
    return new Queue(RabbitConst.FILA_EMAIL);
}

@Bean
public Queue filaUsuario() {
    return new Queue(RabbitConst.FILA_USUARIO);
}

@Bean
public Queue filaTodos() {
    return new Queue(RabbitConst.FILA_TODOS);
}

@Bean
public Binding bindingEmail() {
    return BindingBuilder
            .bind(filaEmail())
            .to(topicExchange())
            .with("exemplo.topico.email");
}

@Bean
public Binding bindingUsuario() {
    return BindingBuilder
            .bind(filaUsuario())
            .to(topicExchange())
            .with("exemplo.*.usuario");
}

@Bean
public Binding bindingTodos() {
    return BindingBuilder
            .bind(filaTodos())
            .to(topicExchange())
            .with("exemplo.#");
}

Consumidor (Receiver)

package com.exemplo.receptor;

import com.exemplo.config.RabbitConst;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Slf4j
@Component
public class TopicoReceptor {

    @RabbitHandler
    @RabbitListener(queues = "exemplo.topico.email")
    public void processarEmail(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        log.info("ReceptorEmail recebeu: " + new String(message.getBody()));
    }

    @RabbitHandler
    @RabbitListener(queues = "exemplo.topico.usuario")
    public void processarUsuario(Message message, Channel channel) throws IOException {
        // Exemplo de rejeição (NACK)
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, false);
        log.info("ReceptorUsuario recebeu (rejeitado): " + new String(message.getBody()));
    }

    @RabbitHandler
    @RabbitListener(queues = "exemplo.topico.todos")
    public void processarTodos(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        log.info("ReceptorTodos recebeu: " + new String(message.getBody()));
    }
}

Tags: spring-boot RabbitMQ topic-exchange routing-key Mensageria

Publicado em 6-17 03:32