Reconhecimento de Entidades Nomeadas: Pipeline Completo de Processamento e Treinamento

Visão Geral do Reconhecimento de Entidades Nomeadas

Preparação dos Dados

Iniciamos analisando a estrutura dos dados. Precisamos extrair todos os tipos de entidades dos rótulos e criar um mapeamento baseado no esquema BMES, salvando a tabela de correspondência.

import json
import os


def salvar_json(dados, caminho_arquivo):
    """
    Salva dados no formato JSON indentado
    """
    os.makedirs(os.path.dirname(caminho_arquivo), exist_ok=True)
    with open(caminho_arquivo, 'w', encoding='utf-8') as f:
        json.dump(dados, f, ensure_ascii=False, indent=4)


def extrair_tipos_entidade(arquivo_dados):
    """
    Extrai tipos únicos de entidade de um arquivo de dados
    """
    tipos = set()
    with open(arquivo_dados, 'r', encoding='utf-8') as f:
        dados_completos = json.load(f)
        for item in dados_completos:
            for entidade in item['entidades']:
                tipos.add(entidade['tipo'])
    return tipos


def construir_mapeamento_rotulos(arquivos_dados, arquivo_saida):
    """
    Constrói mapeamento de rótulos BMES e salva em arquivo
    """
    todos_tipos_entidade = set()
    for caminho in arquivos_dados:
        todos_tipos_entidade.update(extrair_tipos_entidade(caminho))

    tipos_ordenados = sorted(list(todos_tipos_entidade))
    print(f"Tipos de entidade encontrados: {tipos_ordenados}")

    rotulo_para_id = {'O': 0}
    for tipo_entidade in tipos_ordenados:
        for prefixo in ['B', 'M', 'E', 'S']:
            nome_rotulo = f"{prefixo}-{tipo_entidade}"
            rotulo_para_id[nome_rotulo] = len(rotulo_para_id)

    print(f"\nGerados {len(rotulo_para_id)} mapeamentos de rótulos.")
    salvar_json(rotulo_para_id, arquivo_saida)
    print(f"Mapeamento salvo em: {arquivo_saida}")


if __name__ == '__main__':
    arquivo_treino = './dados/CMeEE-V2_train.json'
    arquivo_val = './dados/CMeEE-V2_dev.json'
    caminho_saida = './dados/categorias.json'

    construir_mapeamento_rotulos(
        arquivos_dados=[arquivo_treino, arquivo_val],
        arquivo_saida=caminho_saida
    )

Para o texto: normalizamos (convertendo caracteres de largura completa para largura parcial), calculamos frequência de caracteres, removemos palavras com frequência abaixo do mínimo, adicionamos tokens de preenchimento (PAD) e desconhecido (UNK). A ordenação garante reprodutibilidade do vocabulário.

import json
import os
from collections import Counter


def salvar_json(dados, caminho_arquivo):
    os.makedirs(os.path.dirname(caminho_arquivo), exist_ok=True)
    with open(caminho_arquivo, 'w', encoding='utf-8') as f:
        json.dump(dados, f, ensure_ascii=False, indent=4)


def normalizar_texto(texto):
    """
    Normaliza texto convertendo caracteres de largura completa para largura parcial
    """
    largura_completa = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz!#$%&’()*+,-./:;<=>?@[\]^_`{|}~""
    largura_parcial = r"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz!#$%&'" + r'()*+,-./:;<=>?@[\]^_`{|}~".'
    mapeamento = str.maketrans(largura_completa, largura_parcial)
    return texto.translate(mapeamento)


def criar_vocabulario_caracteres(arquivos_dados, arquivo_saida, freq_minima=1):
    """
    Cria vocabulário em nível de caracteres a partir dos arquivos de dados
    """
    contagem_caracteres = Counter()
    for caminho in arquivos_dados:
        with open(caminho, 'r', encoding='utf-8') as f:
            dados_completos = json.load(f)
            for item in dados_completos:
                texto = normalizar_texto(item['texto'])
                contagem_caracteres.update(list(texto))

    caracteres_frequentes = [caractere for caractere, contagem in contagem_caracteres.items() if contagem >= freq_minima]
    caracteres_frequentes.sort()

    tokens_especiais = ["<PAD>", "<UNK>"]
    vocabulario_final = tokens_especiais + caracteres_frequentes

    print(f"Tamanho do vocabulário (freq_minima={freq_minima}): {len(vocabulario_final)}")
    salvar_json(vocabulario_final, arquivo_saida)
    print(f"Vocabulário salvo em: {arquivo_saida}")


if __name__ == '__main__':
    arquivo_treino = './dados/CMeEE-V2_train.json'
    arquivo_val = './dados/CMeEE-V2_dev.json'
    caminho_saida = './dados/vocabulario.json'

    criar_vocabulario_caracteres(
        arquivos_dados=[arquivo_treino, arquivo_val],
        arquivo_saida=caminho_saida,
        freq_minima=1
    )

Criamos a classe Vocabulary para gerenciar o mapeamento entre tokens e IDs, com métodos __len__ e conversor de tokens para IDs.

Criamos o Dataset, inicializando com o texto, mapeamento de rótulos e vocabulário. Implementamos os métodos __len__ e __getitem__, onde o método __getitem__ converte os dados de entrada para IDs de tokens e mapeamento de rótulos, retornando token_ids e label_ids.

Criamos o DataLoader, construindo primeiro o collate_fn para realizar o padding das sequências e criar a máscara de atenção.

import json
import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence


def normalizar_texto(texto):
    """
    Normaliza texto convertendo caracteres de largura completa para largura parcial
    """
    largura_completa = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz!#$%&’()*+,-./:;<=>?@[\]^_`」|}~""
    largura_parcial = r"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz!#$%&'" + r'()*+,-./:;<=>?@[\]^_`{|}~".'
    mapeamento = str.maketrans(largura_completa, largura_parcial)
    return texto.translate(mapeamento)


class Vocabulario:
    """
    Gerencia o vocabulário e o mapeamento de tokens para IDs
    """
    def __init__(self, caminho_vocabulario):
        with open(caminho_vocabulario, 'r', encoding='utf-8') as f:
            self.tokens = json.load(f)
        self.token_para_id = {token: i for i, token in enumerate(self.tokens)}
        self.id_pad = self.token_para_id['<PAD>']
        self.id_desconhecido = self.token_para_id['<UNK>']

    def __len__(self):
        return len(self.tokens)

    def converter_tokens_para_ids(self, tokens):
        return [self.token_para_id.get(token, self.id_desconhecido) for token in tokens]


class DatasetNER(Dataset):
    """
    Processa dados de NER e os converte para formato compatível com PyTorch
    """
    def __init__(self, caminho_dados, vocab: Vocabulario, mapeamento_rotulos: dict):
        self.vocab = vocab
        self.rotulo_para_id = mapeamento_rotulos
        with open(caminho_dados, 'r', encoding='utf-8') as f:
            self.registros = json.load(f)

    def __len__(self):
        return len(self.registros)

    def __getitem__(self, idx):
        registro = self.registros[idx]
        texto = normalizar_texto(registro['texto'])
        tokens = list(texto)
        
        token_ids = self.vocab.converter_tokens_para_ids(tokens)

        tags = ['O'] * len(tokens)
        for entidade in registro.get('entidades', []):
            tipo_entidade = entidade['tipo']
            inicio = entidade['indice_inicio']
            fim = entidade['indice_fim']

            if fim >= len(tokens):
                continue

            if inicio == fim:
                tags[inicio] = f'S-{tipo_entidade}'
            else:
                tags[inicio] = f'B-{tipo_entidade}'
                tags[fim] = f'E-{tipo_entidade}'
                for i in range(inicio + 1, fim):
                    tags[i] = f'M-{tipo_entidade}'
        
        label_ids = [self.rotulo_para_id.get(tag, self.rotulo_para_id['O']) for tag in tags]

        return {
            "token_ids": torch.tensor(token_ids, dtype=torch.long),
            "label_ids": torch.tensor(label_ids, dtype=torch.long)
        }


def criar_dataloader_ner(caminho_dados, vocab, mapeamento_rotulos, tamanho_batch, embaralhar=False):
    """
    Cria DataLoader para tarefa de NER
    """
    dataset = DatasetNER(caminho_dados, vocab, mapeamento_rotulos)
    
    def collate_batch(batch):
        lista_token_ids = [item['token_ids'] for item in batch]
        lista_label_ids = [item['label_ids'] for item in batch]

        token_ids_preenchidos = pad_sequence(lista_token_ids, batch_first=True, padding_value=vocab.id_pad)
        label_ids_preenchidos = pad_sequence(lista_label_ids, batch_first=True, padding_value=-100)

        mascara_atencao = (token_ids_preenchidos != vocab.id_pad).long()

        return {
            "token_ids": token_ids_preenchidos,
            "label_ids": label_ids_preenchidos,
            "attention_mask": mascara_atencao
        }

    return DataLoader(dataset, batch_size=tamanho_batch, shuffle=embaralhar, collate_fn=collate_batch)


if __name__ == '__main__':
    arquivo_treino = './dados/CMeEE-V2_train.json'
    arquivo_vocabulario = './dados/vocabulario.json'
    arquivo_categorias = './dados/categorias.json'

    vocabulario = Vocabulario(caminho_vocabulario=arquivo_vocabulario)
    with open(arquivo_categorias, 'r', encoding='utf-8') as f:
        mapeamento_rotulos = json.load(f)
    print("Vocabulário e mapeamento de rótulos carregados.")

    dataloader_treino = criar_dataloader_ner(
        caminho_dados=arquivo_treino,
        vocab=vocabulario,
        mapeamento_rotulos=mapeamento_rotulos,
        tamanho_batch=4,
        embaralhar=True
    )
    print("DataLoader criado.")

    print("\n--- Verificando um lote ---")
    lote = next(iter(dataloader_treino))
    
    print(f"  Token IDs (shape): {lote['token_ids'].shape}")
    print(f"  Label IDs (shape): {lote['label_ids'].shape}")
    print(f"  Attention Mask (shape): {lote['attention_mask'].shape}")
    print(f"  Token IDs (amostra): {lote['token_ids'][0][:20]}...")
    print(f"  Label IDs (amostra): {lote['label_ids'][0][:20]}...")
    print(f"  Attention Mask (amostra): {lote['attention_mask'][0][:20]}...")

nn.ModuleList vs nn.Sequential

No PyTorch, nn.ModuleList e nn.Sequential são ambos contêineres para armazenar múltiplos submódulos, mas possuem filosofias de design e casos de uso distintos:

  • nn.Sequential: Funciona como um pipeline automatizado onde os dados fluem automaticamente por cada camada. É adequado para pilhas lineares simples, mas não permite interações complexas entre camadas.
  • nn.ModuleList: Age mais como uma lista Python comum, armazenando módulos sem executá-los automaticamente. Você precisa escrever loops manualmente no método forward para chamar cada camada, possibilitando adicionar lógica personalizada entre camadas (como conexões residuais).

Construção do Modelo

loss_fn = nn.CrossEntropyLoss(ignore_index=-100, reduction='none')

reduction='none': A perda padrão retorna um único valor. Ao definir este parâmetro, a perda retorna o valor para cada rótulo individualmente, formando uma matriz de perdas.

ignore_index funciona: Observa-se que nas posições de preenchimento com valor -100 em label_ids, a perda correspondente é 0. Com este método, em modelos unidirecionais não precisamos passar a máscara de sequência, mas em modelos bidirecionais isso não é possível.

torch.nn.utils.rnn.pack_padded_sequence: Esta função recebe um tensor de entrada preenchido e uma lista de comprimentos reais. Ela retorna um objeto PackedSequence, que pode ser imaginado como um pacote de dados "comprimido" onde todas as posições de preenchimento são temporariamente removidas. Ao receber este objeto especial, o módulo RNN consegue processar corretamente sequências de comprimento variável de forma eficiente.

É claro, onde há "empacotamento" há "desempacotamento". A função correspondente pad_packed_sequence é responsável por "descompactar" o objeto PackedSequence após o cálculo do RNN, convertendo-o de volta para um Tensor organizado com preenchimento.

import torch
import torch.nn as nn
import torch.nn.utils.rnn as rnn

class RedeBiGRUNER(nn.Module):
    def __init__(self, tamanho_vocabulario, tamanho_oculto, numero_rotulos, numero_camadas_gru=1):
        super().__init__()
        self.embedding = nn.Embedding(tamanho_vocabulario, tamanho_oculto)

        self.camadas_gru = nn.ModuleList()
        for _ in range(numero_camadas_gru):
            self.camadas_gru.append(
                nn.GRU(
                    input_size=tamanho_oculto,
                    hidden_size=tamanho_oculto,
                    num_layers=1,
                    batch_first=True,
                    bidirectional=True
                )
            )

        self.camada_fusao = nn.Linear(tamanho_oculto * 2, tamanho_oculto)
        self.classificador = nn.Linear(tamanho_oculto, numero_rotulos)

    def forward(self, token_ids, mascara_atencao):
        comprimentos = mascara_atencao.sum(dim=1).cpu()

        texto嵌入 = self.embedding(token_ids)

        entrada_packed = rnn.pack_padded_sequence(
            texto嵌入, comprimentos, batch_first=True, enforce_sorted=False
        )

        for camada_gru in self.camadas_gru:
            saida_packed, _ = camada_gru(entrada_packed)

            saida, _ = rnn.pad_packed_sequence(
                saida_packed, batch_first=True, total_length=token_ids.shape[1]
            )

            caracteristicas = self.camada_fusao(saida)

            entrada_descompactada, _ = rnn.pad_packed_sequence(
                entrada_packed, batch_first=True, total_length=token_ids.shape[1]
            )
            entrada_atual = caracteristicas + entrada_descompactada

            entrada_packed = rnn.pack_padded_sequence(
                entrada_atual, comprimentos, batch_first=True, enforce_sorted=False
            )

        saida_final, _ = rnn.pad_packed_sequence(
            entrada_packed, batch_first=True, total_length=token_ids.shape[1]
        )

        logits = self.classificador(saida_final)

        return logits


if __name__ == '__main__':
    token_ids = torch.tensor([
        [210,   18,  871, 147,   0,   0,   0,   0],
        [922, 2962,  842, 210,  18, 871, 147,   0]
    ], dtype=torch.int64)

    mascara_atencao = torch.tensor([
        [1, 1, 1, 1, 0, 0, 0, 0],
        [1, 1, 1, 1, 1, 1, 1, 0]
    ], dtype=torch.int64)

    label_ids = torch.tensor([
        [0, 0, 0, 0, -100, -100, -100, -100],
        [0, 0, 0, 0,    0,    0,    0, -100]
    ], dtype=torch.int64)

    modelo = RedeBiGRUNER(
        tamanho_vocabulario=10000,
        tamanho_oculto=128,
        numero_rotulos=37,
        numero_camadas_gru=2
    )

    logits = modelo(token_ids=token_ids, mascara_atencao=mascara_atencao)

    funcao_perda = nn.CrossEntropyLoss(ignore_index=-100, reduction='none')

    logits_permutados = torch.permute(logits, dims=(0, 2, 1))
    perda = funcao_perda(logits_permutados, label_ids)

    print(f"Logits shape: {logits.shape}")
    print(f"Loss shape: {perda.shape}")
    print("\nPerda para cada Token:")
    print(perda)

Construção de Componentes

Trainer

Antes de escrever a classe Trainer, criamos uma pasta treinador/ no diretório src/ e um arquivo treinador.py para armazenar a definição da classe.

Trainer é responsável apenas pelo "treinamento": A responsabilidade central da classe Trainer é exectuar ciclos padrão de treinamento e avaliação.

__init__: Inicializa o modelo PyTorch, otimizador, função de perda, dataloader de treino, dataloader de validação, função de métrica de avaliação, diretório de saída do modelo e dispositivo de treinamento.

fit(self, epochs): Ponto de entrada principal para treinamento.

_train_one_epoch(self): Encapsula o fluxo de um ciclo de treinamento.

_train_step(self, batch): Encapsula a lógica de um passo de treinamento (forward, perda, backward).

_evaluate(self): Encapsula a lógica de avaliação.

_evaluation_step(self, batch): Encapsula a lógica de um passo de avaliação.

_save_checkpoint(self, is_best=False): Encapsula a lógica de salvamento do modelo.

Em nosso design de "montagem por componentes", embora o Trainer não receba diretamente o objeto de configuração config (para manter o desacoplamento), o config ainda é a fonte de parâmetros para todas as "peças".

import torch
from tqdm import tqdm
import os
from dataclasses import asdict

class Treinador:
    def __init__(self, modelo, otimizador, funcao_perda, dataloader_treino, dataloader_val=None, 
                 funcao_metrica_aval=None, diretorio_saida=None, dispositivo='cpu'):
        self.modelo = modelo.to(dispositivo)
        self.otimizador = otimizador
        self.funcao_perda = funcao_perda
        self.dataloader_treino = dataloader_treino
        self.dataloader_val = dataloader_val
        self.funcao_metrica_aval = funcao_metrica_aval
        self.diretorio_saida = diretorio_saida
        self.dispositivo = torch.device(dispositivo)
        
        if self.diretorio_saida:
            os.makedirs(self.diretorio_saida, exist_ok=True)

    def fit(self, epocas):
        melhor_metrica = float('inf')
        
        for epoca in range(1, epocas + 1):
            perda_treino = self._treinar_uma_epoca()
            print(f"Época {epoca} - Perda de Treino: {perda_treino:.4f}")

            metricas = self._avaliar()
            if metricas:
                print(f"Época {epoca} - Métricas de Validação: {metricas}")
                metrica_atual = metricas.get('perda')
                
                if metrica_atual < melhor_metrica:
                    melhor_metrica = metrica_atual
                    if self.diretorio_saida:
                        self._salvar_checkpoint(eh_melhor=True)
                        print(f"Novo melhor modelo salvo com perda de validação: {melhor_metrica:.4f}")

            if self.diretorio_saida:
                self._salvar_checkpoint(eh_melior=False)

    def _treinar_uma_epoca(self):
        self.modelo.train()
        perda_total = 0
        
        for lote in tqdm(self.dataloader_treino, desc=f"Treinando Época"):
            resultados = self._passo_treinamento(lote)
            perda_total += resultados['perda'].item()
        
        return perda_total / len(self.dataloader_treino)

    def _passo_treinamento(self, lote):
        lote = {k: v.to(self.dispositivo) for k, v in lote.items() if isinstance(v, torch.Tensor)}

        logits = self.modelo(token_ids=lote['token_ids'], mascara_atencao=lote['mascara_atencao'])
        
        perda = self.funcao_perda(logits.permute(0, 2, 1), lote['label_ids'])
            
        self.otimizador.zero_grad()
        perda.backward()
        self.otimizador.step()
        
        return {'perda': perda, 'logits': logits}

    def _avaliar(self):
        if self.dataloader_val is None:
            return None

        self.modelo.eval()
        perda_total = 0
        todos_logits = []
        todos_rotulos = []
        todas_mascaras_atencao = []

        with torch.no_grad():
            for lote in tqdm(self.dataloader_val, desc="Avaliando"):
                resultados = self._passo_avaliacao(lote)
                
                perda_total += resultados['perda'].item()
                todos_logits.append(resultados['logits'].cpu())
                todos_rotulos.append(lote['label_ids'].cpu())
                todas_mascaras_atencao.append(lote['mascara_atencao'].cpu())
        
        metricas = {}
        if self.funcao_metrica_aval:
            metricas = self.funcao_metrica_aval(todos_logits, todos_rotulos, todas_mascaras_atencao)
        
        metricas['perda'] = perda_total / len(self.dataloader_val)
        return metricas

    def _passo_avaliacao(self, lote):
        lote = {k: v.to(self.dispositivo) for k, v in lote.items() if isinstance(v, torch.Tensor)}
        
        logits = self.modelo(token_ids=lote['token_ids'], mascara_atencao=lote['mascara_atencao'])
        
        perda = self.funcao_perda(logits.permute(0, 2, 1), lote['label_ids'])
        
        return {'perda': perda, 'logits': logits}

    def _salvar_checkpoint(self, eh_melhor):
        estado = {'model_state_dict': self.modelo.state_dict()}
        if eh_melhor:
            torch.save(estado, os.path.join(self.diretorio_saida, 'melhor_modelo.pth'))
        torch.save(estado, os.path.join(self.diretorio_saida, 'ultimo_modelo.pth'))

Classe de Configuração
# src/configs/configs.py
import torch
from dataclasses import dataclass, field

@dataclass
class ConfiguracaoNER:
    parametros_caminho: str = "dados"
    arquivo_treino: str = "CMeEE-V2_train.json"
    arquivo_validacao: str = "CMeEE-V2_dev.json"
    arquivo_vocabulario: str = "vocabulary.json"
    arquivo_rotulos: str = "categories.json"
    diretorio_saida: str = "saida"

    parametros_treinamento: int = 32
    epocas: int = 20
    taxa_aprendizado: float = 1e-3
    dispositivo: str = field(default_factory=lambda: 'cuda' if torch.cuda.is_available() else 'cpu')
    
    parametros_modelo: int = 256
    numero_camadas_gru: int = 2

@dataclass é um decorador introduzido no Python 3.7 que simplifica a escrita de classes. Para classes de configuração como ConfiguracaoNER, ele gera automaticamente o construtor (__init__), eliminando a necessidade de escrever código manual de atribuição de parâmetros. Também gera uma representação amigável (__repr__), o que significa que print(config) exibirá claramente todos os parâmetros e valores, facilitando a depuração.

Componente de Modelo

Primeiro passo: Criar diretório de modelos

Criamos uma nova pasta chamada modelos no diretório src/.

Segundo passo: Definir classe base do modelo

Antes de construir modelos específicos, podemos criar um arquivo base.py no diretório src/modelos/ para definir uma classe base do modelo. Esta classe usa o módulo abc do Python (Abstract Base Classes) para definir uma interface的统一 para todos os modelos de NER.

Os benefícios de fazer isso são:

  • Interface forçada: Todos os modelos devem implementar um método forward que recebe os mesmos parâmetros (token_ids, mascara_atencao). Isso garante que o Treinador possa trabalhar perfeitamente com qualquer novo modelo que criarmos (como BERT-NER, LSTM-NER) sem precisar modificar o código do Treinador.
  • Melhor legibilidade e manutenibilidade: A estrutura do código fica mais clara; ao herdar o projeto, basta verificar a classe base para entender a interface dos modelos.
# src/modelos/base.py
import torch.nn as nn
from abc import ABC, abstractmethod

class RedeBaseNER(nn.Module, ABC):
    @abstractmethod
    def forward(self, token_ids, mascara_atencao):
        """
        Define a interface de forward que todos os modelos de NER devem seguir.
        
        Args:
            token_ids (torch.Tensor): [batch_size, seq_len]
            mascara_atencao (torch.Tensor): [batch_size, seq_len]

        Returns:
            torch.Tensor: Logits, [batch_size, seq_len, num_tags]
        """
        raise NotImplementedError

# src/modelos/modelo_ner.py
import torch.nn as nn
import torch.nn.utils.rnn as rnn
from .base import RedeBaseNER

class RedeBiGRUNER(RedeBaseNER):
    def __init__(self, tamanho_vocabulario, tamanho_oculto, numero_rotulos, numero_camadas_gru=1):
        super().__init__()
        # ... (implementação idêntica à seção anterior)

Componente de Carregamento de Dados

Definimos a classe DatasetNER e o DataLoader.

Componente de Tokenizador

Constrói a classe base do tokenizador para garantir compatibilidade ao trocar tokenizadores.

# src/tokenizadores/base.py
from abc import ABC, abstractmethod

class TokenizadorBase(ABC):
    @abstractmethod
    def texto_para_tokens(self, texto: str) -> list[str]:
        """Divide o texto em lista de tokens."""
        raise NotImplementedError

    @abstractmethod
    def tokens_para_ids(self, tokens: list[str]) -> list[int]:
        """Converte lista de tokens para lista de IDs."""
        raise NotImplementedError

    def codificar(self, texto: str) -> list[int]:
        """Método conveniente para codificar texto diretamente para lista de IDs."""
        tokens = self.texto_para_tokens(texto)
        return self.tokens_para_ids(tokens)

    @abstractmethod
    def obter_id_pad(self) -> int:
        """Obtém o ID do token de preenchimento."""
        raise NotImplementedError

# src/tokenizadores/tokenizador_caractere.py
from .vocabulario import Vocabulario
from .base import TokenizadorBase

def normalizar_texto(texto):
    # ... (implementação omitida) ...

class TokenizadorCaractere(TokenizadorBase):
    def __init__(self, vocab: Vocabulario):
        self.vocab = vocab

    def texto_para_tokens(self, texto: str):
        texto_normalizado = normalizar_texto(texto)
        return list(texto_normalizado)

    def tokens_para_ids(self, tokens: list[str]):
        return self.vocab.converter_tokens_para_ids(tokens)
    
    def obter_id_pad(self) -> int:
        return self.vocab.id_pad

Componente de Avaliação

A função _trans_entidade_para_tupla converte a sequência de IDs de rótulos para lista de tuplas de entidades (decodificação BMES estrita). O salvamento ocorre apenas ao encontrar E- ou S-; ao encontrar um novo B- ou O, o fragmento não concluído é descartado.

A implementação atual de calcular_metricas_nivel_entidade enfrenta dois problemas ao lidar com cenários de avaliação em lote do Treinador:

  1. Lidar com preenchimento: Em um lote, sentenças de diferentes comprimentos são preenchidas para o mesmo comprimento. Estas posições de preenchimento não devem participar da avaliação. Precisamos usar o mecanismo de mascara_atencao para filtrar todos os Tokens inválidos causados pelo preenchimento, garantindo que a avaliação seja feita apenas em trechos de sequência válidos.
  2. Rastrear origem da amostra: Ao processar múltiplas amostras de um lote, precisamos ser capazes de distinguir de qual amostra cada entidade veio. Por exemplo, a primeira e a segunda amostra do lote podem ter uma entidade do tipo 'dis' na mesma posição (0, 2). Se não distinguirmos durante a decodificação, essas duas entidades independentes serão erroneamente tratadas como a mesma ao serem armazenadas em um set. Para distinguir com precisão entidades de diferentes amostras no mesmo lote, foi desenvolvido um esquema: adicionar um ID único (ou seja, índice dentro do lote i) a cada entidade decodificada. Isso garante que cada entidade seja identificada por uma tupla única (id_amostra, tipo_entidade, posicao_inicio, posicao_fim), resolvendo fundamentalmente o problema de ambiguidade de pertencimento à entidade.
# src/metricas/metricas_entidade.py
import torch

def _trans_entidade_para_tupla(label_ids, id2rotulo):
    """
    Converte sequência de IDs de rótulos para lista de tuplas de entidades (decodificação BMES estrita).
    O salvamento ocorre apenas ao encontrar E- ou S-; ao encontrar novo B- ou O, o fragmento não concluído é descartado.
    """
    entidades = []
    entidade_atual = None

    for i, id_rotulo in enumerate(label_ids):
        rotulo = id2rotulo.get(id_rotulo.item(), 'O')

        if rotulo.startswith('B-'):
            entidade_atual = (rotulo[2:], i, i + 1)
        elif rotulo.startswith('M-'):
            if entidade_atual and entidade_atual[0] == rotulo[2:]:
                entidade_atual = (entidade_atual[0], entidade_atual[1], i + 1)
            else:
                entidade_atual = None
        elif rotulo.startswith('E-'):
            if entidade_atual and entidade_atual[0] == rotulo[2:]:
                entidade_atual = (entidade_atual[0], entidade_atual[1], i + 1)
                entidades.append(entidade_atual)
            entidade_atual = None
        elif rotulo.startswith('S-'):
            entidades.append((rotulo[2:], i, i + 1))
            entidade_atual = None
        else:
            entidade_atual = None

    return set(entidades)


def calcular_metricas_nivel_entidade(todos_ids_preditos, todos_ids_rotulos, todas_mascaras, id2rotulo):
    """
    Calcula precisão, revocação e F1 em nível de entidade.
    """
    entidades_verdadeiras = set()
    entidades_preditas = set()
    indice_amostra = 0

    for preds_lote, labels_lote, mascaras_lote in zip(todos_ids_preditos, todos_ids_rotulos, todas_mascaras):
        B = labels_lote.shape[0]
        for b in range(B):
            mascara_linha = mascaras_lote[b].bool()
            labels_linha = labels_lote[b][mascara_linha]
            preds_linha = preds_lote[b][mascara_linha]

            te = _trans_entidade_para_tupla(labels_linha, id2rotulo)
            pe = _trans_entidade_para_tupla(preds_linha, id2rotulo)

            entidades_verdadeiras.update({(indice_amostra,) + e for e in te})
            entidades_preditas.update({(indice_amostra,) + e for e in pe})
            indice_amostra += 1

    num_corretos = len(entidades_verdadeiras.intersection(entidades_preditas))
    num_verdadeiros = len(entidades_verdadeiras)
    num_preditos = len(entidades_preditas)

    precisao = num_corretos / num_preditos if num_preditos > 0 else 0.0
    revocacao = num_corretos / num_verdadeiros if num_verdadeiros > 0 else 0.0
    f1 = 2 * (precisao * revocacao) / (precisao + revocacao) if (precisao + revocacao) > 0 else 0.0
    return {"precisao": precisao, "revocacao": revocacao, "f1": f1}

Inferência e Otimização do Modelo

Decodificando a sequência prevista:

O forward do modelo finalmente produz um tensor logits com formato [batch_size, seq_len, num_tags]. Após a operação argmax, obtemos uma sequência de IDs de rótulos, por exemplo: [0, 9, 10, 11, 0, ...].

Esta sequência não é intuitiva por si só. Para realizar avaliação em nível de entidade ou apresentar os resultados da previsão ao usuário, precisamos implementar uma função de decodificação que converta esta sequência numérica em uma lista contendo informações concretas de entidades, por exemplo: [{"texto": "hipertensão arterial", "tipo": "dis", "inicio": 3, "fim": 6}]. O núcleo deste processo de decodificação é analisar os limites e tipos das entidades a partir da sequência de rótulos, usando as regras do esquema de anotação BMES.

Estratégia de decodificação:

A abordagem atual usa um modo "estrito". Qualquer sequência que não siga as especificações (por exemplo, uma entidade com apenas B- sem E-) será diretamente descartada. Esta é a prática mais comum porque garante a normalidade das entidades de saída.

Em alguns cenários de negócios específicos, uma estratégia mais "permissiva" pode ser usada. Por exemplo, se o modelo previr uma sequência B-M-O, podemos escolher outputting a parte B-M como uma entidade em vez de descartá-la completamente. A escolha desta estratégia depende do diferentes énfasis em "revocação" e "precisão" na aplicação específica, precisando ser determinada conforme as necessidades reais.

Fluxo Principle do PrevisaoNER

O objetivo do método __init__ é carregar e preparar todos os componentes necessários para inferência.

Carregar configuração: Do diretório do modelo, carrega config.json para obter hiperparâmetros do modelo e caminhos de arquivos relacionados.

Carregar vocabulário e mapeamento de rótulos: De acordo com os caminhos no arquivo de configuração, carrega vocabulary.json e tags.json, e constrói o mapeamento id2rotulo.

Carregar tokenizador: Inicializa o TokenizadorCaractere.

Inicializar modelo e carregar pesos:

  • Instancia o modelo RedeBiGRUNER de acordo com a configuração.
  • Carrega os pesos do modelo melhor_modelo.pth do diretório do modelo. Aqui precisamos usar map_location=self.dispositivo para garantir que o modelo possa ser carregado no dispositivo especificado (CPU ou GPU).
  • Chama modelo.to(self.dispositivo) para mover o modelo para o dispositivo especificado.
  • Chama modelo.eval() para mudar o modelo para o modo de avaliação, desativando Dropout e BatchNorm que são usados apenas durante o treinamento, garantindo resultados de previsão determinísticos.
# codigo/previsao.py
import torch
import json
import os
import argparse
from src.modelos.modelo_ner import RedeBiGRUNER
from src.tokenizadores.vocabulario import Vocabulario
from src.tokenizadores.tokenizador_caractere import TokenizadorCaractere
from src.utils.leitura_arquivo import carregar_json

class PrevisaoNER:
    def __init__(self, diretorio_modelo, dispositivo='cpu'):
        self.dispositivo = torch.device(dispositivo)
        
        # --- 1. Carregar arquivo de configuração para obter parâmetros do modelo ---
        caminho_config = os.path.join(diretorio_modelo, 'config.json')
        self.config = carregar_json(caminho_config)

        # --- 2. Carregar vocabulário e mapeamento de rótulos ---
        caminho_vocab = os.path.join(self.config["diretorio_dados"], self.config["arquivo_vocabulario"])
        caminho_tags = os.path.join(self.config["diretorio_dados"], self.config["arquivo_rotulos"])

        self.vocabulario = Vocabulario.carregar_de_arquivo(caminho_vocab)
        self.tokenizador = TokenizadorCaractere(self.vocabulario)
        mapeamento_tags = carregar_json(caminho_tags)
        self.id2rotulo = {v: k for k, v in mapeamento_tags.items()}

        # --- 3. Inicializar modelo e carregar pesos ---
        self.modelo = RedeBiGRUNER(
            tamanho_vocabulario=len(self.vocabulario),
            tamanho_oculto=self.config["tamanho_oculto"],
            numero_rotulos=len(mapeamento_tags),
            numero_camadas_gru=self.config["numero_camadas_gru"]
        )
        caminho_modelo = os.path.join(diretorio_modelo, 'melhor_modelo.pth')
        self.modelo.load_state_dict(torch.load(caminho_modelo, map_location=self.dispositivo)['model_state_dict'])
        self.modelo.to(self.dispositivo)
        self.modelo.eval()

    def prever(self, texto):
        tokens = self.tokenizador.texto_para_tokens(texto)
        token_ids = self.tokenizador.tokens_para_ids(tokens)
        
        # --- Pré-processamento ---
        tensor_token_ids = torch.tensor([token_ids], dtype=torch.long).to(self.dispositivo)
        mascara_atencao = torch.ones_like(tensor_token_ids)

        # --- Previsão do modelo ---
        with torch.no_grad():
            logits = self.modelo(tensor_token_ids, mascara_atencao)
        
        # --- Pós-processamento ---
        previsoes = torch.argmax(logits, dim=-1).squeeze(0)
        rotulos = [self.id2rotulo[id_.item()] for id_ in previsoes]

        return self._extrair_entidades(tokens, rotulos)

    def _extrair_entidades(self, tokens, rotulos):
        entidades = []
        entidade_atual = None
        for i, rotulo in enumerate(rotulos):
            if rotulo.startswith('B-'):
                if entidade_atual:
                    pass
                entidade_atual = {"texto": tokens[i], "tipo": rotulo[2:], "inicio": i}
            elif rotulo.startswith('M-'):
                if entidade_atual and entidade_atual["tipo"] == rotulo[2:]:
                    entidade_atual["texto"] += tokens[i]
                else:
                    entidade_atual = None
            elif rotulo.startswith('E-'):
                if entidade_atual and entidade_atual["tipo"] == rotulo[2:]:
                    entidade_atual["texto"] += tokens[i]
                    entidade_atual["fim"] = i + 1
                    entidades.append(entidade_atual)
                entidade_atual = None
            elif rotulo.startswith('S-'):
                entidade_atual = None
                entidades.append({"texto": tokens[i], "tipo": rotulo[2:], "inicio": i, "fim": i + 1})
            else:
                entidade_atual = None
        
        return entidades

def main():
    parser = argparse.ArgumentParser(description="Previsão NER")
    parser.add_argument("--diretorio_modelo", type=str, required=True, help="Diretório do modelo salvo e configuração.")
    parser.add_argument("--texto", type=str, required=True, help="Texto para prever.")
    args = parser.parse_args()

    previsao = PrevisaoNER(diretorio_modelo=args.diretorio_modelo)
    entidades = previsao.prever(args.texto)
    print(f"Texto: {args.texto}")
    print(f"Entidades: {json.dumps(entidades, ensure_ascii=False, indent=2)}")

if __name__ == "__main__":
    main()

Função de Perda Personalizada

Entropia Cruzada Ponderada

O método mais simples é "ponderar". Atribuir um peso maior aos rótulos de entidades escassos (B, M, E, S) e um peso menor ao rótulo abundante de não-entidade (O). Por exemplo, podemos definir o peso da perda de entidade como 10 e o peso da perda de não-entidade como 1. Dessa forma, durante a retropropagação, se o modelo errar um Token de entidade, recebe uma "punição" 10 vezes maior do que se errar um Token de não-entidade, forçando o modelo a prestar mais atenção ao reconhecimento de entidades.

Mineração de Amostras Difíteis

Outra abordagem é "amostragem". Entre as nombreuses amostras de não-entidade, a maioria são "amostras simples" que o modelo pode prever facilmente, contribuindo pouco para a perda e tendo pouco valor de aprendizado repetido. O que realmente tem valor são as "amostras negativas dif ícies" que o modelo tende a errar facilmente, como um Token de não-entidade que o modelo倾向于 previr como entidade.

A mineração de amostras dif ícies funciona assim: ao calcular a perda da parte de não-entidade, em vez de calcular a perda média de todos os Tokens de não-entidade, selecionamos apenas aqueles com os maiores valores de perda (Top-K) para cálculo e retropropagação. Isso equivale a filtrar, do vasto "grupo majoritário", as "amostras mais problemáticas" mais valiosas para aprendizado, melhorando a eficiência e eficácia do treinamento.

# codigo/perda/personalizada.py
import torch
import torch.nn as nn

class PerdaNER(nn.Module):
    def __init__(self, tipo_perda='cross_entropy', peso_entidade=10.0, proporcao_negativa_dificil=0.5, indice_ignorar=-100):
        super().__init__()
        self.tipo_perda = tipo_perda
        self.peso_entidade = peso_entidade
        self.proporcao_negativa_dificil = proporcao_negativa_dificil
        
        self.funcao_perda_base = nn.CrossEntropyLoss(reduction='none', ignore_index=indice_ignorar)

    def forward(self, logits, rotulos):
        if self.tipo_perda == 'weighted_ce':
            return self._entropia_cruzada_ponderada(logits, rotulos)
        elif self.tipo_perda == 'hard_negative_mining':
            return self._mineracao_amostras_dificeis(logits, rotulos)
        else:
            return self.funcao_perda_base(logits, rotulos).mean()

    def _entropia_cruzada_ponderada(self, logits, rotulos):
        perda_por_token = self.funcao_perda_base(logits, rotulos)

        mascara_entidade = (rotulos > 0).float()
        mascara_nao_entidade = (rotulos == 0).float()

        perda_entidade = torch.sum(perda_por_token * mascara_entidade) / (torch.sum(mascara_entidade) + 1e-8)
        perda_nao_entidade = torch.sum(perda_por_token * mascara_nao_entidade) / (torch.sum(mascara_nao_entidade) + 1e-8)

        perda_total = self.peso_entidade * perda_entidade + 1.0 * perda_nao_entidade
        return perda_total, perda_entidade.detach(), perda_nao_entidade.detach()

    def _mineracao_amostras_dificeis(self, logits, rotulos):
        perda_por_token = self.funcao_perda_base(logits, rotulos)

        mascara_entidade = (rotulos > 0).float()
        perda_entidade = torch.sum(perda_por_token * mascara_entidade) / (torch.sum(mascara_entidade) + 1e-8)

        mascara_nao_entidade = (rotulos == 0).float()
        perda_nao_entidade = perda_por_token * mascara_nao_entidade

        num_entidades = torch.sum(mascara_entidade).item()
        num_negativas_dificeis = int(num_entidades * self.proporcao_negativa_dificil)

        if num_negativas_dificeis == 0:
            num_nao_entidades = torch.sum(mascara_nao_entidade).item()
            num_negativas_dificeis = int(num_nao_entidades * 0.1)

        perdas_topk, _ = torch.topk(perda_nao_entidade.view(-1), k=num_negativas_dificeis)
        
        perda_negativa_dificil = torch.mean(perdas_topk)

        perda_total = self.peso_entidade * perda_entidade + 1.0 * perda_negativa_dificil

        return perda_total, perda_entidade.detach(), perda_negativa_dificil.detach()

Tags: named-entity-recognition ner bi-lstm bi-gru Pytorch

Publicado em 6-27 04:33