O Pentaho Data Integration (PDI), também conhecido pelo nome histórico Kettle, é uma ferramenta de ETL visual amplamente utilizada para movimentação, transformação e carga de dados. O ambiente gráfico Spoon permite construir fluxos de dados por meio de componentes arrastáveis, reduzindo a necessidade de codificação manual.
Principais capacidades do PDI:
- Interface drag-and-drop para design de transformações e jobs;
- Conectividade nativa com bancos relacionais, arquivos planos, APIs, HDFS, Hive e Kafka;
- Integração com o ecossistema Hadoop via drivers JDBC e steps especializados;
- Execução em linha de comando por meio dos utilitários
PaneKitchen; - Suporte a variáveis, parâmetros, logs detalhados e tratamento de erros.
Posicionamento do PDI em uma plataforma de dados
Em uma arquitetura de dados moderna, o PDI atua principalmente na camada de ingestão e pré-processamento. Um fluxo típico pode ser representado assim:
[Fontes Heterogêneas] → [PDI/Kettle] → [Data Lake / HDFS] → [Spark / Flink] → [Hive / ClickHouse] → [BI / Dashboards]
Funções desempenhadas pelo PDI nesse cenário:
| Função | Descrição |
|---|---|
| Captura | Coleta de dados de APIs, bancos, arquivos CSV, JSON e serviços web. |
| Conversão | Normalização de tipos, padronização de datas, unificação de formatos. |
| Higienização | Remoção de duplicatas, registros inválidos, valores nulos e campos sensíveis. |
| Carga | Escrita em arquivos locais, HDFS, tabelas Hive, Kafka topics ou bancos relacionais. |
Caso prático: ingestão diária de CSV para o Hive
Objetivo
Construir um pipeline que leia arquivos diários de pedidos no formato CSV, aplique regras de limpeza e insira os registros em uma tabela Hive particionada por data.
Stack tecnológico
- Fonte: arquivos CSV gerados diariamente em um servidor local ou remoto;
- ETL: PDI 9.x com interface Spoon;
- Destino: tabela Hive particionada por
dt_processamento; - Agendamento: execução via
Kitchenacionada por crontab.
Desenvolvendo a transformação no Spoon
1. Leitura do arquivo
Adicione um step Text file input e configure:
- Caminho do arquivo com variável, por exemplo
${CAMINHO_ENTRADA}/pedidos_*.csv; - Codificação UTF-8 e delimitador
;ou|; - Mapeamento explícito das colunas e tipos de dados.
2. Tratamento e limpeza
Insira os steps necessários para garantir a qualidade dos dados:
- Filter rows: descarte registros com
id_pedidovazio ou quantidade negativa; - String operations / Replace in string: remova espaços em branco e padronize caixas alta/baixa;
- Select values: renomeie campos e converta tipos para
Integer,DecimaleTimestamp; - Calculator / Formula: crie colunas derivadas, como valor total e categoria de faixa.
3. Carga no Hive
Conecte um step Table output ou Hive bulk loader com as seguintes configurações:
- Connection JDBC apontando para o HiveServer2;
- Schema e nome da tabela de destino;
- Mapeamento entre campos do fluxo e colunas da tabela;
- Ativação do batch insert para aumentar a vazão.
Caso a tabela seja particionada, inclua o campo de partição dt_processamento na lista de colunas e garanta que ele seja populado antes da carga.
Orquestrando com Job
Crie um Job (.kjb) para coordenar a execução. Estrutura recomendada:
- Start;
- Check if file exists: valida a presença do CSV antes de prosseguir;
- Transformation: executa a
.ktrde limpeza e carga; - Success/Abort: finaliza ou dispara alerta em caso de falha;
- Mail: envia notificação com resumo da execução.
Execução e agendamento
Para executar a transformação diretamente, utilize o pan.sh:
/opt/pdi/pan.sh \
-file="/etl/transformacoes/limpa_pedidos.ktr" \
-level=Basic
Para executar o Job de forma completa, utilize o kitchen.sh:
/opt/pdi/kitchen.sh \
-file="/etl/jobs/ingesta_pedidos.kjb" \
-level=Detailed \
-logfile="/var/log/pdi/ingesta_pedidos.log"
Agendamento diário via crontab:
30 2 * * * /opt/pdi/kitchen.sh -file="/etl/jobs/ingesta_pedidos.kjb" -logfile="/var/log/pdi/ingesta_pedidos_$(date +\%Y\%m\%d).log" >> /var/log/pdi/cron.log 2>&1
Integração com o ecossistema Big Data
HDFS
Os steps Hadoop File Input e Hadoop File Output permitem ler e escrever arquivos no HDFS. É possível também trabalhar com formatos colunares como ORC e Parquet quando combinados a processos externos ou steps específicos.
Kafka
Steps de entrada e saída para Kafka possibilitam consumir tópicos para ingestão contínua de eventos. Esse padrão é comum em cenários de logs de aplicação, telemetria de dispositivos IoT e eventos de transação.
Spark e Flink
Embora o PDI não realize processamentos distribuídos complexos, ele funciona como uma camada de staging: limpa, enriquece e deposita os dados em um local acessível, de onde motores como Spark e Flink assumem as análises avançadas.
Otimização e resolução de problemas
| Situação | Ação recomendada |
|---|---|
| Carga no Hive lenta | Use tabela particionada, batch insert e considere carga por arquivo em vez de insert linha a linha. |
| Arquivo CSV mal interpretado | Verifique o encoding, o delimitador e a presença de linhas de cabeçalho. |
| Falha no agendamento | Verifique permissões do script, caminhos absolutos e logs do kitchen.sh. |
| Alto consumo de memória | Aumente o heap do Java, reduza o tamanho do batch e evite lookups desnecessários. |
| Processamento monolítico | Use múltiplas cópias de steps ou divida o fluxo em sub-transformações. |