Pipeline ETL com PDI/Kettle: Arquitetura, Ingestão de CSV e Carga no Hive

O Pentaho Data Integration (PDI), também conhecido pelo nome histórico Kettle, é uma ferramenta de ETL visual amplamente utilizada para movimentação, transformação e carga de dados. O ambiente gráfico Spoon permite construir fluxos de dados por meio de componentes arrastáveis, reduzindo a necessidade de codificação manual.

Principais capacidades do PDI:

  • Interface drag-and-drop para design de transformações e jobs;
  • Conectividade nativa com bancos relacionais, arquivos planos, APIs, HDFS, Hive e Kafka;
  • Integração com o ecossistema Hadoop via drivers JDBC e steps especializados;
  • Execução em linha de comando por meio dos utilitários Pan e Kitchen;
  • Suporte a variáveis, parâmetros, logs detalhados e tratamento de erros.

Posicionamento do PDI em uma plataforma de dados

Em uma arquitetura de dados moderna, o PDI atua principalmente na camada de ingestão e pré-processamento. Um fluxo típico pode ser representado assim:

[Fontes Heterogêneas] → [PDI/Kettle] → [Data Lake / HDFS] → [Spark / Flink] → [Hive / ClickHouse] → [BI / Dashboards]

Funções desempenhadas pelo PDI nesse cenário:

Função Descrição
Captura Coleta de dados de APIs, bancos, arquivos CSV, JSON e serviços web.
Conversão Normalização de tipos, padronização de datas, unificação de formatos.
Higienização Remoção de duplicatas, registros inválidos, valores nulos e campos sensíveis.
Carga Escrita em arquivos locais, HDFS, tabelas Hive, Kafka topics ou bancos relacionais.

Caso prático: ingestão diária de CSV para o Hive

Objetivo

Construir um pipeline que leia arquivos diários de pedidos no formato CSV, aplique regras de limpeza e insira os registros em uma tabela Hive particionada por data.

Stack tecnológico

  • Fonte: arquivos CSV gerados diariamente em um servidor local ou remoto;
  • ETL: PDI 9.x com interface Spoon;
  • Destino: tabela Hive particionada por dt_processamento;
  • Agendamento: execução via Kitchen acionada por crontab.

Desenvolvendo a transformação no Spoon

1. Leitura do arquivo

Adicione um step Text file input e configure:

  • Caminho do arquivo com variável, por exemplo ${CAMINHO_ENTRADA}/pedidos_*.csv;
  • Codificação UTF-8 e delimitador ; ou |;
  • Mapeamento explícito das colunas e tipos de dados.

2. Tratamento e limpeza

Insira os steps necessários para garantir a qualidade dos dados:

  • Filter rows: descarte registros com id_pedido vazio ou quantidade negativa;
  • String operations / Replace in string: remova espaços em branco e padronize caixas alta/baixa;
  • Select values: renomeie campos e converta tipos para Integer, Decimal e Timestamp;
  • Calculator / Formula: crie colunas derivadas, como valor total e categoria de faixa.

3. Carga no Hive

Conecte um step Table output ou Hive bulk loader com as seguintes configurações:

  • Connection JDBC apontando para o HiveServer2;
  • Schema e nome da tabela de destino;
  • Mapeamento entre campos do fluxo e colunas da tabela;
  • Ativação do batch insert para aumentar a vazão.

Caso a tabela seja particionada, inclua o campo de partição dt_processamento na lista de colunas e garanta que ele seja populado antes da carga.

Orquestrando com Job

Crie um Job (.kjb) para coordenar a execução. Estrutura recomendada:

  1. Start;
  2. Check if file exists: valida a presença do CSV antes de prosseguir;
  3. Transformation: executa a .ktr de limpeza e carga;
  4. Success/Abort: finaliza ou dispara alerta em caso de falha;
  5. Mail: envia notificação com resumo da execução.

Execução e agendamento

Para executar a transformação diretamente, utilize o pan.sh:

/opt/pdi/pan.sh \
  -file="/etl/transformacoes/limpa_pedidos.ktr" \
  -level=Basic

Para executar o Job de forma completa, utilize o kitchen.sh:

/opt/pdi/kitchen.sh \
  -file="/etl/jobs/ingesta_pedidos.kjb" \
  -level=Detailed \
  -logfile="/var/log/pdi/ingesta_pedidos.log"

Agendamento diário via crontab:

30 2 * * * /opt/pdi/kitchen.sh -file="/etl/jobs/ingesta_pedidos.kjb" -logfile="/var/log/pdi/ingesta_pedidos_$(date +\%Y\%m\%d).log" >> /var/log/pdi/cron.log 2>&1

Integração com o ecossistema Big Data

HDFS

Os steps Hadoop File Input e Hadoop File Output permitem ler e escrever arquivos no HDFS. É possível também trabalhar com formatos colunares como ORC e Parquet quando combinados a processos externos ou steps específicos.

Kafka

Steps de entrada e saída para Kafka possibilitam consumir tópicos para ingestão contínua de eventos. Esse padrão é comum em cenários de logs de aplicação, telemetria de dispositivos IoT e eventos de transação.

Spark e Flink

Embora o PDI não realize processamentos distribuídos complexos, ele funciona como uma camada de staging: limpa, enriquece e deposita os dados em um local acessível, de onde motores como Spark e Flink assumem as análises avançadas.

Otimização e resolução de problemas

Situação Ação recomendada
Carga no Hive lenta Use tabela particionada, batch insert e considere carga por arquivo em vez de insert linha a linha.
Arquivo CSV mal interpretado Verifique o encoding, o delimitador e a presença de linhas de cabeçalho.
Falha no agendamento Verifique permissões do script, caminhos absolutos e logs do kitchen.sh.
Alto consumo de memória Aumente o heap do Java, reduza o tamanho do batch e evite lookups desnecessários.
Processamento monolítico Use múltiplas cópias de steps ou divida o fluxo em sub-transformações.

Tags: Kettle Pentaho Data Integration ETL hive HDFS

Publicado em 6-26 02:44