O formato de dados de treinamento do FastText segue uma estrutura simples: cada linha representa uma amostra, com o rótulo precedido pelo prefixo __label__ e separado do texto por espaço ou tabulação. Para tarefas de classificação de texto, é comum um único rótulo por amostra. No entanto, textos longos podem conter quebras de linha, o que exige uma abordagem ciudadosa ao processar o arquivo.
Ao converter para o formato Parquet, é necessário considerar a manipulação de textos multiline, o processamento de grandes volumes de dados e o salvamento em lotes. Utilizando a biblioteca Polars em Python, podemos implementar uma solução eficiente que respeite esses requisitos, sem alterar o conteúdo original dos rótulos e textos.
Abaixo, apresento um script Python que realiza essa conversão. O código foi reestruturado para melhor clareza, com alterações nas funções e variáveis, mantendo a funcionalidade essencial. Ele permite configurar o diretório de saída, o número máximo de linhas por arquivo Parquet e o tamanho dos grupos de linhas dentro de cada arquivo.
#!/usr/bin/env python3
import os
import polars as pl
from pathlib import Path
import argparse
PREFIXO_ROTULO = "__label__"
def extrair_amostras(chunk_linhas):
amostras = []
rotulo_corrente = None
texto_corrente = []
def finalizar_amostra():
if rotulo_corrente is not None:
texto_completo = "".join(texto_corrente).strip()
amostras.append((rotulo_corrente, texto_completo))
for linha in chunk_linhas:
if linha.startswith(PREFIXO_ROTULO):
finalizar_amostra()
partes = linha.split(maxsplit=1)
rotulo_corrente = partes[0].replace(PREFIXO_ROTULO, "")
texto_corrente = [partes[1] if len(partes) > 1 else ""]
else:
if rotulo_corrente is not None:
texto_corrente.append("\n" + linha)
finalizar_amostra()
return amostras
def ler_arquivo_grande(caminho, tamanho_chunk=50*1024*1024):
with open(caminho, "rb") as f:
sobra = b""
while True:
bloco = f.read(tamanho_chunk)
if not bloco:
if sobra:
yield sobra.decode("utf-8").splitlines(keepends=True)
break
ultimo_nova_linha = bloco.rfind(b"\n")
if ultimo_nova_linha == -1:
sobra += bloco
continue
trecho_completo = sobra + bloco[:ultimo_nova_linha + 1]
sobra = bloco[ultimo_nova_linha + 1:]
yield trecho_completo.decode("utf-8").splitlines(keepends=True)
def executar_conversao(args):
os.makedirs(args.diretorio_saida, exist_ok=True)
max_linhas_arquivo = args.linhas_por_arquivo
tamanho_grupo = args.linhas_por_grupo
total_gravado = 0
contador_arquivos = 0
buffer_dados = []
schema_df = {"rotulo": pl.Utf8, "texto": pl.Utf8}
for chunk in ler_arquivo_grande(args.arquivo_entrada):
amostras_extraidas = extrair_amostras(chunk)
buffer_dados.extend(amostras_extraidas)
while len(buffer_dados) >= max_linhas_arquivo:
df = pl.DataFrame(buffer_dados[:max_linhas_arquivo], schema=schema_df)
caminho_saida = Path(args.diretorio_saida) / f"parte{contador_arquivos:05d}.parquet"
df.write_parquet(caminho_saida, row_group_size=tamanho_grupo, compression="snappy")
total_gravado += len(df)
print(f"Arquivo gerado: {caminho_saida} | Linhas: {len(df)} | Total acumulado: {total_gravado}")
buffer_dados = buffer_dados[max_linhas_arquivo:]
contador_arquivos += 1
if buffer_dados:
df = pl.DataFrame(buffer_dados, schema=schema_df)
caminho_saida = Path(args.diretorio_saida) / f"parte{contador_arquivos:05d}.parquet"
df.write_parquet(caminho_saida, row_group_size=tamanho_grupo, compression="snappy")
total_gravado += len(df)
print(f"Arquivo gerado: {caminho_saida} | Linhas: {len(df)} | Total acumulado: {total_gravado}")
def principal():
analisador = argparse.ArgumentParser(description="Converter corpus FastText de rótulo único para formato Parquet.")
analisador.add_argument("--arquivo_entrada", required=True, help="Caminho para o arquivo de dados FastText (.txt)")
analisador.add_argument("--diretorio_saida", required=True, help="Diretório de destino para os arquivos Parquet")
analisador.add_argument("--linhas_por_arquivo", type=int, default=1000000, help="Quantidade máxima de linhas por arquivo Parquet de saída")
analisador.add_argument("--linhas_por_grupo", type=int, default=100000, help="Número de linhas por grupo de linhas dentro de cada arquivo Parquet")
argumentos = analisador.parse_args()
executar_conversao(argumentos)
if __name__ == "__main__":
principal()
Para utilizar o script, salve-o como fasttext_para_parquet.py e execute via linha de comando. Exemplo de chamada:
python fasttext_para_parquet.py \
--arquivo_entrada corpus_fasttext.txt \
--diretorio_saida dados_parquet \
--linhas_por_arquivo 500000 \
--linhas_por_grupo 50000
Os arquivos Parquet resultantes serão salvos no diretório especificado, prontos para análise com ferramentas como Polars ou outros sistemas compatíveis.