Transmissão de Dados em Tempo Real do Kafka para o Frontend usando WebSocket

Ao trabalhar em um novo projeto, surgiu a necessidade de transmitir dados do Kafka para o frontend em tempo real. Inicialmente, considerou-se o uso de polling (requisições periódicas) para obter os dados, porém essa abordagem aprseenta problemas como descoincidência entre a frequência de polling e o ritmo de consumo dos dados, podendo resultar em perda de informações. A solução definitiva foi implementar a transmissão em tempo real utilizando WebSocket.

O protocolo WebSocket é uma evolução da comunicação HTTP, baseado no protocolo TCP. Diferentemente do modelo tradicional request-resposne, o WebSocket estabelece uma conexão persistente que permite comunicação full-duplex, ou seja, o servidor pode enviar informações para o cliente sem necessidade de uma solicitação prévia.

3.1 Dependências do Maven

São necessárias três dependências no arquivo pom.xml: a API do WebSocket e duas dependências do Kafka.

<dependencies>  
 <dependency>  
 <groupId>javax</groupId>  
 <artifactId>javaee-api</artifactId>  
 <version>7.0</version>  
 </dependency>  
 <dependency>  
 <groupId>org.apache.kafka</groupId>  
 <artifactId>kafka_2.9.2</artifactId>  
 <version>0.8.1.1</version>  
 </dependency>  
 <dependency>  
 <groupId>org.apache.kafka</groupId>  
 <artifactId>kafka-clients</artifactId>  
 <version>RELEASE</version>  
 </dependency>   
</dependencies>

3.2 Implementação do Servidor WebSocket

//Definindo o URI do endpoint
@ServerEndpoint("/dataStream")
public class DataSocketEndpoint {
    private Session clientSession;
    //Conjunto estático para armazenar todas as conexões ativas
    private static CopyOnWriteArraySet<DataSocketEndpoint> activeConnections = 
        new CopyOnWriteArraySet<DataSocketEndpoint>();
    
    /**
     * Estabelece a conexão com o cliente.
     */
    @OnOpen
    public void onOpen(Session session){
        this.clientSession = session;
        activeConnections.add(this);
        System.out.println("Nova conexão estabelecida, sessionId: " + session.getId());
    }
    
    /**
     * Encerra a conexão com o cliente.
     */
    @OnClose
    public void onClose(){
        activeConnections.remove(this);
        System.out.println("Conexão encerrada, sessionId: " + clientSession.getId());
    }
    
    /**
     * Recebe mensagens enviadas pelo cliente.
     */
    @OnMessage
    public void onMessage(String message, Session session){
        System.out.println("Mensagem recebida: " + message + " de " + session.getId());
    }
    
    /**
     * Envia mensagem para o cliente conectado.
     */
    public void sendData(String message) throws IOException {
        this.clientSession.getBasicRemote().sendText(message);
    }
}

3.3 Implementação do Consumidor Kafka

public class MessageConsumer extends Thread {
    
    private KafkaConsumer<String, String> kafkaConsumer;
    private String topicName = "dataTopic";
    
    public MessageConsumer() {
        //Construtor padrão vazio
    }
    
    @Override
    public void run() {
        //Configuração das propriedades do consumidor
        Properties config = new Properties();
        config.put("bootstrap.servers", "localhost:9092");
        config.put("group.id", "consumerGroup");
        config.put("enable.auto.commit", "true");
        config.put("auto.commit.interval.ms", "1000");
        config.put("session.timeout.ms", "15000");
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        
        //Instanciando o consumidor
        kafkaConsumer = new KafkaConsumer<String, String>(config);
        kafkaConsumer.subscribe(Arrays.asList(this.topicName));
        
        //Loop infinito para consumo contínuo
        while (true) {
            try {
                //Poll para obter novos registros
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                
                //Processando cada registro
                for (ConsumerRecord<String, String> record : records) {
                    String payload = record.value();
                    //Transmitindo para todos os clientes conectados
                    for (DataSocketEndpoint endpoint : activeConnections) {
                        endpoint.sendData(payload);
                    }
                }
            } catch (IOException e) {
                System.out.println("Erro ao enviar mensagem: " + e.getMessage());
                continue;
            }
        }
    }
    
    public void shutdown() {
        try {
            kafkaConsumer.close();
        } catch (Exception e) {
            System.out.println("Erro ao fechar consumidor: " + e.getMessage());
        }
    }
    
    //Método principal para testes
    public static void main(String[] args) {
        MessageConsumer consumer = new MessageConsumer();
        consumer.start();
    }
}

Nota: O WebSocket possui requisitos específicos quanto à versão do servidor Tomcat. Este exemplo foi testado com a versão 7.0.78.

<html lang="pt-BR">
<head>
    <meta charset="UTF-8">
    <title>Cliente WebSocket</title>
    <script type="text/javascript">
        var connection;
        
        if (typeof (WebSocket) == "undefined"){
            alert("Este navegador não suporta WebSocket");
        }
        
        function connect() {
            //Conectando ao servidor WebSocket
            connection = new WebSocket("ws://127.0.0.1:8080/dataStream");
            
            //Evento de conexão aberta
            connection.onopen = function () {
                alert("WebSocket conectado com sucesso");
            };
            
            //Evento de recebimento de mensagem
            connection.onmessage = function (event) {
                console.log("Dados recebidos: " + event.data);
            };
            
            //Evento de conexão fechada
            connection.onclose = function () {
                alert("WebSocket desconectado");
            };
            
            //Evento de erro
            connection.onerror = function (error) {
                console.error("Erro na conexão: " + error);
            };
        }
        
        function disconnect() {
            connection.close();
        }
        
        function sendMessage() {
            connection.send("Mensagem do cliente");
        }
    </script>
</head>
<body>
    <button onclick="connect()">Conectar</button>
    <button onclick="disconnect()">Desconectar</button>
    <button onclick="sendMessage()">Enviar Mensagem</button>
</body>
</html>

Tags: WebSocket kafka java-ee real-time apache-kafka

Publicado em 6-4 06:36 por Thomas