Introdução ao Kafka
O que é o Kafka?
O Apache Kafka é uma plataforma distribuída de processamento de fluxo de eventos, desenvolvida originalmente em Scala e Java. Projetado para alta vazão, baixa latência e escalabilidade horizontal, tornou-se um componente essencial em arquiteturas de dados modernas.
O projeto teve origem na LinkedIn em 2010, quando a empresa enfrentava problemas de gargalos e indisponibilidade na transmissão de mensagens. O arquiteto principal Jay Kreps batizou a plataforma em homenagem ao escritor Franz Kafka. Em 2011, o projeto foi doado à Apache Software Foundation, alcançando o status de projeto de nível superior em outubro de 2012.
Site oficial: https://kafka.apache.org/
Sistemas de Filas de Mensagens
Soluções como RabbitMQ, ActiveMQ e RocketMQ compartilham a denominação MQ (Message Queue). No ecossistema Java, a especificação JMS (Java Message Service) define interfaces padronizadas para comunicação entre aplicações através de mensagens, de forma semelhante ao JDBC para bancos de dados relacionais.
A especificação JMS estabelece diversos componentes:
- JMS Provider: Implementação do middleware de mensagens (ex: ActiveMQ, RabbitMQ).
- JMS Message: Estrutura de dados composta por cabeçalho, propriedades e corpo.
- JMS Producer: Aplicação cliente que envia mensagens através das interfaces JMS.
- JMS Consumer: Aplicação cliente que recebe mensagens do provider.
Existem dois modelos de comunicação definidos pelo JMS:
- Ponto-a-ponto (P2P): Baseado em filas, cada mensagem é consumida por exatamente um consumidor. O envio é assíncrono e independente do estado do receptor.
- Publicação/Assinatura (Pub/Sub): As mensagens são classificadas em topics (tópicos). Produtores publicam em tópicos específicos e consumidores assinam os tópicos de interesse. Uma única mensagem pode ser consumida por múltiplos consumidores simultaneamente. O Kafka adota este modelo.
Embora o Kafka se inspire no JMS, ele não segue integralmente essa especificação, o que explica o nome "Kafka" em vez de "KafkaMQ".
Padrão Produtor-Consumidor
Este padrão desacopla produtores e consumidores através de um intermediário (buffer). Em ambientes distribuídos, softwares como o Kafka atuam como esse intermediário, promovendo desacoplamento e balenceamento de carga — o chamado achatamento de picos (peak shaving).
Comparação entre Middlewares de Mensagens
| Característica | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
|---|---|---|---|---|
| Vazão por nó | ~10 mil/s | ~10 mil/s | ~100 mil/s | ~100 mil/s |
| Latência | milissegundos | microssegundos | milissegundos | milissegundos |
| Disponibilidade | Alta (mestre-escravo) | Alta (mestre-escravo) | Muito alta (distribuído) | Muito alta (distribuído) |
| Confiabilidade | Baixa probabilidade de perda | Raramente perde dados | Zero perdas (configurado) | Zero perdas (configurado) |
| Funcionalidades | Completas para MQ | Alta concorrência, baixa latência | Bem completo, escalável | Simples, focado em Big Data |
Em cenários de Big Data, o Kafka é a escolha predominante. Em desenvolvimento JavaEE, as outras alternativas são mais comuns.
Papel do ZooKeeper
O ZooKeeper é um serviço de coordenação distribuída que armazena metadados e configurações globais do sistema. Ele utiliza uma estrutura de dados em árvore (Tree) com nós temporários e permanentes, além de um mecanismo de Watch para notificar mudanças.
No Kafka, o ZooKeeper é responsável por coordenar os nós da cluster. Entretanto, a partir da versão 2.8.X, o Kafka começou a experimentar o algoritmo Raft como substituto. A versão 4.X planeja remover completamente a dependência do ZooKeeper.
Instalação e Primeiros Passos no Windows
Pré-requisitos
É necessário ter o Java 8 instalado. O Kafka 3.X recomenda Java 11, mas o Java 8 ainda é compatível. Como o Kafka inclui o runtime Scala embutido, nenhuma instalação adicional é necessária.
Instalação do Kafka
Baixe o pacote kafka_2.12-3.6.1.tgz em https://kafka.apache.org/downloads e extraia para um diretório de sua preferência. A estrutura de pastas inclui:
- bin: Scripts executáveis para Linux
- bin/windows: Scripts executáveis para Windows
- config: Arquivos de configuração
- libs: Bibliotecas de dependência
Iniciando o ZooKeeper
Edite o arquivo config/zookeeper.properties definindo o diretório de dados:
# Configuração do diretório de armazenamento do ZooKeeper
dataDir=C:/kafka-3.6.1/zk-data
Execute o comando para iniciar o ZooKeeper:
bin/windows/zookeeper-server-start.bat config/zookeeper.properties
Para facilitar operações recorrentes, crie um script iniciar-zk.bat:
call bin/windows/zookeeper-server-start.bat config/zookeeper.properties
Iniciando o Kafka
Edite o arquivo config/server.properties e configure o diretório de dados:
# Caminho para armazenamento dos dados do Kafka
log.dirs=C:/kafka-3.6.1/kafka-data
Inicie o broker Kafka:
bin/windows/kafka-server-start.bat config/server.properties
Crie o script iniciar-kafka.bat para uso futuro:
call bin/windows/kafka-server-start.bat config/server.properties
Para verificar os processos ativos, execute jps no terminal. O processo QuorumPeerMain corresponde ao ZooKeeper e o processo Kafka ao broker.
Ordem de inicialização: ZooKeeper → Kafka
Ordem de encerramento: Kafka → ZooKeeper
Gerenciamento de Tópicos
Tópicos são classificações lógicas de mensagens que permitem aos consumidores assinar apenas os dados de interesse. A operação pode ser feita via linha de comando, ferramentas, API Java ou criação automática.
Criando um Tópico
kafka-topics.bat --bootstrap-server localhost:9092 --create --topic eventos
Listando Tópicos
kafka-topics.bat --bootstrap-server localhost:9092 --list
Detalhes de um Tópico
kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic eventos
Alterando Partições
kafka-topics.bat --bootstrap-server localhost:9092 --topic eventos --alter --partitions 3
Removendo um Tópico
kafka-topics.bat --bootstrap-server localhost:9092 --topic eventos --delete
Nota: No Windows, a remoção pode causar instabilidade no broker. Para ambientes Linux ou durante desenvolvimento, pode ser necessário limpar manualmente o diretório de dados e reiniciar.
Produzindo e Consumindo Dados
Via Linha de Comando
Inicie um produtor no terminal:
kafka-console-producer.bat --bootstrap-server localhost:9092 --topic eventos
Em outro terminal, inicie um consumidor:
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic eventos
Via API Java
Adicione a dependência Maven ao seu projeto:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.1</version>
</dependency>
</dependencies>
Exemplo de Produtor:
package br.com.kafka.demo;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class GeradorMensagens {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
KafkaProducer<String, String> gerador = new KafkaProducer<>(props);
for (int idx = 0; idx < 15; idx++) {
ProducerRecord<String, String> mensagem = new ProducerRecord<>(
"eventos", "chave-" + idx, "conteudo-" + idx
);
gerador.send(mensagem);
}
gerador.close();
}
}
Exemplo de Consumidor:
package br.com.kafka.demo;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class LeitorMensagens {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("group.id", "grupo-processamento");
KafkaConsumer<String, String> leitor = new KafkaConsumer<>(props);
leitor.subscribe(Collections.singletonList("eventos"));
while (true) {
ConsumerRecords<String, String> registros = leitor.poll(Duration.ofMillis(200));
for (ConsumerRecord<String, String> registro : registros) {
System.out.println(registro);
}
}
}
}
Conceitos Fundamentais do Kafka
Arquitetura de Cluster
Broker
Cada processo Kafka em execução é denominado Broker. Em ambiente de cluster, cada broker possui um identificador único configurado em server.properties:
# ID único do broker no cluster
broker.id=1
Controller
O Kafka adota uma arquitetura mestre-escravo. O nó Controller gerencia o cluster inteiro, auxiliado pelo ZooKeeper. Suas responsabilidades incluem:
- Gerenciamento de brokers (adição/remoção)
- Criação, alteração e exclusão de tópicos
- Gerenciamento de partições e réplicas
- Inicialização das máquinas de estado de partições e réplicas
Se o Controller falhar, o ZooKeeper promove uma nova eleição entre os brokers disponíveis para garantir alta disponibilidade.
Inicialização do ZooKeeper e Eleição do Controller
O processo de eleição utiliza três mecanismos do ZooKeeper:
- Criação de nós: Cada broker tenta criar um nó temporário
/controller - Exclusividade: Apenas um broker consegue criar o nó com sucesso
- Monitoramento: Os demais brokers monitoram o nó
/controllerpara detectar falhas
Quando o Controller falha, o nó temporário é removido automaticamente, e os outros brokers competem para criar um novo nó, elegendo assim um novo Controller.
Componentes Internos do Broker
Durante a inicialização, cada broker cria diversos componentes:
- KafkaScheduler: Agendador de tarefas internas
- LogManager: Gerencia operações de criação, consulta e limpeza de dados
- RemoteLogManager: Sincronização de dados entre brokers
- ReplicaManager: Gerencia réplicas dos tópicos
- ZkMetadataCache: Cache local de metadados do ZooKeeper
- SocketServer: Camada de comunicação de rede (NIO)
O broker também registra seu ID no ZooKeeper como nó temporário. Se a conexão cair, o nó é removido automaticamente.
Tópicos, Partições e Réplicas
Tópico (Topic)
Um tópico é a classificação lógica das mensagens no Kafka. Produtores enviam dados a tópicos específicos e consumidores assinam tópicos de interesse. Recomenda-se evitar nomes com underscores e pontos simultaneamente para prevenir conflitos com métricas.
Partição (Partition)
Para suportar alto volume de dados, um tópico pode ser dividido em múltiplas partições. Cada partição é uma fila ordenada onde cada mensagem recebe um offset sequencial crescente. As partições são distribuídas entre os brokers para balanceamento de carga.
Por padrão, um tópico é criado com 1 partição, configurável via parâmetro --partitions.
Réplica (Replication)
Para tolerância a falhas, cada partição pode ter múltiplas réplicas distribuídas em diferentes brokers. Uma réplica é designada como Leader (responde a leituras e escritas) e as demais como Followers (apenas fazem backup). Réplicas de uma mesma partição nunca devem estar no mesmo broker.
Log
As mensagens são armazenadas em arquivos com extensão .log. Quando um tópico é criado, os diretórios e arquivos de log correspondentes são gerados automaticamente nos brokers designados.
Produção de Dados
Passos Básicos
1. Configuração do Produtor
| Parâmetro | Descrição | Tipo | Padrão |
|---|---|---|---|
bootstrap.servers |
Endereço do cluster (broker1:porta,broker2:porta) | Obrigatório | - |
key.serializer |
Classe de serialização da chave | Obrigatório | - |
value.serializer |
Classe de serialização do valor | Obrigatório | - |
acks |
Tipo de confirmação: 0, 1, all(-1) | Opcional | all |
retries |
Número de tentativas de reenvio | Opcional | Integer.MAX_VALUE |
batch.size |
Tamanho do lote em bytes | Opcional | 16384 |
buffer.memory |
Memória do buffer do coletor de dados | Opcional | 33554432 |
linger.ms |
Tempo de espera no buffer antes do envio | Opcional | 0 |
max.in.flight.requests.per.connection |
Requisições simultâneas por conexão | Opcional | 5 |
enable.idempotence |
Habilitar idempotência | Opcional | true |
2. Preparação da Mensagem
Os dados são encapsulados em um ProducerRecord contendo tópico (obrigatório), valor (obrigatório), chave (opcional, usada para roteamento de partição) e número da partição (opcional).
3. Componentes Internos do Envio
O processo de envio envolve três componentes principais:
- KafkaProducer: Aplica interceptores, seiralização e calcula a partição de destino antes de enviar ao buffer.
- RecordAccumulator: Buffer que acumula mensagens em lotes (batch) por partição. O tamanho padrão do lote é 16KB. Lotes fechados aguardam envio.
- Sender: Thread que lê lotes fechados do buffer, reorganiza por broker e envia requisições de produção pela rede.
Interceptores
Interceptores permitem processar mensagens antes do envio e após o recebimento de confirmações. Implementam a interface ProducerInterceptor:
package br.com.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
public class TransformadorValor implements ProducerInterceptor<String, String> {
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> registro) {
String valorTransformado = registro.value().toUpperCase();
return new ProducerRecord<>(
registro.topic(), registro.partition(),
registro.timestamp(), registro.key(),
valorTransformado, registro.headers()
);
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception excecao) { }
@Override
public void close() { }
@Override
public void configure(Map<String, ?> configs) { }
}
Para registrar o interceptor, adicione à configuração:
props.put("interceptor.classes", TransformadorValor.class.getName());
Múltiplos interceptores são executados sequencialmente. Exceções em um interceptor não afetam os subsequentes.
Métodos de Envio
Envio com Callback (Assíncrono)
gerador.send(mensagem, (metadata, excecao) -> {
if (excecao != null) {
System.err.println("Falha no envio: " + excecao.getMessage());
} else {
System.out.println("Enviado com sucesso: " + metadata);
}
});
Envio Síncrono (via Future)
Future<RecordMetadata> resultado = gerador.send(mensagem);
RecordMetadata metadata = resultado.get(); // bloqueia até confirmação
Roteamento de Partições
Partição Específica
ProducerRecord<String, String> msg = new ProducerRecord<>(
"eventos", 0, "minha-chave", "meu-valor"
);
Partição por Chave
Quando uma chave é fornecida, o Kafka calcula a partição usando o algoritmo murmur2 sobre os bytes serializados da chave, aplicando módulo com o número de partições.
Sem Chave (Sticky Partitioning)
Quando nenhuma chave é fornecida, o Kafka utiliza a estratégia de partição adesiva: escolhe uma partição aleatoriamente e direciona mensagens para ela até atingir o tamanho do lote, depois muda para outra partição.
Partitioner Personalizado
package br.com.kafka.particionador;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
public class ParticionadorCircular implements Partitioner {
private final AtomicLong contador = new AtomicLong(0);
@Override
public int partition(String topico, Object chave, byte[] bytesChave,
Object valor, byte[] bytesValor, Cluster cluster) {
int totalParticoes = cluster.partitionCountForTopic(topico);
return (int) (Math.abs(contador.getAndIncrement()) % totalParticoes);
}
@Override
public void close() { }
@Override
public void configure(Map<String, ?> configs) { }
}
Registre a classe na configuração:
props.put("partitioner.class", ParticionadorCircular.class.getName());
Confiabilidade das Mensagens (ACK)
| ACK | Comportamento | Confiabilidade | Desempenho |
|---|---|---|---|
| 0 | Resposta imediata após envio à rede | Baixa (pode perder dados) | Muito alto |
| 1 | Resposta após Leader persistir localmente | Moderada (falha do Leader perde dados) | Alto |
| all (-1) | Resposta após todos os ISR persistirem | Alta | Moderado |
O parâmetro min.insync.replicas define o número mínimo de réplicas que devem estar sincronizadas para que escritas com acks=all sejam aceitas.
Idempotência
Para resolver problemas de duplicação e desordenação causados por retentativas, o Kafka implementa idempotência através de:
- Producer ID (PID): Identificador único do produtor
- Número de Sequência: Sequencial crescente por partição para cada produtor
O broker mantém um buffer dos últimos 5 lotes por partição/produtor. Antes de aceitar um novo lote, verifica:
- Se o número de sequência já existe → descarta (duplicata)
- Se o número de sequência não é contíguo → erro de ordem
- Caso contrário → aceita e atualiza o buffer
Limitações: Idempotência funciona apenas para uma partição dentro de uma sessão do produtor.
Transações
Para superar as limitações da idempotência, o Kafka oferece suporte a transações distribuídas. O transactional.id é vinculado ao Producer ID, garantindo consistência entre reinicializações.
props.put("transactional.id", "minha-transacao-001");
props.put("enable.idempotence", true);
KafkaProducer<String, String> gerador = new KafkaProducer<>(props);
gerador.initTransactions();
try {
gerador.beginTransaction();
// ... enviar mensagens ...
gerador.commitTransaction();
} catch (Exception e) {
gerador.abortTransaction();
}
As transações utilizam commit em duas fases para garantir atomicidade. Consumidores com isolation.level=read_committed só enxergam dados de transações confirmadas.
Semânticas de Entrega
| Semântica | Descrição | Exemplo |
|---|---|---|
| No máximo uma vez | Dados podem ser perdidos, nunca duplicados | ACK=0 |
| No mínimo uma vez | Dados nunca perdidos, podem ser duplicados | ACK=1 |
| Exatamente uma vez | Dados nem perdidos nem duplicados | Idempotência + Transações + ACK=all |
Armazenamento de Mensagens
Fluxo de Armazenamento no Broker
Quando o broker recebe uma requisição de produção, os seguintes passos são executados:
- Validação de ACK: Verifica se o tipo de confirmação solicitado é válido
- Validação de tópico interno: Impede escrita nos tópicos internos
__transaction_statee__consumer_offsets - Validação de ISR: Para ACK=all, verifica se o número mínimo de réplicas sincronizadas (
min.insync.replicas) é atendido - Rotação do log: Cria novos segmentos de log quando necessário (tamanho > 1GB ou tempo > 7 dias)
- Verificação de duplicatas: Valida números de sequência para garantir idempotência
- Persistência: Escreve os dados via FileChannel do LogSegment
Formato dos Arquivos de Armazenamento
Arquivo de Log (.log)
Cada arquivo de log contém os dados reais das mensagens. O nome do arquivo (20 dígitos) representa o offset base do primeiro registro contido nele.
O conteúdo de um arquivo de log pode ser inspecionado com:
kafka-run-class.bat kafka.tools.DumpLogSegments ^
--files caminho/para/00000000000000000000.log ^
--print-data-log
Cada registro no arquivo é composto por um cabeçalho do lote (61 bytes) contendo offset base, timestamps, producer ID, CRC e contagem de registros, seguido pelos corpos das mensagens individuais com chave, valor e metadados.
Arquivo de Índice (.index)
Para acesso eficiente, o Kafka mantém um índice esparso que mapeia offsets lógicos para posições físicas no arquivo de log. A cada 4KB de dados gravados (configurável via log.index.interval.bytes), uma entrada de índice é adicionada. A busca utiliza busca binária para localizar registros rapidamente.
Arquivo de Índice Temporal (.timeindex)
Este arquivo mapeia timestamps para offsets, permitindo consultas baseadas em tempo. A partir do timestamp, encontra-se o offset, que por sua vez permite a localização física via índice.
Replicação de Dados entre Réplicas
Réplicas Follower executam threads ReplicaFetcherThread que periodicamente:
- Truncam: Ajustam o log local para alinhar com o Leader (via Leader Epoch ou High Watermark)
- Buscam: Solicitam dados novos ao Leader
- Persistem: Escrevem os dados recebidos em seus próprios logs
- Atualizam offsets: Sincronizam LEO e HW
Offsets e Marcadores de Água
- Offset: Posição sequencial de cada mensagem (começa em 0)
- LSO (Log Start Offset): Offset inicial do log de uma réplica
- LEO (Log End Offset): Próximo offset a ser gravado
- HW (High Watermark): Offset até o qual os dados estão replicados em todos os ISR — consumidores só enxergam dados até este ponto
Propagação do High Watermark
O HW é calculado como o menor LEO entre todas as réplicas do ISR. Quando um Follower replica dados com sucesso, o Leader recalcula o HW incluindo o novo LEO do Follower. Este mecanismo garante que consumidores nunca leiam dados não replicados, mesmo em caso de falha do Leader.
ISR (In-Sync Replicas)
O ISR é o subconjunto de réplicas que estão sincronizadas com o Leader. O ReplicaManager periodicamente:
- Contrai o ISR: Remove réplicas que excedem o tempo máximo de atraso (
replica.lag.time.max.ms, padrão 30s) - Expande o ISR: Readiciona réplicas que recuperaram sincronização
- Propaga mudanças: Notifica outros brokers sobre alterações no ISR a cada 2500ms
Consumo de Mensagens
Configuração do Consumidor
| Parâmetro | Descrição | Tipo | Padrão |
|---|---|---|---|
bootstrap.servers |
Endereço do cluster | Obrigatório | - |
key.deserializer |
Classe de deserialização da chave | Obrigatório | - |
value.deserializer |
Classe de deserialização do valor | Obrigatório | - |
group.id |
Identificador do grupo de consumidores | Obrigatório | - |
auto.offset.reset |
Comportamento sem offset inicial | Opcional | latest |
enable.auto.commit |
Auto-commit de offsets | Opcional | true |
auto.commit.interval.ms |
Intervalo de auto-commit | Opcional | 5000 |
fetch.max.bytes |
Máximo de bytes por requisição fetch | Opcional | 52428800 |
Grupo de Consumidores (Consumer Group)
O Kafka adota o modelo pull (consumidor solicita dados), em vez de push, permitindo que cada consumidor processe dados em seu próprio ritmo.
Para escalabilidade horizontal, consumidores são organizados em grupos. As regras fundamentais são:
- Cada partição é consumida por apenas um consumidor dentro do grupo
- Um consumidor pode ler de múltiplas partições
- O número ideal de consumidores ≤ número de partições
Coordenador do Grupo (Group Coordinator)
Cada broker possui um componente Group Coordinator que gerencia:
- Adesão e saída de membros do grupo
- Eleição do líder do grupo (primeiro membro a entrar)
- Distribuição de partições entre consumidores
- Gerenciamento de offsets
Estratégias de Distribuição de Partições
O líder do grupo determina como as partições são distribuídas. Quatro estratégias estão disponíveis:
RoundRobinAssignor (Distribuição Circular)
Ordena consumidores por memberId e distribui partições circularmente. Pode ser desbalanceado quando consumidores assinam diferentes tópicos.
RangeAssignor (Distribuição por Faixa)
Para cada tópico, divide as partições igualmente entre consumidores. Consumidores com memberId menor podem receber partições extras. Pode causar desbalanceamento com múltiplos tópicos.
StickyAssignor (Distribuição Adesiva)
Durante rebalanceamentos, tenta manter as atribuições originais, redistribuindo apenas o necessário. Resulta em distribuição mais uniforme e eficiente.
CooperativeStickyAssignor (Distribuição Adesiva Cooperativa)
Evolução do StickyAssignor (Kafka 2.4+) que utiliza o protocolo COOPERATIVE: em vez de um rebalanceamento global, realiza pequenos rebalanceamentos incrementais até convergir, minimizando interrupções.
Configuração:
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
Gerenciamento de Offsets no Consumidor
Offset Inicial
O parâmetro auto.offset.reset define o comportamento quando não há offset salvo:
- earliest: Consuma desde o início do tópico
- latest: Consuma apenas mensagens novas (padrão)
- none: Gera exceção se não houver offset
Seek Manual
É possível posicionar o consumidor em um offset específico:
leitor.subscribe(Collections.singletonList("eventos"));
leitor.poll(Duration.ofMillis(100)); // necessário para obter atribuições
for (TopicPartition part : leitor.assignment()) {
if ("eventos".equals(part.topic())) {
leitor.seek(part, 10L); // começa do offset 10
}
}
Auto-commit
Por padrão, o consumidor envia offsets automaticamente a cada 5 segundos. Isto pode causar:
- Reprocessamento: Se o consumidor falhar antes do próximo commit, mensagens serão reprocessadas
- Perda: Se o commit ocorrer antes do processamento, mensagens podem ser perdidas
Manual Commit
Para controle preciso, desabilite o auto-commit e faça commit manualmente:
props.put("enable.auto.commit", false);
// No loop de consumo:
while (true) {
ConsumerRecords<String, String> registros = leitor.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> reg : registros) {
processar(reg);
}
leitor.commitSync(); // síncrono (mais seguro)
// leitor.commitAsync(); // assíncrono (mais rápido)
}
- commitSync: Bloqueia até confirmação; retenta em caso de falha
- commitAsync: Não bloqueia; sem retentativa automática em caso de falha
Níveis de Isolamento de Transações
// Somente dados de transações confirmadas (padrão)
props.put("isolation.level", "read_committed");
// Inclui dados de transações não confirmadas
props.put("isolation.level", "read_uncommitted");
Armazenamento de Offsets
Os offsets dos consumidores são gravados no tópico interno __consumer_offsets (50 partições por padrão). A partição de destino é calculada por hash(groupId) % numParticoes. A chave do registro é groupId + tópico + partição e o valor é o offset.
Fluxo de Leitura de Dados
Quando um consumidor solicita dados:
- O broker recebe a requisição FetchRequest
- O ReplicaManager identifica a partição-alvo
- A partir do Kafka 2.4, o broker pode servir dados de Follower preferido (para otimização de tráfego em clusters multi-datacenter)
- O LogSegment lê os dados do disco
- A transferência utiliza zero-copy (FileChannel do NIO) para máxima eficiência