Apache Kafka: Conceitos Fundamentais e Configuração Prática

Introdução ao Kafka

O que é o Kafka?

O Apache Kafka é uma plataforma distribuída de processamento de fluxo de eventos, desenvolvida originalmente em Scala e Java. Projetado para alta vazão, baixa latência e escalabilidade horizontal, tornou-se um componente essencial em arquiteturas de dados modernas.

O projeto teve origem na LinkedIn em 2010, quando a empresa enfrentava problemas de gargalos e indisponibilidade na transmissão de mensagens. O arquiteto principal Jay Kreps batizou a plataforma em homenagem ao escritor Franz Kafka. Em 2011, o projeto foi doado à Apache Software Foundation, alcançando o status de projeto de nível superior em outubro de 2012.

Site oficial: https://kafka.apache.org/

Sistemas de Filas de Mensagens

Soluções como RabbitMQ, ActiveMQ e RocketMQ compartilham a denominação MQ (Message Queue). No ecossistema Java, a especificação JMS (Java Message Service) define interfaces padronizadas para comunicação entre aplicações através de mensagens, de forma semelhante ao JDBC para bancos de dados relacionais.

A especificação JMS estabelece diversos componentes:

  • JMS Provider: Implementação do middleware de mensagens (ex: ActiveMQ, RabbitMQ).
  • JMS Message: Estrutura de dados composta por cabeçalho, propriedades e corpo.
  • JMS Producer: Aplicação cliente que envia mensagens através das interfaces JMS.
  • JMS Consumer: Aplicação cliente que recebe mensagens do provider.

Existem dois modelos de comunicação definidos pelo JMS:

  • Ponto-a-ponto (P2P): Baseado em filas, cada mensagem é consumida por exatamente um consumidor. O envio é assíncrono e independente do estado do receptor.
  • Publicação/Assinatura (Pub/Sub): As mensagens são classificadas em topics (tópicos). Produtores publicam em tópicos específicos e consumidores assinam os tópicos de interesse. Uma única mensagem pode ser consumida por múltiplos consumidores simultaneamente. O Kafka adota este modelo.

Embora o Kafka se inspire no JMS, ele não segue integralmente essa especificação, o que explica o nome "Kafka" em vez de "KafkaMQ".

Padrão Produtor-Consumidor

Este padrão desacopla produtores e consumidores através de um intermediário (buffer). Em ambientes distribuídos, softwares como o Kafka atuam como esse intermediário, promovendo desacoplamento e balenceamento de carga — o chamado achatamento de picos (peak shaving).

Comparação entre Middlewares de Mensagens

Característica ActiveMQ RabbitMQ RocketMQ Kafka
Vazão por nó ~10 mil/s ~10 mil/s ~100 mil/s ~100 mil/s
Latência milissegundos microssegundos milissegundos milissegundos
Disponibilidade Alta (mestre-escravo) Alta (mestre-escravo) Muito alta (distribuído) Muito alta (distribuído)
Confiabilidade Baixa probabilidade de perda Raramente perde dados Zero perdas (configurado) Zero perdas (configurado)
Funcionalidades Completas para MQ Alta concorrência, baixa latência Bem completo, escalável Simples, focado em Big Data

Em cenários de Big Data, o Kafka é a escolha predominante. Em desenvolvimento JavaEE, as outras alternativas são mais comuns.

Papel do ZooKeeper

O ZooKeeper é um serviço de coordenação distribuída que armazena metadados e configurações globais do sistema. Ele utiliza uma estrutura de dados em árvore (Tree) com nós temporários e permanentes, além de um mecanismo de Watch para notificar mudanças.

No Kafka, o ZooKeeper é responsável por coordenar os nós da cluster. Entretanto, a partir da versão 2.8.X, o Kafka começou a experimentar o algoritmo Raft como substituto. A versão 4.X planeja remover completamente a dependência do ZooKeeper.


Instalação e Primeiros Passos no Windows

Pré-requisitos

É necessário ter o Java 8 instalado. O Kafka 3.X recomenda Java 11, mas o Java 8 ainda é compatível. Como o Kafka inclui o runtime Scala embutido, nenhuma instalação adicional é necessária.

Instalação do Kafka

Baixe o pacote kafka_2.12-3.6.1.tgz em https://kafka.apache.org/downloads e extraia para um diretório de sua preferência. A estrutura de pastas inclui:

  • bin: Scripts executáveis para Linux
  • bin/windows: Scripts executáveis para Windows
  • config: Arquivos de configuração
  • libs: Bibliotecas de dependência

Iniciando o ZooKeeper

Edite o arquivo config/zookeeper.properties definindo o diretório de dados:

# Configuração do diretório de armazenamento do ZooKeeper
dataDir=C:/kafka-3.6.1/zk-data

Execute o comando para iniciar o ZooKeeper:

bin/windows/zookeeper-server-start.bat config/zookeeper.properties

Para facilitar operações recorrentes, crie um script iniciar-zk.bat:

call bin/windows/zookeeper-server-start.bat config/zookeeper.properties

Iniciando o Kafka

Edite o arquivo config/server.properties e configure o diretório de dados:

# Caminho para armazenamento dos dados do Kafka
log.dirs=C:/kafka-3.6.1/kafka-data

Inicie o broker Kafka:

bin/windows/kafka-server-start.bat config/server.properties

Crie o script iniciar-kafka.bat para uso futuro:

call bin/windows/kafka-server-start.bat config/server.properties

Para verificar os processos ativos, execute jps no terminal. O processo QuorumPeerMain corresponde ao ZooKeeper e o processo Kafka ao broker.

Ordem de inicialização: ZooKeeper → Kafka
Ordem de encerramento: Kafka → ZooKeeper


Gerenciamento de Tópicos

Tópicos são classificações lógicas de mensagens que permitem aos consumidores assinar apenas os dados de interesse. A operação pode ser feita via linha de comando, ferramentas, API Java ou criação automática.

Criando um Tópico

kafka-topics.bat --bootstrap-server localhost:9092 --create --topic eventos

Listando Tópicos

kafka-topics.bat --bootstrap-server localhost:9092 --list

Detalhes de um Tópico

kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic eventos

Alterando Partições

kafka-topics.bat --bootstrap-server localhost:9092 --topic eventos --alter --partitions 3

Removendo um Tópico

kafka-topics.bat --bootstrap-server localhost:9092 --topic eventos --delete

Nota: No Windows, a remoção pode causar instabilidade no broker. Para ambientes Linux ou durante desenvolvimento, pode ser necessário limpar manualmente o diretório de dados e reiniciar.


Produzindo e Consumindo Dados

Via Linha de Comando

Inicie um produtor no terminal:

kafka-console-producer.bat --bootstrap-server localhost:9092 --topic eventos

Em outro terminal, inicie um consumidor:

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic eventos

Via API Java

Adicione a dependência Maven ao seu projeto:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.6.1</version>
    </dependency>
</dependencies>

Exemplo de Produtor:

package br.com.kafka.demo;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class GeradorMensagens {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        KafkaProducer<String, String> gerador = new KafkaProducer<>(props);

        for (int idx = 0; idx < 15; idx++) {
            ProducerRecord<String, String> mensagem = new ProducerRecord<>(
                "eventos", "chave-" + idx, "conteudo-" + idx
            );
            gerador.send(mensagem);
        }

        gerador.close();
    }
}

Exemplo de Consumidor:

package br.com.kafka.demo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class LeitorMensagens {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("group.id", "grupo-processamento");

        KafkaConsumer<String, String> leitor = new KafkaConsumer<>(props);
        leitor.subscribe(Collections.singletonList("eventos"));

        while (true) {
            ConsumerRecords<String, String> registros = leitor.poll(Duration.ofMillis(200));
            for (ConsumerRecord<String, String> registro : registros) {
                System.out.println(registro);
            }
        }
    }
}


Conceitos Fundamentais do Kafka

Arquitetura de Cluster

Broker

Cada processo Kafka em execução é denominado Broker. Em ambiente de cluster, cada broker possui um identificador único configurado em server.properties:

# ID único do broker no cluster
broker.id=1

Controller

O Kafka adota uma arquitetura mestre-escravo. O nó Controller gerencia o cluster inteiro, auxiliado pelo ZooKeeper. Suas responsabilidades incluem:

  • Gerenciamento de brokers (adição/remoção)
  • Criação, alteração e exclusão de tópicos
  • Gerenciamento de partições e réplicas
  • Inicialização das máquinas de estado de partições e réplicas

Se o Controller falhar, o ZooKeeper promove uma nova eleição entre os brokers disponíveis para garantir alta disponibilidade.

Inicialização do ZooKeeper e Eleição do Controller

O processo de eleição utiliza três mecanismos do ZooKeeper:

  1. Criação de nós: Cada broker tenta criar um nó temporário /controller
  2. Exclusividade: Apenas um broker consegue criar o nó com sucesso
  3. Monitoramento: Os demais brokers monitoram o nó /controller para detectar falhas

Quando o Controller falha, o nó temporário é removido automaticamente, e os outros brokers competem para criar um novo nó, elegendo assim um novo Controller.

Componentes Internos do Broker

Durante a inicialização, cada broker cria diversos componentes:

  • KafkaScheduler: Agendador de tarefas internas
  • LogManager: Gerencia operações de criação, consulta e limpeza de dados
  • RemoteLogManager: Sincronização de dados entre brokers
  • ReplicaManager: Gerencia réplicas dos tópicos
  • ZkMetadataCache: Cache local de metadados do ZooKeeper
  • SocketServer: Camada de comunicação de rede (NIO)

O broker também registra seu ID no ZooKeeper como nó temporário. Se a conexão cair, o nó é removido automaticamente.


Tópicos, Partições e Réplicas

Tópico (Topic)

Um tópico é a classificação lógica das mensagens no Kafka. Produtores enviam dados a tópicos específicos e consumidores assinam tópicos de interesse. Recomenda-se evitar nomes com underscores e pontos simultaneamente para prevenir conflitos com métricas.

Partição (Partition)

Para suportar alto volume de dados, um tópico pode ser dividido em múltiplas partições. Cada partição é uma fila ordenada onde cada mensagem recebe um offset sequencial crescente. As partições são distribuídas entre os brokers para balanceamento de carga.

Por padrão, um tópico é criado com 1 partição, configurável via parâmetro --partitions.

Réplica (Replication)

Para tolerância a falhas, cada partição pode ter múltiplas réplicas distribuídas em diferentes brokers. Uma réplica é designada como Leader (responde a leituras e escritas) e as demais como Followers (apenas fazem backup). Réplicas de uma mesma partição nunca devem estar no mesmo broker.

Log

As mensagens são armazenadas em arquivos com extensão .log. Quando um tópico é criado, os diretórios e arquivos de log correspondentes são gerados automaticamente nos brokers designados.


Produção de Dados

Passos Básicos

1. Configuração do Produtor

Parâmetro Descrição Tipo Padrão
bootstrap.servers Endereço do cluster (broker1:porta,broker2:porta) Obrigatório -
key.serializer Classe de serialização da chave Obrigatório -
value.serializer Classe de serialização do valor Obrigatório -
acks Tipo de confirmação: 0, 1, all(-1) Opcional all
retries Número de tentativas de reenvio Opcional Integer.MAX_VALUE
batch.size Tamanho do lote em bytes Opcional 16384
buffer.memory Memória do buffer do coletor de dados Opcional 33554432
linger.ms Tempo de espera no buffer antes do envio Opcional 0
max.in.flight.requests.per.connection Requisições simultâneas por conexão Opcional 5
enable.idempotence Habilitar idempotência Opcional true

2. Preparação da Mensagem

Os dados são encapsulados em um ProducerRecord contendo tópico (obrigatório), valor (obrigatório), chave (opcional, usada para roteamento de partição) e número da partição (opcional).

3. Componentes Internos do Envio

O processo de envio envolve três componentes principais:

  • KafkaProducer: Aplica interceptores, seiralização e calcula a partição de destino antes de enviar ao buffer.
  • RecordAccumulator: Buffer que acumula mensagens em lotes (batch) por partição. O tamanho padrão do lote é 16KB. Lotes fechados aguardam envio.
  • Sender: Thread que lê lotes fechados do buffer, reorganiza por broker e envia requisições de produção pela rede.

Interceptores

Interceptores permitem processar mensagens antes do envio e após o recebimento de confirmações. Implementam a interface ProducerInterceptor:

package br.com.kafka.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;

public class TransformadorValor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> registro) {
        String valorTransformado = registro.value().toUpperCase();
        return new ProducerRecord<>(
            registro.topic(), registro.partition(),
            registro.timestamp(), registro.key(),
            valorTransformado, registro.headers()
        );
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception excecao) { }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}

Para registrar o interceptor, adicione à configuração:

props.put("interceptor.classes", TransformadorValor.class.getName());

Múltiplos interceptores são executados sequencialmente. Exceções em um interceptor não afetam os subsequentes.


Métodos de Envio

Envio com Callback (Assíncrono)

gerador.send(mensagem, (metadata, excecao) -> {
    if (excecao != null) {
        System.err.println("Falha no envio: " + excecao.getMessage());
    } else {
        System.out.println("Enviado com sucesso: " + metadata);
    }
});

Envio Síncrono (via Future)

Future<RecordMetadata> resultado = gerador.send(mensagem);
RecordMetadata metadata = resultado.get(); // bloqueia até confirmação


Roteamento de Partições

Partição Específica

ProducerRecord<String, String> msg = new ProducerRecord<>(
    "eventos", 0, "minha-chave", "meu-valor"
);

Partição por Chave

Quando uma chave é fornecida, o Kafka calcula a partição usando o algoritmo murmur2 sobre os bytes serializados da chave, aplicando módulo com o número de partições.

Sem Chave (Sticky Partitioning)

Quando nenhuma chave é fornecida, o Kafka utiliza a estratégia de partição adesiva: escolhe uma partição aleatoriamente e direciona mensagens para ela até atingir o tamanho do lote, depois muda para outra partição.

Partitioner Personalizado

package br.com.kafka.particionador;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class ParticionadorCircular implements Partitioner {

    private final AtomicLong contador = new AtomicLong(0);

    @Override
    public int partition(String topico, Object chave, byte[] bytesChave,
                         Object valor, byte[] bytesValor, Cluster cluster) {
        int totalParticoes = cluster.partitionCountForTopic(topico);
        return (int) (Math.abs(contador.getAndIncrement()) % totalParticoes);
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}

Registre a classe na configuração:

props.put("partitioner.class", ParticionadorCircular.class.getName());


Confiabilidade das Mensagens (ACK)

ACK Comportamento Confiabilidade Desempenho
0 Resposta imediata após envio à rede Baixa (pode perder dados) Muito alto
1 Resposta após Leader persistir localmente Moderada (falha do Leader perde dados) Alto
all (-1) Resposta após todos os ISR persistirem Alta Moderado

O parâmetro min.insync.replicas define o número mínimo de réplicas que devem estar sincronizadas para que escritas com acks=all sejam aceitas.


Idempotência

Para resolver problemas de duplicação e desordenação causados por retentativas, o Kafka implementa idempotência através de:

  • Producer ID (PID): Identificador único do produtor
  • Número de Sequência: Sequencial crescente por partição para cada produtor

O broker mantém um buffer dos últimos 5 lotes por partição/produtor. Antes de aceitar um novo lote, verifica:

  1. Se o número de sequência já existe → descarta (duplicata)
  2. Se o número de sequência não é contíguo → erro de ordem
  3. Caso contrário → aceita e atualiza o buffer

Limitações: Idempotência funciona apenas para uma partição dentro de uma sessão do produtor.

Transações

Para superar as limitações da idempotência, o Kafka oferece suporte a transações distribuídas. O transactional.id é vinculado ao Producer ID, garantindo consistência entre reinicializações.

props.put("transactional.id", "minha-transacao-001");
props.put("enable.idempotence", true);

KafkaProducer<String, String> gerador = new KafkaProducer<>(props);
gerador.initTransactions();

try {
    gerador.beginTransaction();
    // ... enviar mensagens ...
    gerador.commitTransaction();
} catch (Exception e) {
    gerador.abortTransaction();
}

As transações utilizam commit em duas fases para garantir atomicidade. Consumidores com isolation.level=read_committed só enxergam dados de transações confirmadas.

Semânticas de Entrega

Semântica Descrição Exemplo
No máximo uma vez Dados podem ser perdidos, nunca duplicados ACK=0
No mínimo uma vez Dados nunca perdidos, podem ser duplicados ACK=1
Exatamente uma vez Dados nem perdidos nem duplicados Idempotência + Transações + ACK=all

Armazenamento de Mensagens

Fluxo de Armazenamento no Broker

Quando o broker recebe uma requisição de produção, os seguintes passos são executados:

  1. Validação de ACK: Verifica se o tipo de confirmação solicitado é válido
  2. Validação de tópico interno: Impede escrita nos tópicos internos __transaction_state e __consumer_offsets
  3. Validação de ISR: Para ACK=all, verifica se o número mínimo de réplicas sincronizadas (min.insync.replicas) é atendido
  4. Rotação do log: Cria novos segmentos de log quando necessário (tamanho > 1GB ou tempo > 7 dias)
  5. Verificação de duplicatas: Valida números de sequência para garantir idempotência
  6. Persistência: Escreve os dados via FileChannel do LogSegment

Formato dos Arquivos de Armazenamento

Arquivo de Log (.log)

Cada arquivo de log contém os dados reais das mensagens. O nome do arquivo (20 dígitos) representa o offset base do primeiro registro contido nele.

O conteúdo de um arquivo de log pode ser inspecionado com:

kafka-run-class.bat kafka.tools.DumpLogSegments ^
  --files caminho/para/00000000000000000000.log ^
  --print-data-log

Cada registro no arquivo é composto por um cabeçalho do lote (61 bytes) contendo offset base, timestamps, producer ID, CRC e contagem de registros, seguido pelos corpos das mensagens individuais com chave, valor e metadados.

Arquivo de Índice (.index)

Para acesso eficiente, o Kafka mantém um índice esparso que mapeia offsets lógicos para posições físicas no arquivo de log. A cada 4KB de dados gravados (configurável via log.index.interval.bytes), uma entrada de índice é adicionada. A busca utiliza busca binária para localizar registros rapidamente.

Arquivo de Índice Temporal (.timeindex)

Este arquivo mapeia timestamps para offsets, permitindo consultas baseadas em tempo. A partir do timestamp, encontra-se o offset, que por sua vez permite a localização física via índice.

Replicação de Dados entre Réplicas

Réplicas Follower executam threads ReplicaFetcherThread que periodicamente:

  1. Truncam: Ajustam o log local para alinhar com o Leader (via Leader Epoch ou High Watermark)
  2. Buscam: Solicitam dados novos ao Leader
  3. Persistem: Escrevem os dados recebidos em seus próprios logs
  4. Atualizam offsets: Sincronizam LEO e HW

Offsets e Marcadores de Água

  • Offset: Posição sequencial de cada mensagem (começa em 0)
  • LSO (Log Start Offset): Offset inicial do log de uma réplica
  • LEO (Log End Offset): Próximo offset a ser gravado
  • HW (High Watermark): Offset até o qual os dados estão replicados em todos os ISR — consumidores só enxergam dados até este ponto

Propagação do High Watermark

O HW é calculado como o menor LEO entre todas as réplicas do ISR. Quando um Follower replica dados com sucesso, o Leader recalcula o HW incluindo o novo LEO do Follower. Este mecanismo garante que consumidores nunca leiam dados não replicados, mesmo em caso de falha do Leader.

ISR (In-Sync Replicas)

O ISR é o subconjunto de réplicas que estão sincronizadas com o Leader. O ReplicaManager periodicamente:

  • Contrai o ISR: Remove réplicas que excedem o tempo máximo de atraso (replica.lag.time.max.ms, padrão 30s)
  • Expande o ISR: Readiciona réplicas que recuperaram sincronização
  • Propaga mudanças: Notifica outros brokers sobre alterações no ISR a cada 2500ms

Consumo de Mensagens

Configuração do Consumidor

Parâmetro Descrição Tipo Padrão
bootstrap.servers Endereço do cluster Obrigatório -
key.deserializer Classe de deserialização da chave Obrigatório -
value.deserializer Classe de deserialização do valor Obrigatório -
group.id Identificador do grupo de consumidores Obrigatório -
auto.offset.reset Comportamento sem offset inicial Opcional latest
enable.auto.commit Auto-commit de offsets Opcional true
auto.commit.interval.ms Intervalo de auto-commit Opcional 5000
fetch.max.bytes Máximo de bytes por requisição fetch Opcional 52428800

Grupo de Consumidores (Consumer Group)

O Kafka adota o modelo pull (consumidor solicita dados), em vez de push, permitindo que cada consumidor processe dados em seu próprio ritmo.

Para escalabilidade horizontal, consumidores são organizados em grupos. As regras fundamentais são:

  • Cada partição é consumida por apenas um consumidor dentro do grupo
  • Um consumidor pode ler de múltiplas partições
  • O número ideal de consumidores ≤ número de partições

Coordenador do Grupo (Group Coordinator)

Cada broker possui um componente Group Coordinator que gerencia:

  • Adesão e saída de membros do grupo
  • Eleição do líder do grupo (primeiro membro a entrar)
  • Distribuição de partições entre consumidores
  • Gerenciamento de offsets

Estratégias de Distribuição de Partições

O líder do grupo determina como as partições são distribuídas. Quatro estratégias estão disponíveis:

RoundRobinAssignor (Distribuição Circular)

Ordena consumidores por memberId e distribui partições circularmente. Pode ser desbalanceado quando consumidores assinam diferentes tópicos.

RangeAssignor (Distribuição por Faixa)

Para cada tópico, divide as partições igualmente entre consumidores. Consumidores com memberId menor podem receber partições extras. Pode causar desbalanceamento com múltiplos tópicos.

StickyAssignor (Distribuição Adesiva)

Durante rebalanceamentos, tenta manter as atribuições originais, redistribuindo apenas o necessário. Resulta em distribuição mais uniforme e eficiente.

CooperativeStickyAssignor (Distribuição Adesiva Cooperativa)

Evolução do StickyAssignor (Kafka 2.4+) que utiliza o protocolo COOPERATIVE: em vez de um rebalanceamento global, realiza pequenos rebalanceamentos incrementais até convergir, minimizando interrupções.

Configuração:

props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");


Gerenciamento de Offsets no Consumidor

Offset Inicial

O parâmetro auto.offset.reset define o comportamento quando não há offset salvo:

  • earliest: Consuma desde o início do tópico
  • latest: Consuma apenas mensagens novas (padrão)
  • none: Gera exceção se não houver offset

Seek Manual

É possível posicionar o consumidor em um offset específico:

leitor.subscribe(Collections.singletonList("eventos"));
leitor.poll(Duration.ofMillis(100)); // necessário para obter atribuições

for (TopicPartition part : leitor.assignment()) {
    if ("eventos".equals(part.topic())) {
        leitor.seek(part, 10L); // começa do offset 10
    }
}

Auto-commit

Por padrão, o consumidor envia offsets automaticamente a cada 5 segundos. Isto pode causar:

  • Reprocessamento: Se o consumidor falhar antes do próximo commit, mensagens serão reprocessadas
  • Perda: Se o commit ocorrer antes do processamento, mensagens podem ser perdidas

Manual Commit

Para controle preciso, desabilite o auto-commit e faça commit manualmente:

props.put("enable.auto.commit", false);

// No loop de consumo:
while (true) {
    ConsumerRecords<String, String> registros = leitor.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> reg : registros) {
        processar(reg);
    }
    leitor.commitSync();    // síncrono (mais seguro)
    // leitor.commitAsync(); // assíncrono (mais rápido)
}

  • commitSync: Bloqueia até confirmação; retenta em caso de falha
  • commitAsync: Não bloqueia; sem retentativa automática em caso de falha

Níveis de Isolamento de Transações

// Somente dados de transações confirmadas (padrão)
props.put("isolation.level", "read_committed");

// Inclui dados de transações não confirmadas
props.put("isolation.level", "read_uncommitted");

Armazenamento de Offsets

Os offsets dos consumidores são gravados no tópico interno __consumer_offsets (50 partições por padrão). A partição de destino é calculada por hash(groupId) % numParticoes. A chave do registro é groupId + tópico + partição e o valor é o offset.

Fluxo de Leitura de Dados

Quando um consumidor solicita dados:

  1. O broker recebe a requisição FetchRequest
  2. O ReplicaManager identifica a partição-alvo
  3. A partir do Kafka 2.4, o broker pode servir dados de Follower preferido (para otimização de tráfego em clusters multi-datacenter)
  4. O LogSegment lê os dados do disco
  5. A transferência utiliza zero-copy (FileChannel do NIO) para máxima eficiência

Tags: kafka Apache Kafka Message Queue Distributed Systems Event Streaming

Publicado em 6-12 21:25 por Thomas