Conversão de Dados de Treinamento FastText para Parquet usando Polars

O formato de dados de treinamento do FastText segue uma estrutura simples: cada linha representa uma amostra, com o rótulo precedido pelo prefixo __label__ e separado do texto por espaço ou tabulação. Para tarefas de classificação de texto, é comum um único rótulo por amostra. No entanto, textos longos podem conter quebras de linha, o que exige uma abordagem ciudadosa ao processar o arquivo.

Ao converter para o formato Parquet, é necessário considerar a manipulação de textos multiline, o processamento de grandes volumes de dados e o salvamento em lotes. Utilizando a biblioteca Polars em Python, podemos implementar uma solução eficiente que respeite esses requisitos, sem alterar o conteúdo original dos rótulos e textos.

Abaixo, apresento um script Python que realiza essa conversão. O código foi reestruturado para melhor clareza, com alterações nas funções e variáveis, mantendo a funcionalidade essencial. Ele permite configurar o diretório de saída, o número máximo de linhas por arquivo Parquet e o tamanho dos grupos de linhas dentro de cada arquivo.

#!/usr/bin/env python3
import os
import polars as pl
from pathlib import Path
import argparse

PREFIXO_ROTULO = "__label__"

def extrair_amostras(chunk_linhas):
    amostras = []
    rotulo_corrente = None
    texto_corrente = []
    
    def finalizar_amostra():
        if rotulo_corrente is not None:
            texto_completo = "".join(texto_corrente).strip()
            amostras.append((rotulo_corrente, texto_completo))
    
    for linha in chunk_linhas:
        if linha.startswith(PREFIXO_ROTULO):
            finalizar_amostra()
            partes = linha.split(maxsplit=1)
            rotulo_corrente = partes[0].replace(PREFIXO_ROTULO, "")
            texto_corrente = [partes[1] if len(partes) > 1 else ""]
        else:
            if rotulo_corrente is not None:
                texto_corrente.append("\n" + linha)
    finalizar_amostra()
    return amostras

def ler_arquivo_grande(caminho, tamanho_chunk=50*1024*1024):
    with open(caminho, "rb") as f:
        sobra = b""
        while True:
            bloco = f.read(tamanho_chunk)
            if not bloco:
                if sobra:
                    yield sobra.decode("utf-8").splitlines(keepends=True)
                break
            ultimo_nova_linha = bloco.rfind(b"\n")
            if ultimo_nova_linha == -1:
                sobra += bloco
                continue
            trecho_completo = sobra + bloco[:ultimo_nova_linha + 1]
            sobra = bloco[ultimo_nova_linha + 1:]
            yield trecho_completo.decode("utf-8").splitlines(keepends=True)

def executar_conversao(args):
    os.makedirs(args.diretorio_saida, exist_ok=True)
    max_linhas_arquivo = args.linhas_por_arquivo
    tamanho_grupo = args.linhas_por_grupo
    total_gravado = 0
    contador_arquivos = 0
    buffer_dados = []
    schema_df = {"rotulo": pl.Utf8, "texto": pl.Utf8}
    
    for chunk in ler_arquivo_grande(args.arquivo_entrada):
        amostras_extraidas = extrair_amostras(chunk)
        buffer_dados.extend(amostras_extraidas)
        
        while len(buffer_dados) >= max_linhas_arquivo:
            df = pl.DataFrame(buffer_dados[:max_linhas_arquivo], schema=schema_df)
            caminho_saida = Path(args.diretorio_saida) / f"parte{contador_arquivos:05d}.parquet"
            df.write_parquet(caminho_saida, row_group_size=tamanho_grupo, compression="snappy")
            total_gravado += len(df)
            print(f"Arquivo gerado: {caminho_saida} | Linhas: {len(df)} | Total acumulado: {total_gravado}")
            buffer_dados = buffer_dados[max_linhas_arquivo:]
            contador_arquivos += 1
    
    if buffer_dados:
        df = pl.DataFrame(buffer_dados, schema=schema_df)
        caminho_saida = Path(args.diretorio_saida) / f"parte{contador_arquivos:05d}.parquet"
        df.write_parquet(caminho_saida, row_group_size=tamanho_grupo, compression="snappy")
        total_gravado += len(df)
        print(f"Arquivo gerado: {caminho_saida} | Linhas: {len(df)} | Total acumulado: {total_gravado}")

def principal():
    analisador = argparse.ArgumentParser(description="Converter corpus FastText de rótulo único para formato Parquet.")
    analisador.add_argument("--arquivo_entrada", required=True, help="Caminho para o arquivo de dados FastText (.txt)")
    analisador.add_argument("--diretorio_saida", required=True, help="Diretório de destino para os arquivos Parquet")
    analisador.add_argument("--linhas_por_arquivo", type=int, default=1000000, help="Quantidade máxima de linhas por arquivo Parquet de saída")
    analisador.add_argument("--linhas_por_grupo", type=int, default=100000, help="Número de linhas por grupo de linhas dentro de cada arquivo Parquet")
    argumentos = analisador.parse_args()
    executar_conversao(argumentos)

if __name__ == "__main__":
    principal()

Para utilizar o script, salve-o como fasttext_para_parquet.py e execute via linha de comando. Exemplo de chamada:

python fasttext_para_parquet.py \
  --arquivo_entrada corpus_fasttext.txt \
  --diretorio_saida dados_parquet \
  --linhas_por_arquivo 500000 \
  --linhas_por_grupo 50000

Os arquivos Parquet resultantes serão salvos no diretório especificado, prontos para análise com ferramentas como Polars ou outros sistemas compatíveis.

Tags: FastText Parquet Polars Python Conversão de Dados

Publicado em 6-16 03:25 por Thomas