Aprimoramento do Sistema de Notificações em Tempo Real com RabbitMQ, Netty e WebSocket

Em uma aplicação social, os usuários recebem alertas por meio de dois mecanismos distintos:

  • Consulta iniciada pelo cliente: após autenticação, o usuário pergunta ao servidor se há novas notificações.
  • Entrega iniciada pelo servidor: enquanto o usuário está online, o servidor envia alertas imediatamente.

A abordagem anterior baseava-se em uma tabela chamada tb_notice_fresh e em requisições periódicas do cliente. Esse modelo apresentava dois gargalos principais:

  1. Acesso frequente ao banco de dados para verificar alertas pendentes.
  2. Uso de polling HTTP, que simula entrega em tempo real sem oferecer um canal persistente.

A nova arquitetura adota três mudanças:

  • As notificações pendentes passam a residir no RabbitMQ, não mais na tabela relacional.
  • O polling é substituído por uma conexão persistente WebSocket.
  • O microserviço de notificações incorpora o Netty para gerenciar essas conexões.

Fluxos de entrega

Consulta após login

  1. O cliente abre uma conexão WebSocket com o servidor Netty.
  2. O servidor armazena essa conexão em um mapa indexado pelo identificador do usuário.
  3. O Netty consulta as filas do RabbitMQ associadas ao usuário.
  4. Se houver mensagens, o servidor responde com a quantidade de notificações não lidas.

Entrega ativa durante a sessão

  1. O RabbitMQ entrega uma nova mensagem ao Netty.
  2. O Netty localiza a conexão WebSocket ativa do destinatário.
  3. A contagem atualizada de notificações é enviada pela conexão ativa.

Infraestrutura RabbitMQ

Para executar o broker em um contêiner Docker:

docker run -id --name=rabbit-notify \
  -p 5671:5671 \
  -p 5672:5672 \
  -p 4369:4369 \
  -p 15672:15672 \
  -p 25672:25672 \
  rabbitmq:management

A console de administração fica disponível em http://192.168.240.134:15672, com credenciais padrão guest/guest.

Assinatura de artigos com RabbitMQ

Configuração do microserviço de artigos

O serviço responsável pelos artigos precisa publicar e gerenciar filas. Inclua a dependência apropriada no pom.xml:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

E adicione as propriedades de conexão no application.yml:

spring:
  rabbitmq:
    host: 192.168.240.134
    port: 5672
    username: guest
    password: guest

Lógica de assinatura e desassinatura

A operação de assinar um autor cria uma fila exclusiva para o leitor e a vincula a um exchange direct usando o identificador do autor como chave de roteamento. A desassinatura remove a ligação.

    @Autowired
    private RabbitTemplate templateAmqp;

    public Boolean alternarAssinatura(String leitorId, String artigoId) {
        String autorId = artigoDao.selectById(artigoId).getUserid();

        RabbitAdmin admin = new RabbitAdmin(templateAmqp.getConnectionFactory());
        DirectExchange exchange = new DirectExchange("artigo.assinatura");
        admin.declareExchange(exchange);

        String nomeFila = "artigo.assinatura." + leitorId;
        Queue fila = new Queue(nomeFila, true);
        Binding ligacao = BindingBuilder.bind(fila).to(exchange).with(autorId);

        String chaveLeitor = "assinaturas.leitor." + leitorId;
        String chaveAutor = "assinantes.autor." + autorId;

        Boolean jaAssinou = redisTemplate.boundSetOps(chaveLeitor).isMember(autorId);

        if (jaAssinou) {
            redisTemplate.boundSetOps(chaveLeitor).remove(autorId);
            redisTemplate.boundSetOps(chaveAutor).remove(leitorId);
            admin.removeBinding(ligacao);
            return false;
        }

        redisTemplate.boundSetOps(chaveLeitor).add(autorId);
        redisTemplate.boundSetOps(chaveAutor).add(leitorId);
        admin.declareQueue(fila);
        admin.declareBinding(ligacao);
        return true;
    }

Disparo de notificações ao publicar

Quando um novo artigo é salvo, o serviço publica uma mensagem no exchange, usando o autor como chave de roteamento:

    public void publicar(Artigo artigo) {
        artigoDao.insert(artigo);

        String autorId = artigo.getUserid();
        String artigoId = artigo.getId();

        templateAmqp.convertAndSend("artigo.assinatura", autorId, artigoId);
    }

Ajuste no serviço de notificações

Como o RabbitMQ agora gerencia os alertas pendentes, removemos a inserção na tabela auxiliar tb_notice_fresh:

    public void armazenar(Notice alerta) {
        alerta.setState("0");
        alerta.setCreatetime(new Date());
        alerta.setId(idWorker.nextId() + "");
        noticeDao.insert(alerta);

        // O lembrete de novas mensagens agora é controlado pelo RabbitMQ
    }

De I/O bloqueante a I/O não bloqueante

I/O tradicional

No modelo clássico, cada conexão recebe uma thread dedicada. O servidor aguarda dados e, enquanto espera, a thread permanece bloqueada.

package demo.io.classico;

import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;

public class ServidorBloqueante {

    public static void main(String[] args) throws Exception {
        ServerSocket servidor = new ServerSocket(8000);

        while (true) {
            Socket conexao = servidor.accept();

            new Thread(() -> {
                try (InputStream entrada = conexao.getInputStream()) {
                    byte[] buffer = new byte[1024];
                    int lidos;

                    while ((lidos = entrada.read(buffer)) != -1) {
                        System.out.println(Thread.currentThread().getName()
                                + ": " + new String(buffer, 0, lidos));
                    }
                } catch (Exception ignorada) { }
            }).start();
        }
    }
}

Esse modelo não escala bem: milhares de conexões simultâneas exigem milhares de threads, consumindo memória e aumentando a pressão sobre o escalonador do sistema operacional.

I/O não bloqueante (NIO)

O NIO introduz três elementos centrais:

  • Canal (Channel): via de comunicação bidirecional, distinta dos fluxos unidirecionais do I/O clássico.
  • Buffer: região de memória usada para leitura e escrita em blocos.
  • Seletor (Selector): permite que uma única thread monitore vários canais simultaneamente.

Com o seletor, uma thread verifica quais canais possuem dados prontos, evitando uma thread por conexão.

package demo.io.nio;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;

public class ServidorNio {

    public static void main(String[] args) throws Exception {
        Selector selectorConexoes = Selector.open();
        Selector selectorDados = Selector.open();

        new Thread(() -> {
            try {
                ServerSocketChannel canalServidor = ServerSocketChannel.open();
                canalServidor.socket().bind(new InetSocketAddress(8000));
                canalServidor.configureBlocking(false);
                canalServidor.register(selectorConexoes, SelectionKey.OP_ACCEPT);

                while (true) {
                    if (selectorConexoes.select(1) > 0) {
                        Set<SelectionKey> chaves = selectorConexoes.selectedKeys();
                        Iterator<SelectionKey> iterador = chaves.iterator();

                        while (iterador.hasNext()) {
                            SelectionKey chave = iterador.next();

                            if (chave.isAcceptable()) {
                                SocketChannel cliente = ((ServerSocketChannel) chave.channel()).accept();
                                cliente.configureBlocking(false);
                                cliente.register(selectorDados, SelectionKey.OP_READ);
                            }

                            iterador.remove();
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                while (true) {
                    if (selectorDados.select(1) > 0) {
                        Set<SelectionKey> chaves = selectorDados.selectedKeys();
                        Iterator<SelectionKey> iterador = chaves.iterator();

                        while (iterador.hasNext()) {
                            SelectionKey chave = iterador.next();

                            if (chave.isReadable()) {
                                SocketChannel cliente = (SocketChannel) chave.channel();
                                ByteBuffer buffer = ByteBuffer.allocate(1024);
                                cliente.read(buffer);
                                buffer.flip();

                                System.out.println(Thread.currentThread().getName()
                                        + ": " + Charset.defaultCharset()
                                        .newDecoder().decode(buffer).toString());
                            }

                            iterador.remove();
                            chave.interestOps(SelectionKey.OP_READ);
                        }
                    }
                }
            } catch (Exception ignorada) { }
        }).start();
    }
}

Por que adotar o Netty

O Netty encapsula a complexidade do NIO e oferece uma pilha completa para aplicações de rede. Entre suas vantagens:

  • Modelo de eventos assíncronos que simplifica o código.
  • Capacidade de alternar entre I/O bloqueante e não bloqueante com poucas mudanças.
  • Decodificadores e codificadores embutidos.
  • Correção de problemas conhecidos do NIO do JDK, como busy-wait.
  • Modelo de threads otimizado para alta concorrência.
  • Suporte a diversos protocolos, incluindo WebSocket, HTTP e protobuf.

Adicione a dependência:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.5.Final</version>
</dependency>

Servidor básico com Netty

package demo.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;

public class ServidorNetty {

    public static void main(String[] args) {
        NioEventLoopGroup grupoAceite = new NioEventLoopGroup();
        NioEventLoopGroup grupoTrabalho = new NioEventLoopGroup();

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(grupoAceite, grupoTrabalho)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel canal) {
                        ChannelPipeline linha = canal.pipeline();
                        linha.addLast(new StringDecoder());
                        linha.addLast(new SimpleChannelInboundHandler<String>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, String mensagem) {
                                System.out.println(mensagem);
                            }
                        });
                    }
                })
                .bind(8000);
    }
}

Cliente básico com Netty

package demo.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;

public class ClienteNetty {

    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup grupo = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(grupo)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel canal) {
                        canal.pipeline().addLast(new StringEncoder());
                    }
                });

        Channel canal = bootstrap.connect("127.0.0.1", 8000).sync().channel();

        while (true) {
            canal.writeAndFlush("dados de teste");
            Thread.sleep(2000);
        }
    }
}

Integrando Netty e WebSocket no microserviço de notificações

Dependências e configuração

No microserviço de notificações, adicione:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.5.Final</version>
</dependency>

Inicialização do servidor Netty

package notify.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;

public class NotificacaoNettyServer {

    public void iniciar(int porta) {
        EventLoopGroup grupoAceite = new NioEventLoopGroup();
        EventLoopGroup grupoTrabalho = new NioEventLoopGroup();

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(grupoAceite, grupoTrabalho)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel canal) {
                        canal.pipeline().addLast(new HttpServerCodec());
                        canal.pipeline().addLast(new HttpObjectAggregator(65536));
                        canal.pipeline().addLast(new WebSocketServerProtocolHandler("/ws"));
                        canal.pipeline().addLast(new NotificacaoWebSocketHandler());
                    }
                })
                .bind(porta);
    }
}

Para subir o servidor junto com a aplicação Spring Boot, crie um bean de configuração:

package notify.config;

import notify.netty.NotificacaoNettyServer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class NotificacaoNettyConfig {

    @Bean
    public NotificacaoNettyServer servidorNetty() {
        NotificacaoNettyServer servidor = new NotificacaoNettyServer();

        new Thread(() -> servidor.iniciar(1234)).start();

        return servidor;
    }
}

Utilitário para acessar o contexto Spring

package notify.config;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

@Component
public class ContextoAplicacaoUtil implements ApplicationContextAware {

    private static ApplicationContext contexto;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        contexto = applicationContext;
    }

    public static ApplicationContext obterContexto() {
        return contexto;
    }

    public static <T> T obterBean(Class<T> classe) {
        return contexto.getBean(classe);
    }
}

Handler WebSocket

package notify.netty;

import com.fasterxml.jackson.databind.ObjectMapper;
import notify.config.ContextoAplicacaoUtil;
import notify.vo.Resposta;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

public class NotificacaoWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    private static final ObjectMapper CONVERSOR = new ObjectMapper();

    public static final ConcurrentHashMap<String, Channel> CANAIS_ATIVOS = new ConcurrentHashMap<>();

    private final RabbitTemplate template = ContextoAplicacaoUtil.obterBean(RabbitTemplate.class);

    private final SimpleMessageListenerContainer containerAssinaturas =
            (SimpleMessageListenerContainer) ContextoAplicacaoUtil.obterContexto()
                    .getBean("containerAssinaturas");

    private final SimpleMessageListenerContainer containerCurtidas =
            (SimpleMessageListenerContainer) ContextoAplicacaoUtil.obterContexto()
                    .getBean("containerCurtidas");

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame quadro) throws Exception {
        String json = quadro.text();
        String usuarioId = CONVERSOR.readTree(json).get("userId").asText();

        Channel canal = CANAIS_ATIVOS.computeIfAbsent(usuarioId, id -> ctx.channel());

        RabbitAdmin admin = new RabbitAdmin(template);

        String filaAssinaturas = "artigo.assinatura." + usuarioId;
        Properties propsAssinaturas = admin.getQueueProperties(filaAssinaturas);
        int qtdAssinaturas = propsAssinaturas == null ? 0
                : (int) propsAssinaturas.get("QUEUE_MESSAGE_COUNT");

        String filaCurtidas = "artigo.curtida." + usuarioId;
        Properties propsCurtidas = admin.getQueueProperties(filaCurtidas);
        int qtdCurtidas = propsCurtidas == null ? 0
                : (int) propsCurtidas.get("QUEUE_MESSAGE_COUNT");

        Map<String, Integer> contagens = new HashMap<>();
        contagens.put("assinaturas", qtdAssinaturas);
        contagens.put("curtidas", qtdCurtidas);

        Resposta resposta = new Resposta(200, "OK", contagens);
        canal.writeAndFlush(new TextWebSocketFrame(CONVERSOR.writeValueAsString(resposta)));

        if (qtdAssinaturas > 0) {
            admin.purgeQueue(filaAssinaturas, true);
        }
        if (qtdCurtidas > 0) {
            admin.purgeQueue(filaCurtidas, true);
        }

        containerAssinaturas.addQueueNames(filaAssinaturas);
        containerCurtidas.addQueueNames(filaCurtidas);
    }
}

Configuração dos listeners RabbitMQ

package notify.config;

import notify.listener.CurtidaListener;
import notify.listener.AssinaturaListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitListenerConfig {

    @Bean("containerAssinaturas")
    public SimpleMessageListenerContainer criarContainerAssinaturas(ConnectionFactory factory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(factory);
        container.setExposeListenerChannel(true);
        container.setMessageListener(new AssinaturaListener());
        return container;
    }

    @Bean("containerCurtidas")
    public SimpleMessageListenerContainer criarContainerCurtidas(ConnectionFactory factory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(factory);
        container.setExposeListenerChannel(true);
        container.setMessageListener(new CurtidaListener());
        return container;
    }
}

Listeners de entrega ativa

package notify.listener;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import notify.netty.NotificacaoWebSocketHandler;
import notify.vo.Resposta;
import io.netty.channel.ChannelId;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;

import java.util.HashMap;
import java.util.Map;

public class AssinaturaListener implements ChannelAwareMessageListener {

    private static final ObjectMapper CONVERSOR = new ObjectMapper();

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        String nomeFila = message.getMessageProperties().getConsumerQueue();
        String usuarioId = nomeFila.substring(nomeFila.lastIndexOf(".") + 1);

        io.netty.channel.Channel canalWs = NotificacaoWebSocketHandler.CANAIS_ATIVOS.get(usuarioId);

        if (canalWs != null) {
            Map<String, Integer> dados = new HashMap<>();
            dados.put("assinaturas", 1);
            Resposta resposta = new Resposta(200, "Nova notificação", dados);
            canalWs.writeAndFlush(new TextWebSocketFrame(CONVERSOR.writeValueAsString(resposta)));
        }
    }
}

package notify.listener;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import notify.netty.NotificacaoWebSocketHandler;
import notify.vo.Resposta;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;

import java.util.HashMap;
import java.util.Map;

public class CurtidaListener implements ChannelAwareMessageListener {

    private static final ObjectMapper CONVERSOR = new ObjectMapper();

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        String nomeFila = message.getMessageProperties().getConsumerQueue();
        String usuarioId = nomeFila.substring(nomeFila.lastIndexOf(".") + 1);

        io.netty.channel.Channel canalWs = NotificacaoWebSocketHandler.CANAIS_ATIVOS.get(usuarioId);

        if (canalWs != null) {
            Map<String, Integer> dados = new HashMap<>();
            dados.put("curtidas", 1);
            Resposta resposta = new Resposta(200, "Nova curtida", dados);
            canalWs.writeAndFlush(new TextWebSocketFrame(CONVERSOR.writeValueAsString(resposta)));
        }
    }
}

Notificações pontuais de curtidas

Quando um usuário curte um artigo, o sistema deve notificar o autor. Essa comunicação usa uma fila dedicada por autor, sem exchange intermediário.

    public void curtir(String artigoId, String usuarioId) {
        Artigo artigo = artigoDao.selectById(artigoId);
        artigo.setThumbup(artigo.getThumbup() + 1);
        artigoDao.updateById(artigo);

        Notice alerta = new Notice();
        alerta.setReceiverId(artigo.getUserid());
        alerta.setOperatorId(usuarioId);
        alerta.setAction("thumbup");
        alerta.setTargetType("article");
        alerta.setTargetId(artigoId);
        alerta.setCreatetime(new Date());
        alerta.setType("user");
        alerta.setState("0");
        noticeClient.save(alerta);

        RabbitAdmin admin = new RabbitAdmin(templateAmqp.getConnectionFactory());
        String filaAutor = "artigo.curtida." + artigo.getUserid();
        Queue fila = new Queue(filaAutor, true);
        admin.declareQueue(fila);

        templateAmqp.convertAndSend(filaAutor, artigoId);
    }

Página de teste WebSocket

Para validar a comunicação, crie um arquivo index.html em src/main/resources/static:


<html>
<head>
    <meta charset="utf-8">
    <title>Teste WebSocket - Notificações</title>
</head>
<body>
<h1>Teste de notificações em tempo real</h1>

<button onclick="conectar()">Conectar</button>
<button onclick="desconectar()">Desconectar</button>
<br><br>

<textarea id="mensagem" rows="6" cols="40">{ "userId": "1" }</textarea>
<button onclick="enviar()">Enviar</button>

<hr>

<h3>Respostas do servidor</h3>
<textarea id="resposta" rows="10" cols="60" readonly></textarea>

<script>
    var socket;

    function conectar() {
        socket = new WebSocket("ws://127.0.0.1:1234/ws");

        socket.onopen = function() {
            document.getElementById("resposta").value += "\nConectado";
        };

        socket.onmessage = function(evento) {
            var area = document.getElementById("resposta");
            area.value += "\n" + evento.data;
            area.scrollTop = area.scrollHeight;
        };

        socket.onclose = function() {
            document.getElementById("resposta").value += "\nDesconectado";
        };
    }

    function desconectar() {
        if (socket) {
            socket.close();
        }
    }

    function enviar() {
        if (socket && socket.readyState === WebSocket.OPEN) {
            socket.send(document.getElementById("mensagem").value);
        }
    }
</script>
</body>
</html>

Suba os microserviços eureka, user, article e notice. Publique um artigo ou execute uma curtida; a página deverá exibir a contagem atualizada de notificações sem recarregar.

Tags: RabbitMQ Netty WebSocket Spring Boot nio

Publicado em 6-23 22:56