O modo topic do RabbitMQ é um padrão de roteamento flexível que permite associar chaves de roteamento (routing keys) com padrões wildcard. A chave para entender este modo está na relação entre três elementos: exchange (troca), routing key (chave de roteamento) e queue name (nome da fila).
-
Produtor (sender): envia mensagens especificando exchange, routing key e o conteúdo da mensagem.
-
Consumidor (receiver): escuta filas específicas que foram configuradas com binding para uma exchange topic.
-
Configuração de binding: define como uma fila está vinculada a uma exchange usando um padrão de routing key.
-
Regras de corespondência: a routing key é dividida por pontos (
.). Dois curingas são suportados:*(asterisco) – corresponde a exatamente uma palavra (segmento entre pontos).#(cerquilha) – corresponde a zero ou mais palavras.
Exemplo: exchange chamada
meu.topic.exchange, filas:
minha.fila.alpha,minha.fila.beta,exemplo.fila.gama.- Para enviar a todas as filas que começam com
minha: routing keyminha.#. - Para enviar apenas a filas
minha.fila.*(uma palavra apósminha.fila): routing keyminha.fila.*. - O
#equivale a qualquer sequência; o*equivale a um único segmento.
Produtor (Sender)
package com.exemplo.remetente;
import com.exemplo.config.RabbitConst;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TopicoProdutor {
@Autowired
private RabbitTemplate rabbitTemplate;
public void enviar() {
String mensagem1 = "Mensagem de email ####";
System.out.println("Produtor envia 1: " + mensagem1);
rabbitTemplate.convertAndSend(RabbitConst.EXCHANGE_TOPICO, RabbitConst.FILA_EMAIL, mensagem1);
String mensagem2 = "Mensagem de usuario #####";
System.out.println("Produtor envia 2: " + mensagem2);
rabbitTemplate.convertAndSend(RabbitConst.EXCHANGE_TOPICO, RabbitConst.FILA_USUARIO, mensagem2);
String mensagem3 = "Mensagem de erro ###";
System.out.println("Produtor envia 3: " + mensagem3);
rabbitTemplate.convertAndSend(RabbitConst.EXCHANGE_TOPICO, "erro.chave", mensagem3);
}
}
Configuração (Binding)
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(RabbitConst.EXCHANGE_TOPICO);
}
@Bean
public Queue filaEmail() {
return new Queue(RabbitConst.FILA_EMAIL);
}
@Bean
public Queue filaUsuario() {
return new Queue(RabbitConst.FILA_USUARIO);
}
@Bean
public Queue filaTodos() {
return new Queue(RabbitConst.FILA_TODOS);
}
@Bean
public Binding bindingEmail() {
return BindingBuilder
.bind(filaEmail())
.to(topicExchange())
.with("exemplo.topico.email");
}
@Bean
public Binding bindingUsuario() {
return BindingBuilder
.bind(filaUsuario())
.to(topicExchange())
.with("exemplo.*.usuario");
}
@Bean
public Binding bindingTodos() {
return BindingBuilder
.bind(filaTodos())
.to(topicExchange())
.with("exemplo.#");
}
Consumidor (Receiver)
package com.exemplo.receptor;
import com.exemplo.config.RabbitConst;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Slf4j
@Component
public class TopicoReceptor {
@RabbitHandler
@RabbitListener(queues = "exemplo.topico.email")
public void processarEmail(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
log.info("ReceptorEmail recebeu: " + new String(message.getBody()));
}
@RabbitHandler
@RabbitListener(queues = "exemplo.topico.usuario")
public void processarUsuario(Message message, Channel channel) throws IOException {
// Exemplo de rejeição (NACK)
channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, false);
log.info("ReceptorUsuario recebeu (rejeitado): " + new String(message.getBody()));
}
@RabbitHandler
@RabbitListener(queues = "exemplo.topico.todos")
public void processarTodos(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
log.info("ReceptorTodos recebeu: " + new String(message.getBody()));
}
}