RabbitMQ com C#: Implementando Padrões de Mensageria (Fila Simples, Work, Fanout e Direct)

O RabbitMQ é um broker de mensagens robusto e de código aberto, amplamente utilizado em arquiteturas distribuídas para desacoplar componentes e gerenciar a comunicação assíncrona. Ele implementa o Advanced Message Queuing Protocol (AMQP), oferecendo recursos poderosos para roteamento e persistência de mensagens.

O que é o RabbitMQ e Por Que Usá-lo?

Desenvolvido em Erlang, uma linguagem conhecida por sua resiliência e capacidade de lidar com alta concorrência e falhas, o RabbitMQ é uma escolha popular para sistemas que exigem alta disponibilidade e garantia de entrega de mensagens. Suas principais vantagens incluem:

  • Confiabilidade: Garante que as mensagens não serão perdidas, mesmo em caso de falhas, através de filas duráveis e confirmação de entrega.
  • Assincronia: Permite que componentes enviem mensagens sem esperar por uma resposta imediata, liberando recursos e melhorando a responsividade da aplicação.
  • Desacoplamento: Produtores e consumidores de mensagens não precisam conhecer uns aos outros diretamente, o que facilita a manutenção e evolução do sistema.
  • Escalabilidade: Facilita a expansão do sistema adicionando mais consumidores ou produtores, e suporta padrões como filas de trabalho para distribuição de carga.

Embora outras soluções de mensageria como o Apache Kafka se destaquem em cenários de alto throughput e streaming de dados, o RabbitMQ brilha em ambientes onde a consistência, estabilidade e confiabilidade da entrega de cada mensagem são críticas, como em sistemas financeiros e corporativos.

Instalação e Configuração do RabbitMQ

Existem diversas maneiras de instalar o RabbitMQ. Uma abordagem comum para desenvolvimento e testes é utilizar Docker. O comando a seguir inicializa uma instância do RabbitMQ com o plugin de gerenciamento habilitado, expondo as portas necessárias e configurando um usuário padrão:

docker run -d --hostname rabbit-host --restart=always --name rabbitmq-broker -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=app_user -e RABBITMQ_DEFAULT_PASS=SuperSecurePwd123 rabbitmq:3-management

Este comando inicia o container em segundo plano, nomeia-o como rabbitmq-broker, mapeia a porta AMQP padrão (5672) e a porta da interface de gerenciamento (15672), e define as credenciais de acesso padrão (usuário: app_user, senha: SuperSecurePwd123). O hostname interno do container será rabbit-host.

Cenários de Uso para Filas de Mensagens

Um sistema de mensageria é ideal em situações como:

  • Dependências de Tarefas Assíncronas: Quando uma operação gera dados que precisam ser processados por múltiplas tarefas downstream, e o produtor não precisa esperar pelo resultado.
  • Processamento de Longa Duração: Para mover tarefas demoradas para um processamento em segundo plano, evitando bloqueios na interface do usuário ou APIs.
  • Fan-out de Eventos: Quendo um evento precisa ser notificado para vários serviços ou módulos, sem que o emissor precise gerenciar cada destinatário individualmente.

Por outro lado, não utilize filas de mensagens em cenários que exigem feedback síncrono e imediato sobre o resultado de uma operação.

Implementação em C# com RabbitMQ

A seguir, exploraremos diferentes padrões de uso do RabbitMQ com exemplos em C#. Para facilitar, utilizaremos uma classe auxiliar que encapsula a lógica de conexão e operação com o broker. Os exemplos são baseados no .NET Core 3.1 e interagem com uma instância do RabbitMQ em Docker, conforme configurado anteriormente.

1. Padrão Fila Simples (Simple Queue)

Neste padrão, uma fila serve como um canal direto onde mensagens são publicadas por um produtor e consumidas por um único consumidor. É a forma mais básica de comunicação.

Código do Cliente de Mensageria (RabbitMqMessagingClient.cs)

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace MyMessagingService
{
    public class RabbitMqMessagingClient
    {
        private readonly ConnectionFactory _factory;
        private readonly string _hostname;
        private readonly string _username;
        private readonly string _password;

        public RabbitMqMessagingClient(string hostname = "localhost", string username = "app_user", string password = "SuperSecurePwd123")
        {
            _hostname = hostname;
            _username = username;
            _password = password;

            _factory = new ConnectionFactory()
            {
                HostName = _hostname,
                UserName = _username,
                Password = _password,
                Port = AmqpTcpEndpoint.DefaultAmqpPort // 5672
            };
        }

        // Fila Simples: Publicador
        public void PublicarMensagemSimples(string nomeFila, string conteudoMensagem)
        {
            using (var conexao = _factory.CreateConnection())
            using (var canal = conexao.CreateModel())
            {
                canal.QueueDeclare(queue: nomeFila, durable: false, exclusive: false, autoDelete: false, arguments: null);
                var corpoMensagem = Encoding.UTF8.GetBytes(conteudoMensagem);
                canal.BasicPublish(exchange: "", routingKey: nomeFila, basicProperties: null, body: corpoMensagem);
                Console.WriteLine($" [Produtor] Mensagem enviada para '{nomeFila}': '{conteudoMensagem}'");
            }
        }

        // Fila Simples: Consumidor
        public void ConsumirMensagemSimples(string nomeFila, Action<string> processadorMensagem)
        {
            Console.WriteLine($" [Consumidor] Aguardando mensagens na fila '{nomeFila}'...");

            // Conexão e canal devem permanecer abertos para o consumidor assíncrono
            var conexao = _factory.CreateConnection();
            var canal = conexao.CreateModel();

            canal.QueueDeclare(queue: nomeFila, durable: false, exclusive: false, autoDelete: false, arguments: null);

            var consumidorEventos = new EventingBasicConsumer(canal);
            consumidorEventos.Received += (model, eventoEntrega) =>
            {
                var dadosMensagem = eventoEntrega.Body.ToArray();
                var mensagemTexto = Encoding.UTF8.GetString(dadosMensagem);
                processadorMensagem?.Invoke(mensagemTexto);
            };
            canal.BasicConsume(queue: nomeFila, autoAck: true, consumer: consumidorEventos);

            // O aplicativo chamador deve manter o processo ativo (ex: Console.ReadLine())
            // para que o consumidor continue ouvindo.
        }
        
        // ... Outros métodos virão aqui ...
    }
}

Aplicação Produtora (Console)

using System;
using MyMessagingService;

namespace ProducerApp
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("==== Produtor de Mensagens Simples ====");
            var cliente = new RabbitMqMessagingClient();

            for (int i = 0; i < 50; i++)
            {
                cliente.PublicarMensagemSimples("minhaFilaSimples", $"Olá do produtor: {i}");
            }
            Console.WriteLine("Produção de mensagens concluída!");
            Console.ReadLine();
        }
    }
}

Aplicação Consumidora (Console)

using System;
using MyMessagingService;

namespace ConsumerApp
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("==== Consumidor de Mensagens Simples ====");
            var cliente = new RabbitMqMessagingClient();
            cliente.ConsumirMensagemSimples("minhaFilaSimples", mensagem =>
            {
                Console.WriteLine($" [Recebido] {DateTime.Now}: {mensagem}");
            });
            Console.WriteLine("Pressione [Enter] para sair.");
            Console.ReadLine(); // Mantém o consumidor ativo
        }
    }
}

2. Padrão Filas de Trabalho (Work Queues)

As Filas de Trabalho são usadas para distribuir tarefas entre múltiplos consumidores, evitando que uma única tarefa seja processada por mais de um consumidor. Cada mensagem é entregue a apenas um trabalhador, e a carga de trabalho é balanceada.

Código do Cliente de Mensageria (RabbitMqMessagingClient.cs) - Continuação

// ... (código anterior da classe RabbitMqMessagingClient) ...

        // Fila de Trabalho: Publicador
        public void PublicarTarefaFilaTrabalho(string nomeFila, string conteudoTarefa)
        {
            using (var conexao = _factory.CreateConnection())
            using (var canal = conexao.CreateModel())
            {
                // Declara uma fila durável (as mensagens sobreviverão a reinícios do broker)
                canal.QueueDeclare(queue: nomeFila, durable: true, exclusive: false, autoDelete: false, arguments: null);
                var corpoTarefa = Encoding.UTF8.GetBytes(conteudoTarefa);

                var propriedades = canal.CreateBasicProperties();
                propriedades.Persistent = true; // Marca a mensagem como persistente

                canal.BasicPublish(exchange: "", routingKey: nomeFila, basicProperties: propriedades, body: corpoTarefa);
                Console.WriteLine($" [Produtor Work] Tarefa publicada: {conteudoTarefa}");
            }
        }

        // Fila de Trabalho: Consumidor
        public void ConsumirFilaTrabalho(string nomeFila, Action<string> processadorTarefa)
        {
            Console.WriteLine($" [Consumidor Work] Aguardando tarefas na fila '{nomeFila}'...");

            var conexao = _factory.CreateConnection();
            var canal = conexao.CreateModel();

            canal.QueueDeclare(queue: nomeFila, durable: true, exclusive: false, autoDelete: false, arguments: null);
            
            // Configura o prefetchCount para 1, distribuindo as mensagens um por um entre consumidores
            canal.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); 

            var consumidorEventos = new EventingBasicConsumer(canal);
            consumidorEventos.Received += (model, eventoEntrega) =>
            {
                var corpoDados = eventoEntrega.Body.ToArray();
                var mensagemRecebida = Encoding.UTF8.GetString(corpoDados);
                processadorTarefa?.Invoke(mensagemRecebida);
                
                // Envia ACK para confirmar o processamento da mensagem
                canal.BasicAck(deliveryTag: eventoEntrega.DeliveryTag, multiple: false);
            };
            // autoAck: false, pois o consumidor irá confirmar manualmente
            canal.BasicConsume(queue: nomeFila, autoAck: false, consumer: consumidorEventos);

            // O aplicativo chamador deve manter o processo ativo.
        }
    }
}

Aplicação Produtora (Console)

using System;
using MyMessagingService;

namespace WorkProducerApp
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("==== Produtor de Filas de Trabalho ====");
            var cliente = new RabbitMqMessagingClient();

            for (int i = 0; i < 500; i++)
            {
                cliente.PublicarTarefaFilaTrabalho("filaDeProcessamento", $"Processar item {i}");
            }
            Console.WriteLine("Publicação de tarefas concluída!");
            Console.ReadLine();
        }
    }
}

Aplicação Consumidora (Console) - Execute várias instâncias para ver a distribuição de carga

using System;
using MyMessagingService;

namespace WorkConsumerApp
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("==== Consumidor de Filas de Trabalho ====");
            var cliente = new RabbitMqMessagingClient();
            cliente.ConsumirFilaTrabalho("filaDeProcessamento", tarefa =>
            {
                // Simula um processamento demorado
                System.Threading.Thread.Sleep(500); 
                Console.WriteLine($" [Consumidor {Environment.ProcessId}] Tarefa recebida: {tarefa}");
            });
            Console.WriteLine("Pressione [Enter] para sair.");
            Console.ReadLine();
        }
    }
}

3. Padrão Publicação/Assinatura (Fanout Exchange)

O padrão Fanout (leque) é ideal para broadcast de mensagens. Um produtor envia uma mensagem para um 'exchange' do tipo Fanout, e este 'exchange' a reencaminha para *todas* as filas que estão vinculadas a ele. Assim, cada consumidor que tem uma fila vinculada ao exchange receberá uma cópia da mensagem.

Código do Cliente de Mansageria (RabbitMqMessagingClient.cs) - Continuação

// ... (código anterior da classe RabbitMqMessagingClient) ...

        // Fanout: Publicador
        public void PublicarBroadcast(string nomeExchange, string conteudoBroadcast)
        {
            using (var conexao = _factory.CreateConnection())
            using (var canal = conexao.CreateModel())
            {
                // Declara um exchange do tipo Fanout
                canal.ExchangeDeclare(exchange: nomeExchange, type: ExchangeType.Fanout);
                var corpoDados = Encoding.UTF8.GetBytes(conteudoBroadcast);
                
                // Publica a mensagem sem uma chave de roteamento, pois Fanout ignora
                canal.BasicPublish(exchange: nomeExchange, routingKey: "", basicProperties: null, body: corpoDados);
                Console.WriteLine($" [Produtor Fanout] Broadcast enviado: {conteudoBroadcast}");
            }
        }

        // Fanout: Assinante
        public void AssinarBroadcast(string nomeExchange, Action<string> processadorBroadcast)
        {
            Console.WriteLine($" [Assinante Fanout] Aguardando mensagens do exchange '{nomeExchange}'...");

            var conexao = _factory.CreateConnection();
            var canal = conexao.CreateModel();

            canal.ExchangeDeclare(exchange: nomeExchange, type: ExchangeType.Fanout);

            // Declara uma fila temporária exclusiva, auto-excluída ao fechar a conexão
            var filaTemporaria = canal.QueueDeclare().QueueName;
            
            // Vincula a fila temporária ao exchange Fanout
            canal.QueueBind(queue: filaTemporaria, exchange: nomeExchange, routingKey: "");

            var consumidorEventos = new EventingBasicConsumer(canal);
            consumidorEventos.Received += (model, eventoEntrega) =>
            {
                var corpoDados = eventoEntrega.Body.ToArray();
                var mensagemRecebida = Encoding.UTF8.GetString(corpoDados);
                processadorBroadcast?.Invoke(mensagemRecebida);
            };
            canal.BasicConsume(queue: filaTemporaria, autoAck: true, consumer: consumidorEventos);
        }
    }
}

Aplicação Produtora (Console)

using System;
using MyMessagingService;

namespace FanoutProducerApp
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("==== Produtor Fanout (Broadcast) ====");
            var cliente = new RabbitMqMessagingClient();

            for (int i = 0; i < 50; i++)
            {
                cliente.PublicarBroadcast("notificacoesGerais", $"Evento de sistema nº {i}");
            }
            Console.WriteLine("Publicação de broadcast concluída!");
            Console.ReadLine();
        }
    }
}

Aplicação Consumidora (Console) - Execute várias instâncias para cada uma receber todas as mensagens

using System;
using MyMessagingService;

namespace FanoutConsumerApp
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("==== Assinante Fanout (Broadcast) ====");
            var cliente = new RabbitMqMessagingClient();
            cliente.AssinarBroadcast("notificacoesGerais", mensagem =>
            {
                Console.WriteLine($" [Assinante {Environment.ProcessId}] Mensagem recebida: {mensagem}");
            });
            Console.WriteLine("Pressione [Enter] para sair.");
            Console.ReadLine();
        }
    }
}

4. Padrão Roteamento Direto (Direct Exchange)

O Direct Exchange permite rotear mensagens para filas específicas com base em uma 'routing key'. O produtor envia a mensagem com uma 'routing key', e o exchange a envia apenas para as filas que foram vinculadas a ele com a mesma 'routing key'.

Código do Cliente de Mensageria (RabbitMqMessagingClient.cs) - Continuação

// ... (código anterior da classe RabbitMqMessagingClient) ...

        // Direct: Publicador
        public void PublicarDiretoComRota(string conteudoMensagem, string nomeExchange = "log_events", string chaveRota = "info")
        {
            using (var conexao = _factory.CreateConnection())
            using (var canal = conexao.CreateModel())
            {
                // Declara um exchange do tipo Direct
                canal.ExchangeDeclare(exchange: nomeExchange, type: ExchangeType.Direct);
                var corpoDados = Encoding.UTF8.GetBytes(conteudoMensagem);
                
                // Publica a mensagem com uma chave de roteamento
                canal.BasicPublish(exchange: nomeExchange, routingKey: chaveRota, basicProperties: null, body: corpoDados);
                Console.WriteLine($" [Produtor Direct] Enviado com rota '{chaveRota}': {conteudoMensagem}");
            }
        }

        // Direct: Consumidor
        public void ConsumirDiretoComRota(string nomeExchange = "log_events", string chaveRota = "info", Action<string> processadorMensagem = null)
        {
            Console.WriteLine($" [Consumidor Direct] Aguardando mensagens do exchange '{nomeExchange}' com rota '{chaveRota}'...");

            var conexao = _factory.CreateConnection();
            var canal = conexao.CreateModel();
            
            canal.ExchangeDeclare(exchange: nomeExchange, type: ExchangeType.Direct);

            // Declara uma fila temporária
            var filaVinculada = canal.QueueDeclare().QueueName;
            
            // Vincula a fila ao exchange com a chave de roteamento específica
            canal.QueueBind(queue: filaVinculada, exchange: nomeExchange, routingKey: chaveRota);

            var consumidorEventos = new EventingBasicConsumer(canal);
            consumidorEventos.Received += (sender, eventoEntrega) =>
            {
                var dadosRecebidos = eventoEntrega.Body.ToArray();
                var chaveDeRotaRecebida = eventoEntrega.RoutingKey;
                var mensagemTexto = Encoding.UTF8.GetString(dadosRecebidos);
                
                processadorMensagem?.Invoke(mensagemTexto);
                Console.WriteLine($" [Consumidor Direct - Rota: {chaveDeRotaRecebida}] Mensagem: {mensagemTexto}");
            };
            canal.BasicConsume(queue: filaVinculada, autoAck: true, consumer: consumidorEventos);
        }
    }
}

Aplicação Produtora (Console) - Pode passar a rota via argumento de linha de comando

using System;
using MyMessagingService;

namespace DirectProducerApp
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("==== Produtor Direct (Roteamento) ====");
            var cliente = new RabbitMqMessagingClient();
            string rotaPadrao = "auditoria"; // Rota padrão se nenhum argumento for fornecido

            if (args.Length > 0 && args[0].StartsWith("--rota="))
            {
                rotaPadrao = args[0].Split('=')[1];
            }
            Console.WriteLine($"Publicando com rota: {rotaPadrao}");

            for (int i = 0; i < 20; i++)
            {
                string msg = $"Registro de evento {i}";
                cliente.PublicarDiretoComRota(msg, chaveRota: rotaPadrao);
            }
            Console.WriteLine("Publicação direta concluída!");
            Console.ReadLine();
        }
    }
}

Exemplo de execução do produtor para enviar mensagens de "erro": dotnet run --project DirectProducerApp -- --rota=erro

Aplicação Consumidora (Console) - Pode passar a rota para assinar via argumento

using System;
using MyMessagingService;

namespace DirectConsumerApp
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("==== Consumidor Direct (Roteamento) ====");
            var cliente = new RabbitMqMessagingClient();
            string rotaAssinada = "auditoria"; // Rota padrão a ser assinada

            if (args.Length > 0 && args[0].StartsWith("--rota="))
            {
                rotaAssinada = args[0].Split('=')[1];
            }
            
            Console.WriteLine($"Assinando mensagens com rota: {rotaAssinada}");
            cliente.ConsumirDiretoComRota(chaveRota: rotaAssinada, handler: msg =>
            {
                Console.WriteLine($" [Consumidor {Environment.ProcessId}] Recebido: {msg}");
            });
            Console.WriteLine("Pressione [Enter] para sair.");
            Console.ReadLine();
        }
    }
}

Exemplo de execução do consumidor para receber mensagens de "auditoria": dotnet run --project DirectConsumerApp -- --rota=auditoria

Tags: C# RabbitMQ AMQP FilasDeMensagens Mensageria

Publicado em 5-30 07:33 por Thomas