Para produzir rapidamente um grande volume de dados no Kafka, uma abordagem eifciente é gerar os registros previamente usando o PostgreSQL e depois exportá-los para um tópico Kafka. O processo envolve definir um modelo de dados, criar uma tabela no banco de dados, executar um script para gerar os dados em massa, exportar para um arquivo JSON e, finalmente, publicá-los no Kafka.
Considere o seguinte exemplo de modelo JSON, onde campos como timestamp_criacao, nome_qualificado, nome e identificador variam a cada registro:
{
"versao": {
"versao": "1.0.0",
"partes_versao": [1]
},
"tipo_compressao_msg": "1",
"idx_divisao_msg": 1,
"ip_origem_msg": "10.82.13.74",
"criado_por_msg": "DELL",
"timestamp_criacao_msg": 1711525684104,
"mensagem": {
"tipo": "CRIACAO_ENTIDADE_V2",
"usuario": "DELL",
"entidades": {
"lista_entidades": [
{
"nome_tipo": "tipo_entidade_exemplo",
"atributos": {
"nome_qualificado": "1711525684104",
"id_locatario": "locatario12",
"nome": "teste_milhoes_2024-03-27 07:48:04.104966+00",
"id_usuario": "usuario12",
"versao": 1
},
"identificador": "-1711525684104",
"incompleto": false,
"tipo_proveniencia": 0,
"versao": 0,
"proxy": false
}
]
}
}
}
Primeiro, crie uma tabela no PostgreSQL com colunas correspondentes, ajustando nomes para maior clareza:
CRIAR TABELA registros_teste (
versao JSON,
tipo_compressao VARCHAR,
idx_divisao INTEGER,
ip_origem VARCHAR,
criado_por VARCHAR,
timestamp_criacao BIGINT,
dados_mensagem JSON
);
Em seguida, utilize um script PL/pgSQL para gerar um milhão de registros. Modifique a lógica para usar uma função de tempo atual com precisão e uma estrutura de loop alternativa:
FAÇA $$
DECLARE
contador INTEGER := 1;
payload JSON;
tempo_base BIGINT;
BEGIN
tempo_base := EXTRACT(EPOCH FROM NOW()) * 1000;
WHILE contador <= 1000000 LOOP
payload := json_build_object(
'tipo', 'CRIACAO_ENTIDADE_V2',
'usuario', 'SISTEMA',
'entidades', json_build_object(
'lista_entidades', json_build_array(
json_build_object(
'nome_tipo', 'tipo_dinamico',
'atributos', json_build_object(
'nome_qualificado', tempo_base + contador,
'id_locatario', 'loc_' || contador,
'nome', 'registro_' || (tempo_base + contador),
'id_usuario', 'user_' || (contador % 1000),
'versao', 1
),
'identificador', '-' || (tempo_base + contador),
'incompleto', FALSE,
'tipo_proveniencia', 0,
'versao', 0,
'proxy', FALSE
)
)
)
);
INSERIR EM registros_teste (
versao, tipo_compressao, idx_divisao, ip_origem, criado_por, timestamp_criacao, dados_mensagem
) VALORES (
'{"versao":"1.0.0","partes_versao":[1]}', '1', 1,
'10.82.13.74', 'SISTEMA', tempo_base, payload
);
contador := contador + 1;
END LOOP;
END $$;
Para exportar os dados, use o comando COPY no psql para gerar um arquivo JSON com todos os registros da tabela:
-- No terminal, acesse o banco de dados e execute:
\c nome_do_banco;
COPIAR (SELECIONAR row_to_json(reg) DE registros_teste reg) PARA '/tmp/dados_kafka.json';
Por fim, envie os dados para o Kafka usando o console producer. Certifique-se de que o Kafka está em execução e que o tópico existe:
/opt/kafka_2.13-2.8.1/bin/kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic TOPICO_DESTINO < /tmp/dados_kafka.json