Ao trabalhar em um novo projeto, surgiu a necessidade de transmitir dados do Kafka para o frontend em tempo real. Inicialmente, considerou-se o uso de polling (requisições periódicas) para obter os dados, porém essa abordagem aprseenta problemas como descoincidência entre a frequência de polling e o ritmo de consumo dos dados, podendo resultar em perda de informações. A solução definitiva foi implementar a transmissão em tempo real utilizando WebSocket.
O protocolo WebSocket é uma evolução da comunicação HTTP, baseado no protocolo TCP. Diferentemente do modelo tradicional request-resposne, o WebSocket estabelece uma conexão persistente que permite comunicação full-duplex, ou seja, o servidor pode enviar informações para o cliente sem necessidade de uma solicitação prévia.
3.1 Dependências do Maven
São necessárias três dependências no arquivo pom.xml: a API do WebSocket e duas dependências do Kafka.
<dependencies>
<dependency>
<groupId>javax</groupId>
<artifactId>javaee-api</artifactId>
<version>7.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>RELEASE</version>
</dependency>
</dependencies>
3.2 Implementação do Servidor WebSocket
//Definindo o URI do endpoint
@ServerEndpoint("/dataStream")
public class DataSocketEndpoint {
private Session clientSession;
//Conjunto estático para armazenar todas as conexões ativas
private static CopyOnWriteArraySet<DataSocketEndpoint> activeConnections =
new CopyOnWriteArraySet<DataSocketEndpoint>();
/**
* Estabelece a conexão com o cliente.
*/
@OnOpen
public void onOpen(Session session){
this.clientSession = session;
activeConnections.add(this);
System.out.println("Nova conexão estabelecida, sessionId: " + session.getId());
}
/**
* Encerra a conexão com o cliente.
*/
@OnClose
public void onClose(){
activeConnections.remove(this);
System.out.println("Conexão encerrada, sessionId: " + clientSession.getId());
}
/**
* Recebe mensagens enviadas pelo cliente.
*/
@OnMessage
public void onMessage(String message, Session session){
System.out.println("Mensagem recebida: " + message + " de " + session.getId());
}
/**
* Envia mensagem para o cliente conectado.
*/
public void sendData(String message) throws IOException {
this.clientSession.getBasicRemote().sendText(message);
}
}
3.3 Implementação do Consumidor Kafka
public class MessageConsumer extends Thread {
private KafkaConsumer<String, String> kafkaConsumer;
private String topicName = "dataTopic";
public MessageConsumer() {
//Construtor padrão vazio
}
@Override
public void run() {
//Configuração das propriedades do consumidor
Properties config = new Properties();
config.put("bootstrap.servers", "localhost:9092");
config.put("group.id", "consumerGroup");
config.put("enable.auto.commit", "true");
config.put("auto.commit.interval.ms", "1000");
config.put("session.timeout.ms", "15000");
config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//Instanciando o consumidor
kafkaConsumer = new KafkaConsumer<String, String>(config);
kafkaConsumer.subscribe(Arrays.asList(this.topicName));
//Loop infinito para consumo contínuo
while (true) {
try {
//Poll para obter novos registros
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
//Processando cada registro
for (ConsumerRecord<String, String> record : records) {
String payload = record.value();
//Transmitindo para todos os clientes conectados
for (DataSocketEndpoint endpoint : activeConnections) {
endpoint.sendData(payload);
}
}
} catch (IOException e) {
System.out.println("Erro ao enviar mensagem: " + e.getMessage());
continue;
}
}
}
public void shutdown() {
try {
kafkaConsumer.close();
} catch (Exception e) {
System.out.println("Erro ao fechar consumidor: " + e.getMessage());
}
}
//Método principal para testes
public static void main(String[] args) {
MessageConsumer consumer = new MessageConsumer();
consumer.start();
}
}
Nota: O WebSocket possui requisitos específicos quanto à versão do servidor Tomcat. Este exemplo foi testado com a versão 7.0.78.
<html lang="pt-BR">
<head>
<meta charset="UTF-8">
<title>Cliente WebSocket</title>
<script type="text/javascript">
var connection;
if (typeof (WebSocket) == "undefined"){
alert("Este navegador não suporta WebSocket");
}
function connect() {
//Conectando ao servidor WebSocket
connection = new WebSocket("ws://127.0.0.1:8080/dataStream");
//Evento de conexão aberta
connection.onopen = function () {
alert("WebSocket conectado com sucesso");
};
//Evento de recebimento de mensagem
connection.onmessage = function (event) {
console.log("Dados recebidos: " + event.data);
};
//Evento de conexão fechada
connection.onclose = function () {
alert("WebSocket desconectado");
};
//Evento de erro
connection.onerror = function (error) {
console.error("Erro na conexão: " + error);
};
}
function disconnect() {
connection.close();
}
function sendMessage() {
connection.send("Mensagem do cliente");
}
</script>
</head>
<body>
<button onclick="connect()">Conectar</button>
<button onclick="disconnect()">Desconectar</button>
<button onclick="sendMessage()">Enviar Mensagem</button>
</body>
</html>