Em uma aplicação social, os usuários recebem alertas por meio de dois mecanismos distintos:
- Consulta iniciada pelo cliente: após autenticação, o usuário pergunta ao servidor se há novas notificações.
- Entrega iniciada pelo servidor: enquanto o usuário está online, o servidor envia alertas imediatamente.
A abordagem anterior baseava-se em uma tabela chamada tb_notice_fresh e em requisições periódicas do cliente. Esse modelo apresentava dois gargalos principais:
- Acesso frequente ao banco de dados para verificar alertas pendentes.
- Uso de polling HTTP, que simula entrega em tempo real sem oferecer um canal persistente.
A nova arquitetura adota três mudanças:
- As notificações pendentes passam a residir no RabbitMQ, não mais na tabela relacional.
- O polling é substituído por uma conexão persistente WebSocket.
- O microserviço de notificações incorpora o Netty para gerenciar essas conexões.
Fluxos de entrega
Consulta após login
- O cliente abre uma conexão WebSocket com o servidor Netty.
- O servidor armazena essa conexão em um mapa indexado pelo identificador do usuário.
- O Netty consulta as filas do RabbitMQ associadas ao usuário.
- Se houver mensagens, o servidor responde com a quantidade de notificações não lidas.
Entrega ativa durante a sessão
- O RabbitMQ entrega uma nova mensagem ao Netty.
- O Netty localiza a conexão WebSocket ativa do destinatário.
- A contagem atualizada de notificações é enviada pela conexão ativa.
Infraestrutura RabbitMQ
Para executar o broker em um contêiner Docker:
docker run -id --name=rabbit-notify \
-p 5671:5671 \
-p 5672:5672 \
-p 4369:4369 \
-p 15672:15672 \
-p 25672:25672 \
rabbitmq:management
A console de administração fica disponível em http://192.168.240.134:15672, com credenciais padrão guest/guest.
Assinatura de artigos com RabbitMQ
Configuração do microserviço de artigos
O serviço responsável pelos artigos precisa publicar e gerenciar filas. Inclua a dependência apropriada no pom.xml:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
E adicione as propriedades de conexão no application.yml:
spring:
rabbitmq:
host: 192.168.240.134
port: 5672
username: guest
password: guest
Lógica de assinatura e desassinatura
A operação de assinar um autor cria uma fila exclusiva para o leitor e a vincula a um exchange direct usando o identificador do autor como chave de roteamento. A desassinatura remove a ligação.
@Autowired
private RabbitTemplate templateAmqp;
public Boolean alternarAssinatura(String leitorId, String artigoId) {
String autorId = artigoDao.selectById(artigoId).getUserid();
RabbitAdmin admin = new RabbitAdmin(templateAmqp.getConnectionFactory());
DirectExchange exchange = new DirectExchange("artigo.assinatura");
admin.declareExchange(exchange);
String nomeFila = "artigo.assinatura." + leitorId;
Queue fila = new Queue(nomeFila, true);
Binding ligacao = BindingBuilder.bind(fila).to(exchange).with(autorId);
String chaveLeitor = "assinaturas.leitor." + leitorId;
String chaveAutor = "assinantes.autor." + autorId;
Boolean jaAssinou = redisTemplate.boundSetOps(chaveLeitor).isMember(autorId);
if (jaAssinou) {
redisTemplate.boundSetOps(chaveLeitor).remove(autorId);
redisTemplate.boundSetOps(chaveAutor).remove(leitorId);
admin.removeBinding(ligacao);
return false;
}
redisTemplate.boundSetOps(chaveLeitor).add(autorId);
redisTemplate.boundSetOps(chaveAutor).add(leitorId);
admin.declareQueue(fila);
admin.declareBinding(ligacao);
return true;
}
Disparo de notificações ao publicar
Quando um novo artigo é salvo, o serviço publica uma mensagem no exchange, usando o autor como chave de roteamento:
public void publicar(Artigo artigo) {
artigoDao.insert(artigo);
String autorId = artigo.getUserid();
String artigoId = artigo.getId();
templateAmqp.convertAndSend("artigo.assinatura", autorId, artigoId);
}
Ajuste no serviço de notificações
Como o RabbitMQ agora gerencia os alertas pendentes, removemos a inserção na tabela auxiliar tb_notice_fresh:
public void armazenar(Notice alerta) {
alerta.setState("0");
alerta.setCreatetime(new Date());
alerta.setId(idWorker.nextId() + "");
noticeDao.insert(alerta);
// O lembrete de novas mensagens agora é controlado pelo RabbitMQ
}
De I/O bloqueante a I/O não bloqueante
I/O tradicional
No modelo clássico, cada conexão recebe uma thread dedicada. O servidor aguarda dados e, enquanto espera, a thread permanece bloqueada.
package demo.io.classico;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
public class ServidorBloqueante {
public static void main(String[] args) throws Exception {
ServerSocket servidor = new ServerSocket(8000);
while (true) {
Socket conexao = servidor.accept();
new Thread(() -> {
try (InputStream entrada = conexao.getInputStream()) {
byte[] buffer = new byte[1024];
int lidos;
while ((lidos = entrada.read(buffer)) != -1) {
System.out.println(Thread.currentThread().getName()
+ ": " + new String(buffer, 0, lidos));
}
} catch (Exception ignorada) { }
}).start();
}
}
}
Esse modelo não escala bem: milhares de conexões simultâneas exigem milhares de threads, consumindo memória e aumentando a pressão sobre o escalonador do sistema operacional.
I/O não bloqueante (NIO)
O NIO introduz três elementos centrais:
- Canal (Channel): via de comunicação bidirecional, distinta dos fluxos unidirecionais do I/O clássico.
- Buffer: região de memória usada para leitura e escrita em blocos.
- Seletor (Selector): permite que uma única thread monitore vários canais simultaneamente.
Com o seletor, uma thread verifica quais canais possuem dados prontos, evitando uma thread por conexão.
package demo.io.nio;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;
public class ServidorNio {
public static void main(String[] args) throws Exception {
Selector selectorConexoes = Selector.open();
Selector selectorDados = Selector.open();
new Thread(() -> {
try {
ServerSocketChannel canalServidor = ServerSocketChannel.open();
canalServidor.socket().bind(new InetSocketAddress(8000));
canalServidor.configureBlocking(false);
canalServidor.register(selectorConexoes, SelectionKey.OP_ACCEPT);
while (true) {
if (selectorConexoes.select(1) > 0) {
Set<SelectionKey> chaves = selectorConexoes.selectedKeys();
Iterator<SelectionKey> iterador = chaves.iterator();
while (iterador.hasNext()) {
SelectionKey chave = iterador.next();
if (chave.isAcceptable()) {
SocketChannel cliente = ((ServerSocketChannel) chave.channel()).accept();
cliente.configureBlocking(false);
cliente.register(selectorDados, SelectionKey.OP_READ);
}
iterador.remove();
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
while (true) {
if (selectorDados.select(1) > 0) {
Set<SelectionKey> chaves = selectorDados.selectedKeys();
Iterator<SelectionKey> iterador = chaves.iterator();
while (iterador.hasNext()) {
SelectionKey chave = iterador.next();
if (chave.isReadable()) {
SocketChannel cliente = (SocketChannel) chave.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
cliente.read(buffer);
buffer.flip();
System.out.println(Thread.currentThread().getName()
+ ": " + Charset.defaultCharset()
.newDecoder().decode(buffer).toString());
}
iterador.remove();
chave.interestOps(SelectionKey.OP_READ);
}
}
}
} catch (Exception ignorada) { }
}).start();
}
}
Por que adotar o Netty
O Netty encapsula a complexidade do NIO e oferece uma pilha completa para aplicações de rede. Entre suas vantagens:
- Modelo de eventos assíncronos que simplifica o código.
- Capacidade de alternar entre I/O bloqueante e não bloqueante com poucas mudanças.
- Decodificadores e codificadores embutidos.
- Correção de problemas conhecidos do NIO do JDK, como busy-wait.
- Modelo de threads otimizado para alta concorrência.
- Suporte a diversos protocolos, incluindo WebSocket, HTTP e protobuf.
Adicione a dependência:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.5.Final</version>
</dependency>
Servidor básico com Netty
package demo.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
public class ServidorNetty {
public static void main(String[] args) {
NioEventLoopGroup grupoAceite = new NioEventLoopGroup();
NioEventLoopGroup grupoTrabalho = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(grupoAceite, grupoTrabalho)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel canal) {
ChannelPipeline linha = canal.pipeline();
linha.addLast(new StringDecoder());
linha.addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String mensagem) {
System.out.println(mensagem);
}
});
}
})
.bind(8000);
}
}
Cliente básico com Netty
package demo.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
public class ClienteNetty {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup grupo = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(grupo)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel canal) {
canal.pipeline().addLast(new StringEncoder());
}
});
Channel canal = bootstrap.connect("127.0.0.1", 8000).sync().channel();
while (true) {
canal.writeAndFlush("dados de teste");
Thread.sleep(2000);
}
}
}
Integrando Netty e WebSocket no microserviço de notificações
Dependências e configuração
No microserviço de notificações, adicione:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.5.Final</version>
</dependency>
Inicialização do servidor Netty
package notify.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
public class NotificacaoNettyServer {
public void iniciar(int porta) {
EventLoopGroup grupoAceite = new NioEventLoopGroup();
EventLoopGroup grupoTrabalho = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(grupoAceite, grupoTrabalho)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel canal) {
canal.pipeline().addLast(new HttpServerCodec());
canal.pipeline().addLast(new HttpObjectAggregator(65536));
canal.pipeline().addLast(new WebSocketServerProtocolHandler("/ws"));
canal.pipeline().addLast(new NotificacaoWebSocketHandler());
}
})
.bind(porta);
}
}
Para subir o servidor junto com a aplicação Spring Boot, crie um bean de configuração:
package notify.config;
import notify.netty.NotificacaoNettyServer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class NotificacaoNettyConfig {
@Bean
public NotificacaoNettyServer servidorNetty() {
NotificacaoNettyServer servidor = new NotificacaoNettyServer();
new Thread(() -> servidor.iniciar(1234)).start();
return servidor;
}
}
Utilitário para acessar o contexto Spring
package notify.config;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public class ContextoAplicacaoUtil implements ApplicationContextAware {
private static ApplicationContext contexto;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
contexto = applicationContext;
}
public static ApplicationContext obterContexto() {
return contexto;
}
public static <T> T obterBean(Class<T> classe) {
return contexto.getBean(classe);
}
}
Handler WebSocket
package notify.netty;
import com.fasterxml.jackson.databind.ObjectMapper;
import notify.config.ContextoAplicacaoUtil;
import notify.vo.Resposta;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
public class NotificacaoWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private static final ObjectMapper CONVERSOR = new ObjectMapper();
public static final ConcurrentHashMap<String, Channel> CANAIS_ATIVOS = new ConcurrentHashMap<>();
private final RabbitTemplate template = ContextoAplicacaoUtil.obterBean(RabbitTemplate.class);
private final SimpleMessageListenerContainer containerAssinaturas =
(SimpleMessageListenerContainer) ContextoAplicacaoUtil.obterContexto()
.getBean("containerAssinaturas");
private final SimpleMessageListenerContainer containerCurtidas =
(SimpleMessageListenerContainer) ContextoAplicacaoUtil.obterContexto()
.getBean("containerCurtidas");
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame quadro) throws Exception {
String json = quadro.text();
String usuarioId = CONVERSOR.readTree(json).get("userId").asText();
Channel canal = CANAIS_ATIVOS.computeIfAbsent(usuarioId, id -> ctx.channel());
RabbitAdmin admin = new RabbitAdmin(template);
String filaAssinaturas = "artigo.assinatura." + usuarioId;
Properties propsAssinaturas = admin.getQueueProperties(filaAssinaturas);
int qtdAssinaturas = propsAssinaturas == null ? 0
: (int) propsAssinaturas.get("QUEUE_MESSAGE_COUNT");
String filaCurtidas = "artigo.curtida." + usuarioId;
Properties propsCurtidas = admin.getQueueProperties(filaCurtidas);
int qtdCurtidas = propsCurtidas == null ? 0
: (int) propsCurtidas.get("QUEUE_MESSAGE_COUNT");
Map<String, Integer> contagens = new HashMap<>();
contagens.put("assinaturas", qtdAssinaturas);
contagens.put("curtidas", qtdCurtidas);
Resposta resposta = new Resposta(200, "OK", contagens);
canal.writeAndFlush(new TextWebSocketFrame(CONVERSOR.writeValueAsString(resposta)));
if (qtdAssinaturas > 0) {
admin.purgeQueue(filaAssinaturas, true);
}
if (qtdCurtidas > 0) {
admin.purgeQueue(filaCurtidas, true);
}
containerAssinaturas.addQueueNames(filaAssinaturas);
containerCurtidas.addQueueNames(filaCurtidas);
}
}
Configuração dos listeners RabbitMQ
package notify.config;
import notify.listener.CurtidaListener;
import notify.listener.AssinaturaListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitListenerConfig {
@Bean("containerAssinaturas")
public SimpleMessageListenerContainer criarContainerAssinaturas(ConnectionFactory factory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(factory);
container.setExposeListenerChannel(true);
container.setMessageListener(new AssinaturaListener());
return container;
}
@Bean("containerCurtidas")
public SimpleMessageListenerContainer criarContainerCurtidas(ConnectionFactory factory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(factory);
container.setExposeListenerChannel(true);
container.setMessageListener(new CurtidaListener());
return container;
}
}
Listeners de entrega ativa
package notify.listener;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import notify.netty.NotificacaoWebSocketHandler;
import notify.vo.Resposta;
import io.netty.channel.ChannelId;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import java.util.HashMap;
import java.util.Map;
public class AssinaturaListener implements ChannelAwareMessageListener {
private static final ObjectMapper CONVERSOR = new ObjectMapper();
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String nomeFila = message.getMessageProperties().getConsumerQueue();
String usuarioId = nomeFila.substring(nomeFila.lastIndexOf(".") + 1);
io.netty.channel.Channel canalWs = NotificacaoWebSocketHandler.CANAIS_ATIVOS.get(usuarioId);
if (canalWs != null) {
Map<String, Integer> dados = new HashMap<>();
dados.put("assinaturas", 1);
Resposta resposta = new Resposta(200, "Nova notificação", dados);
canalWs.writeAndFlush(new TextWebSocketFrame(CONVERSOR.writeValueAsString(resposta)));
}
}
}
package notify.listener;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import notify.netty.NotificacaoWebSocketHandler;
import notify.vo.Resposta;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import java.util.HashMap;
import java.util.Map;
public class CurtidaListener implements ChannelAwareMessageListener {
private static final ObjectMapper CONVERSOR = new ObjectMapper();
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String nomeFila = message.getMessageProperties().getConsumerQueue();
String usuarioId = nomeFila.substring(nomeFila.lastIndexOf(".") + 1);
io.netty.channel.Channel canalWs = NotificacaoWebSocketHandler.CANAIS_ATIVOS.get(usuarioId);
if (canalWs != null) {
Map<String, Integer> dados = new HashMap<>();
dados.put("curtidas", 1);
Resposta resposta = new Resposta(200, "Nova curtida", dados);
canalWs.writeAndFlush(new TextWebSocketFrame(CONVERSOR.writeValueAsString(resposta)));
}
}
}
Notificações pontuais de curtidas
Quando um usuário curte um artigo, o sistema deve notificar o autor. Essa comunicação usa uma fila dedicada por autor, sem exchange intermediário.
public void curtir(String artigoId, String usuarioId) {
Artigo artigo = artigoDao.selectById(artigoId);
artigo.setThumbup(artigo.getThumbup() + 1);
artigoDao.updateById(artigo);
Notice alerta = new Notice();
alerta.setReceiverId(artigo.getUserid());
alerta.setOperatorId(usuarioId);
alerta.setAction("thumbup");
alerta.setTargetType("article");
alerta.setTargetId(artigoId);
alerta.setCreatetime(new Date());
alerta.setType("user");
alerta.setState("0");
noticeClient.save(alerta);
RabbitAdmin admin = new RabbitAdmin(templateAmqp.getConnectionFactory());
String filaAutor = "artigo.curtida." + artigo.getUserid();
Queue fila = new Queue(filaAutor, true);
admin.declareQueue(fila);
templateAmqp.convertAndSend(filaAutor, artigoId);
}
Página de teste WebSocket
Para validar a comunicação, crie um arquivo index.html em src/main/resources/static:
<html>
<head>
<meta charset="utf-8">
<title>Teste WebSocket - Notificações</title>
</head>
<body>
<h1>Teste de notificações em tempo real</h1>
<button onclick="conectar()">Conectar</button>
<button onclick="desconectar()">Desconectar</button>
<br><br>
<textarea id="mensagem" rows="6" cols="40">{ "userId": "1" }</textarea>
<button onclick="enviar()">Enviar</button>
<hr>
<h3>Respostas do servidor</h3>
<textarea id="resposta" rows="10" cols="60" readonly></textarea>
<script>
var socket;
function conectar() {
socket = new WebSocket("ws://127.0.0.1:1234/ws");
socket.onopen = function() {
document.getElementById("resposta").value += "\nConectado";
};
socket.onmessage = function(evento) {
var area = document.getElementById("resposta");
area.value += "\n" + evento.data;
area.scrollTop = area.scrollHeight;
};
socket.onclose = function() {
document.getElementById("resposta").value += "\nDesconectado";
};
}
function desconectar() {
if (socket) {
socket.close();
}
}
function enviar() {
if (socket && socket.readyState === WebSocket.OPEN) {
socket.send(document.getElementById("mensagem").value);
}
}
</script>
</body>
</html>
Suba os microserviços eureka, user, article e notice. Publique um artigo ou execute uma curtida; a página deverá exibir a contagem atualizada de notificações sem recarregar.