Visão Geral do Reconhecimento de Entidades Nomeadas
Preparação dos Dados
Iniciamos analisando a estrutura dos dados. Precisamos extrair todos os tipos de entidades dos rótulos e criar um mapeamento baseado no esquema BMES, salvando a tabela de correspondência.
import json
import os
def salvar_json(dados, caminho_arquivo):
"""
Salva dados no formato JSON indentado
"""
os.makedirs(os.path.dirname(caminho_arquivo), exist_ok=True)
with open(caminho_arquivo, 'w', encoding='utf-8') as f:
json.dump(dados, f, ensure_ascii=False, indent=4)
def extrair_tipos_entidade(arquivo_dados):
"""
Extrai tipos únicos de entidade de um arquivo de dados
"""
tipos = set()
with open(arquivo_dados, 'r', encoding='utf-8') as f:
dados_completos = json.load(f)
for item in dados_completos:
for entidade in item['entidades']:
tipos.add(entidade['tipo'])
return tipos
def construir_mapeamento_rotulos(arquivos_dados, arquivo_saida):
"""
Constrói mapeamento de rótulos BMES e salva em arquivo
"""
todos_tipos_entidade = set()
for caminho in arquivos_dados:
todos_tipos_entidade.update(extrair_tipos_entidade(caminho))
tipos_ordenados = sorted(list(todos_tipos_entidade))
print(f"Tipos de entidade encontrados: {tipos_ordenados}")
rotulo_para_id = {'O': 0}
for tipo_entidade in tipos_ordenados:
for prefixo in ['B', 'M', 'E', 'S']:
nome_rotulo = f"{prefixo}-{tipo_entidade}"
rotulo_para_id[nome_rotulo] = len(rotulo_para_id)
print(f"\nGerados {len(rotulo_para_id)} mapeamentos de rótulos.")
salvar_json(rotulo_para_id, arquivo_saida)
print(f"Mapeamento salvo em: {arquivo_saida}")
if __name__ == '__main__':
arquivo_treino = './dados/CMeEE-V2_train.json'
arquivo_val = './dados/CMeEE-V2_dev.json'
caminho_saida = './dados/categorias.json'
construir_mapeamento_rotulos(
arquivos_dados=[arquivo_treino, arquivo_val],
arquivo_saida=caminho_saida
)
Para o texto: normalizamos (convertendo caracteres de largura completa para largura parcial), calculamos frequência de caracteres, removemos palavras com frequência abaixo do mínimo, adicionamos tokens de preenchimento (PAD) e desconhecido (UNK). A ordenação garante reprodutibilidade do vocabulário.
import json
import os
from collections import Counter
def salvar_json(dados, caminho_arquivo):
os.makedirs(os.path.dirname(caminho_arquivo), exist_ok=True)
with open(caminho_arquivo, 'w', encoding='utf-8') as f:
json.dump(dados, f, ensure_ascii=False, indent=4)
def normalizar_texto(texto):
"""
Normaliza texto convertendo caracteres de largura completa para largura parcial
"""
largura_completa = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz!#$%&’()*+,-./:;<=>?@[\]^_`{|}~""
largura_parcial = r"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz!#$%&'" + r'()*+,-./:;<=>?@[\]^_`{|}~".'
mapeamento = str.maketrans(largura_completa, largura_parcial)
return texto.translate(mapeamento)
def criar_vocabulario_caracteres(arquivos_dados, arquivo_saida, freq_minima=1):
"""
Cria vocabulário em nível de caracteres a partir dos arquivos de dados
"""
contagem_caracteres = Counter()
for caminho in arquivos_dados:
with open(caminho, 'r', encoding='utf-8') as f:
dados_completos = json.load(f)
for item in dados_completos:
texto = normalizar_texto(item['texto'])
contagem_caracteres.update(list(texto))
caracteres_frequentes = [caractere for caractere, contagem in contagem_caracteres.items() if contagem >= freq_minima]
caracteres_frequentes.sort()
tokens_especiais = ["<PAD>", "<UNK>"]
vocabulario_final = tokens_especiais + caracteres_frequentes
print(f"Tamanho do vocabulário (freq_minima={freq_minima}): {len(vocabulario_final)}")
salvar_json(vocabulario_final, arquivo_saida)
print(f"Vocabulário salvo em: {arquivo_saida}")
if __name__ == '__main__':
arquivo_treino = './dados/CMeEE-V2_train.json'
arquivo_val = './dados/CMeEE-V2_dev.json'
caminho_saida = './dados/vocabulario.json'
criar_vocabulario_caracteres(
arquivos_dados=[arquivo_treino, arquivo_val],
arquivo_saida=caminho_saida,
freq_minima=1
)
Criamos a classe Vocabulary para gerenciar o mapeamento entre tokens e IDs, com métodos __len__ e conversor de tokens para IDs.
Criamos o Dataset, inicializando com o texto, mapeamento de rótulos e vocabulário. Implementamos os métodos __len__ e __getitem__, onde o método __getitem__ converte os dados de entrada para IDs de tokens e mapeamento de rótulos, retornando token_ids e label_ids.
Criamos o DataLoader, construindo primeiro o collate_fn para realizar o padding das sequências e criar a máscara de atenção.
import json
import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
def normalizar_texto(texto):
"""
Normaliza texto convertendo caracteres de largura completa para largura parcial
"""
largura_completa = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz!#$%&’()*+,-./:;<=>?@[\]^_`」|}~""
largura_parcial = r"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz!#$%&'" + r'()*+,-./:;<=>?@[\]^_`{|}~".'
mapeamento = str.maketrans(largura_completa, largura_parcial)
return texto.translate(mapeamento)
class Vocabulario:
"""
Gerencia o vocabulário e o mapeamento de tokens para IDs
"""
def __init__(self, caminho_vocabulario):
with open(caminho_vocabulario, 'r', encoding='utf-8') as f:
self.tokens = json.load(f)
self.token_para_id = {token: i for i, token in enumerate(self.tokens)}
self.id_pad = self.token_para_id['<PAD>']
self.id_desconhecido = self.token_para_id['<UNK>']
def __len__(self):
return len(self.tokens)
def converter_tokens_para_ids(self, tokens):
return [self.token_para_id.get(token, self.id_desconhecido) for token in tokens]
class DatasetNER(Dataset):
"""
Processa dados de NER e os converte para formato compatível com PyTorch
"""
def __init__(self, caminho_dados, vocab: Vocabulario, mapeamento_rotulos: dict):
self.vocab = vocab
self.rotulo_para_id = mapeamento_rotulos
with open(caminho_dados, 'r', encoding='utf-8') as f:
self.registros = json.load(f)
def __len__(self):
return len(self.registros)
def __getitem__(self, idx):
registro = self.registros[idx]
texto = normalizar_texto(registro['texto'])
tokens = list(texto)
token_ids = self.vocab.converter_tokens_para_ids(tokens)
tags = ['O'] * len(tokens)
for entidade in registro.get('entidades', []):
tipo_entidade = entidade['tipo']
inicio = entidade['indice_inicio']
fim = entidade['indice_fim']
if fim >= len(tokens):
continue
if inicio == fim:
tags[inicio] = f'S-{tipo_entidade}'
else:
tags[inicio] = f'B-{tipo_entidade}'
tags[fim] = f'E-{tipo_entidade}'
for i in range(inicio + 1, fim):
tags[i] = f'M-{tipo_entidade}'
label_ids = [self.rotulo_para_id.get(tag, self.rotulo_para_id['O']) for tag in tags]
return {
"token_ids": torch.tensor(token_ids, dtype=torch.long),
"label_ids": torch.tensor(label_ids, dtype=torch.long)
}
def criar_dataloader_ner(caminho_dados, vocab, mapeamento_rotulos, tamanho_batch, embaralhar=False):
"""
Cria DataLoader para tarefa de NER
"""
dataset = DatasetNER(caminho_dados, vocab, mapeamento_rotulos)
def collate_batch(batch):
lista_token_ids = [item['token_ids'] for item in batch]
lista_label_ids = [item['label_ids'] for item in batch]
token_ids_preenchidos = pad_sequence(lista_token_ids, batch_first=True, padding_value=vocab.id_pad)
label_ids_preenchidos = pad_sequence(lista_label_ids, batch_first=True, padding_value=-100)
mascara_atencao = (token_ids_preenchidos != vocab.id_pad).long()
return {
"token_ids": token_ids_preenchidos,
"label_ids": label_ids_preenchidos,
"attention_mask": mascara_atencao
}
return DataLoader(dataset, batch_size=tamanho_batch, shuffle=embaralhar, collate_fn=collate_batch)
if __name__ == '__main__':
arquivo_treino = './dados/CMeEE-V2_train.json'
arquivo_vocabulario = './dados/vocabulario.json'
arquivo_categorias = './dados/categorias.json'
vocabulario = Vocabulario(caminho_vocabulario=arquivo_vocabulario)
with open(arquivo_categorias, 'r', encoding='utf-8') as f:
mapeamento_rotulos = json.load(f)
print("Vocabulário e mapeamento de rótulos carregados.")
dataloader_treino = criar_dataloader_ner(
caminho_dados=arquivo_treino,
vocab=vocabulario,
mapeamento_rotulos=mapeamento_rotulos,
tamanho_batch=4,
embaralhar=True
)
print("DataLoader criado.")
print("\n--- Verificando um lote ---")
lote = next(iter(dataloader_treino))
print(f" Token IDs (shape): {lote['token_ids'].shape}")
print(f" Label IDs (shape): {lote['label_ids'].shape}")
print(f" Attention Mask (shape): {lote['attention_mask'].shape}")
print(f" Token IDs (amostra): {lote['token_ids'][0][:20]}...")
print(f" Label IDs (amostra): {lote['label_ids'][0][:20]}...")
print(f" Attention Mask (amostra): {lote['attention_mask'][0][:20]}...")
nn.ModuleList vs nn.Sequential
No PyTorch, nn.ModuleList e nn.Sequential são ambos contêineres para armazenar múltiplos submódulos, mas possuem filosofias de design e casos de uso distintos:
nn.Sequential: Funciona como um pipeline automatizado onde os dados fluem automaticamente por cada camada. É adequado para pilhas lineares simples, mas não permite interações complexas entre camadas.nn.ModuleList: Age mais como uma lista Python comum, armazenando módulos sem executá-los automaticamente. Você precisa escrever loops manualmente no métodoforwardpara chamar cada camada, possibilitando adicionar lógica personalizada entre camadas (como conexões residuais).
Construção do Modelo
loss_fn = nn.CrossEntropyLoss(ignore_index=-100, reduction='none')
reduction='none': A perda padrão retorna um único valor. Ao definir este parâmetro, a perda retorna o valor para cada rótulo individualmente, formando uma matriz de perdas.
ignore_index funciona: Observa-se que nas posições de preenchimento com valor -100 em label_ids, a perda correspondente é 0. Com este método, em modelos unidirecionais não precisamos passar a máscara de sequência, mas em modelos bidirecionais isso não é possível.
torch.nn.utils.rnn.pack_padded_sequence: Esta função recebe um tensor de entrada preenchido e uma lista de comprimentos reais. Ela retorna um objeto PackedSequence, que pode ser imaginado como um pacote de dados "comprimido" onde todas as posições de preenchimento são temporariamente removidas. Ao receber este objeto especial, o módulo RNN consegue processar corretamente sequências de comprimento variável de forma eficiente.
É claro, onde há "empacotamento" há "desempacotamento". A função correspondente pad_packed_sequence é responsável por "descompactar" o objeto PackedSequence após o cálculo do RNN, convertendo-o de volta para um Tensor organizado com preenchimento.
import torch
import torch.nn as nn
import torch.nn.utils.rnn as rnn
class RedeBiGRUNER(nn.Module):
def __init__(self, tamanho_vocabulario, tamanho_oculto, numero_rotulos, numero_camadas_gru=1):
super().__init__()
self.embedding = nn.Embedding(tamanho_vocabulario, tamanho_oculto)
self.camadas_gru = nn.ModuleList()
for _ in range(numero_camadas_gru):
self.camadas_gru.append(
nn.GRU(
input_size=tamanho_oculto,
hidden_size=tamanho_oculto,
num_layers=1,
batch_first=True,
bidirectional=True
)
)
self.camada_fusao = nn.Linear(tamanho_oculto * 2, tamanho_oculto)
self.classificador = nn.Linear(tamanho_oculto, numero_rotulos)
def forward(self, token_ids, mascara_atencao):
comprimentos = mascara_atencao.sum(dim=1).cpu()
texto嵌入 = self.embedding(token_ids)
entrada_packed = rnn.pack_padded_sequence(
texto嵌入, comprimentos, batch_first=True, enforce_sorted=False
)
for camada_gru in self.camadas_gru:
saida_packed, _ = camada_gru(entrada_packed)
saida, _ = rnn.pad_packed_sequence(
saida_packed, batch_first=True, total_length=token_ids.shape[1]
)
caracteristicas = self.camada_fusao(saida)
entrada_descompactada, _ = rnn.pad_packed_sequence(
entrada_packed, batch_first=True, total_length=token_ids.shape[1]
)
entrada_atual = caracteristicas + entrada_descompactada
entrada_packed = rnn.pack_padded_sequence(
entrada_atual, comprimentos, batch_first=True, enforce_sorted=False
)
saida_final, _ = rnn.pad_packed_sequence(
entrada_packed, batch_first=True, total_length=token_ids.shape[1]
)
logits = self.classificador(saida_final)
return logits
if __name__ == '__main__':
token_ids = torch.tensor([
[210, 18, 871, 147, 0, 0, 0, 0],
[922, 2962, 842, 210, 18, 871, 147, 0]
], dtype=torch.int64)
mascara_atencao = torch.tensor([
[1, 1, 1, 1, 0, 0, 0, 0],
[1, 1, 1, 1, 1, 1, 1, 0]
], dtype=torch.int64)
label_ids = torch.tensor([
[0, 0, 0, 0, -100, -100, -100, -100],
[0, 0, 0, 0, 0, 0, 0, -100]
], dtype=torch.int64)
modelo = RedeBiGRUNER(
tamanho_vocabulario=10000,
tamanho_oculto=128,
numero_rotulos=37,
numero_camadas_gru=2
)
logits = modelo(token_ids=token_ids, mascara_atencao=mascara_atencao)
funcao_perda = nn.CrossEntropyLoss(ignore_index=-100, reduction='none')
logits_permutados = torch.permute(logits, dims=(0, 2, 1))
perda = funcao_perda(logits_permutados, label_ids)
print(f"Logits shape: {logits.shape}")
print(f"Loss shape: {perda.shape}")
print("\nPerda para cada Token:")
print(perda)
Construção de Componentes
Trainer
Antes de escrever a classe Trainer, criamos uma pasta treinador/ no diretório src/ e um arquivo treinador.py para armazenar a definição da classe.
Trainer é responsável apenas pelo "treinamento": A responsabilidade central da classe Trainer é exectuar ciclos padrão de treinamento e avaliação.
__init__: Inicializa o modelo PyTorch, otimizador, função de perda, dataloader de treino, dataloader de validação, função de métrica de avaliação, diretório de saída do modelo e dispositivo de treinamento.
fit(self, epochs): Ponto de entrada principal para treinamento.
_train_one_epoch(self): Encapsula o fluxo de um ciclo de treinamento.
_train_step(self, batch): Encapsula a lógica de um passo de treinamento (forward, perda, backward).
_evaluate(self): Encapsula a lógica de avaliação.
_evaluation_step(self, batch): Encapsula a lógica de um passo de avaliação.
_save_checkpoint(self, is_best=False): Encapsula a lógica de salvamento do modelo.
Em nosso design de "montagem por componentes", embora o Trainer não receba diretamente o objeto de configuração config (para manter o desacoplamento), o config ainda é a fonte de parâmetros para todas as "peças".
import torch
from tqdm import tqdm
import os
from dataclasses import asdict
class Treinador:
def __init__(self, modelo, otimizador, funcao_perda, dataloader_treino, dataloader_val=None,
funcao_metrica_aval=None, diretorio_saida=None, dispositivo='cpu'):
self.modelo = modelo.to(dispositivo)
self.otimizador = otimizador
self.funcao_perda = funcao_perda
self.dataloader_treino = dataloader_treino
self.dataloader_val = dataloader_val
self.funcao_metrica_aval = funcao_metrica_aval
self.diretorio_saida = diretorio_saida
self.dispositivo = torch.device(dispositivo)
if self.diretorio_saida:
os.makedirs(self.diretorio_saida, exist_ok=True)
def fit(self, epocas):
melhor_metrica = float('inf')
for epoca in range(1, epocas + 1):
perda_treino = self._treinar_uma_epoca()
print(f"Época {epoca} - Perda de Treino: {perda_treino:.4f}")
metricas = self._avaliar()
if metricas:
print(f"Época {epoca} - Métricas de Validação: {metricas}")
metrica_atual = metricas.get('perda')
if metrica_atual < melhor_metrica:
melhor_metrica = metrica_atual
if self.diretorio_saida:
self._salvar_checkpoint(eh_melhor=True)
print(f"Novo melhor modelo salvo com perda de validação: {melhor_metrica:.4f}")
if self.diretorio_saida:
self._salvar_checkpoint(eh_melior=False)
def _treinar_uma_epoca(self):
self.modelo.train()
perda_total = 0
for lote in tqdm(self.dataloader_treino, desc=f"Treinando Época"):
resultados = self._passo_treinamento(lote)
perda_total += resultados['perda'].item()
return perda_total / len(self.dataloader_treino)
def _passo_treinamento(self, lote):
lote = {k: v.to(self.dispositivo) for k, v in lote.items() if isinstance(v, torch.Tensor)}
logits = self.modelo(token_ids=lote['token_ids'], mascara_atencao=lote['mascara_atencao'])
perda = self.funcao_perda(logits.permute(0, 2, 1), lote['label_ids'])
self.otimizador.zero_grad()
perda.backward()
self.otimizador.step()
return {'perda': perda, 'logits': logits}
def _avaliar(self):
if self.dataloader_val is None:
return None
self.modelo.eval()
perda_total = 0
todos_logits = []
todos_rotulos = []
todas_mascaras_atencao = []
with torch.no_grad():
for lote in tqdm(self.dataloader_val, desc="Avaliando"):
resultados = self._passo_avaliacao(lote)
perda_total += resultados['perda'].item()
todos_logits.append(resultados['logits'].cpu())
todos_rotulos.append(lote['label_ids'].cpu())
todas_mascaras_atencao.append(lote['mascara_atencao'].cpu())
metricas = {}
if self.funcao_metrica_aval:
metricas = self.funcao_metrica_aval(todos_logits, todos_rotulos, todas_mascaras_atencao)
metricas['perda'] = perda_total / len(self.dataloader_val)
return metricas
def _passo_avaliacao(self, lote):
lote = {k: v.to(self.dispositivo) for k, v in lote.items() if isinstance(v, torch.Tensor)}
logits = self.modelo(token_ids=lote['token_ids'], mascara_atencao=lote['mascara_atencao'])
perda = self.funcao_perda(logits.permute(0, 2, 1), lote['label_ids'])
return {'perda': perda, 'logits': logits}
def _salvar_checkpoint(self, eh_melhor):
estado = {'model_state_dict': self.modelo.state_dict()}
if eh_melhor:
torch.save(estado, os.path.join(self.diretorio_saida, 'melhor_modelo.pth'))
torch.save(estado, os.path.join(self.diretorio_saida, 'ultimo_modelo.pth'))
Classe de Configuração
# src/configs/configs.py
import torch
from dataclasses import dataclass, field
@dataclass
class ConfiguracaoNER:
parametros_caminho: str = "dados"
arquivo_treino: str = "CMeEE-V2_train.json"
arquivo_validacao: str = "CMeEE-V2_dev.json"
arquivo_vocabulario: str = "vocabulary.json"
arquivo_rotulos: str = "categories.json"
diretorio_saida: str = "saida"
parametros_treinamento: int = 32
epocas: int = 20
taxa_aprendizado: float = 1e-3
dispositivo: str = field(default_factory=lambda: 'cuda' if torch.cuda.is_available() else 'cpu')
parametros_modelo: int = 256
numero_camadas_gru: int = 2
@dataclass é um decorador introduzido no Python 3.7 que simplifica a escrita de classes. Para classes de configuração como ConfiguracaoNER, ele gera automaticamente o construtor (__init__), eliminando a necessidade de escrever código manual de atribuição de parâmetros. Também gera uma representação amigável (__repr__), o que significa que print(config) exibirá claramente todos os parâmetros e valores, facilitando a depuração.
Componente de Modelo
Primeiro passo: Criar diretório de modelos
Criamos uma nova pasta chamada modelos no diretório src/.
Segundo passo: Definir classe base do modelo
Antes de construir modelos específicos, podemos criar um arquivo base.py no diretório src/modelos/ para definir uma classe base do modelo. Esta classe usa o módulo abc do Python (Abstract Base Classes) para definir uma interface的统一 para todos os modelos de NER.
Os benefícios de fazer isso são:
- Interface forçada: Todos os modelos devem implementar um método
forwardque recebe os mesmos parâmetros (token_ids,mascara_atencao). Isso garante que oTreinadorpossa trabalhar perfeitamente com qualquer novo modelo que criarmos (como BERT-NER, LSTM-NER) sem precisar modificar o código doTreinador. - Melhor legibilidade e manutenibilidade: A estrutura do código fica mais clara; ao herdar o projeto, basta verificar a classe base para entender a interface dos modelos.
# src/modelos/base.py
import torch.nn as nn
from abc import ABC, abstractmethod
class RedeBaseNER(nn.Module, ABC):
@abstractmethod
def forward(self, token_ids, mascara_atencao):
"""
Define a interface de forward que todos os modelos de NER devem seguir.
Args:
token_ids (torch.Tensor): [batch_size, seq_len]
mascara_atencao (torch.Tensor): [batch_size, seq_len]
Returns:
torch.Tensor: Logits, [batch_size, seq_len, num_tags]
"""
raise NotImplementedError
# src/modelos/modelo_ner.py
import torch.nn as nn
import torch.nn.utils.rnn as rnn
from .base import RedeBaseNER
class RedeBiGRUNER(RedeBaseNER):
def __init__(self, tamanho_vocabulario, tamanho_oculto, numero_rotulos, numero_camadas_gru=1):
super().__init__()
# ... (implementação idêntica à seção anterior)
Componente de Carregamento de Dados
Definimos a classe DatasetNER e o DataLoader.
Componente de Tokenizador
Constrói a classe base do tokenizador para garantir compatibilidade ao trocar tokenizadores.
# src/tokenizadores/base.py
from abc import ABC, abstractmethod
class TokenizadorBase(ABC):
@abstractmethod
def texto_para_tokens(self, texto: str) -> list[str]:
"""Divide o texto em lista de tokens."""
raise NotImplementedError
@abstractmethod
def tokens_para_ids(self, tokens: list[str]) -> list[int]:
"""Converte lista de tokens para lista de IDs."""
raise NotImplementedError
def codificar(self, texto: str) -> list[int]:
"""Método conveniente para codificar texto diretamente para lista de IDs."""
tokens = self.texto_para_tokens(texto)
return self.tokens_para_ids(tokens)
@abstractmethod
def obter_id_pad(self) -> int:
"""Obtém o ID do token de preenchimento."""
raise NotImplementedError
# src/tokenizadores/tokenizador_caractere.py
from .vocabulario import Vocabulario
from .base import TokenizadorBase
def normalizar_texto(texto):
# ... (implementação omitida) ...
class TokenizadorCaractere(TokenizadorBase):
def __init__(self, vocab: Vocabulario):
self.vocab = vocab
def texto_para_tokens(self, texto: str):
texto_normalizado = normalizar_texto(texto)
return list(texto_normalizado)
def tokens_para_ids(self, tokens: list[str]):
return self.vocab.converter_tokens_para_ids(tokens)
def obter_id_pad(self) -> int:
return self.vocab.id_pad
Componente de Avaliação
A função _trans_entidade_para_tupla converte a sequência de IDs de rótulos para lista de tuplas de entidades (decodificação BMES estrita). O salvamento ocorre apenas ao encontrar E- ou S-; ao encontrar um novo B- ou O, o fragmento não concluído é descartado.
A implementação atual de calcular_metricas_nivel_entidade enfrenta dois problemas ao lidar com cenários de avaliação em lote do Treinador:
- Lidar com preenchimento: Em um lote, sentenças de diferentes comprimentos são preenchidas para o mesmo comprimento. Estas posições de preenchimento não devem participar da avaliação. Precisamos usar o mecanismo de
mascara_atencaopara filtrar todos os Tokens inválidos causados pelo preenchimento, garantindo que a avaliação seja feita apenas em trechos de sequência válidos. - Rastrear origem da amostra: Ao processar múltiplas amostras de um lote, precisamos ser capazes de distinguir de qual amostra cada entidade veio. Por exemplo, a primeira e a segunda amostra do lote podem ter uma entidade do tipo 'dis' na mesma posição
(0, 2). Se não distinguirmos durante a decodificação, essas duas entidades independentes serão erroneamente tratadas como a mesma ao serem armazenadas em umset. Para distinguir com precisão entidades de diferentes amostras no mesmo lote, foi desenvolvido um esquema: adicionar um ID único (ou seja, índice dentro do lotei) a cada entidade decodificada. Isso garante que cada entidade seja identificada por uma tupla única(id_amostra, tipo_entidade, posicao_inicio, posicao_fim), resolvendo fundamentalmente o problema de ambiguidade de pertencimento à entidade.
# src/metricas/metricas_entidade.py
import torch
def _trans_entidade_para_tupla(label_ids, id2rotulo):
"""
Converte sequência de IDs de rótulos para lista de tuplas de entidades (decodificação BMES estrita).
O salvamento ocorre apenas ao encontrar E- ou S-; ao encontrar novo B- ou O, o fragmento não concluído é descartado.
"""
entidades = []
entidade_atual = None
for i, id_rotulo in enumerate(label_ids):
rotulo = id2rotulo.get(id_rotulo.item(), 'O')
if rotulo.startswith('B-'):
entidade_atual = (rotulo[2:], i, i + 1)
elif rotulo.startswith('M-'):
if entidade_atual and entidade_atual[0] == rotulo[2:]:
entidade_atual = (entidade_atual[0], entidade_atual[1], i + 1)
else:
entidade_atual = None
elif rotulo.startswith('E-'):
if entidade_atual and entidade_atual[0] == rotulo[2:]:
entidade_atual = (entidade_atual[0], entidade_atual[1], i + 1)
entidades.append(entidade_atual)
entidade_atual = None
elif rotulo.startswith('S-'):
entidades.append((rotulo[2:], i, i + 1))
entidade_atual = None
else:
entidade_atual = None
return set(entidades)
def calcular_metricas_nivel_entidade(todos_ids_preditos, todos_ids_rotulos, todas_mascaras, id2rotulo):
"""
Calcula precisão, revocação e F1 em nível de entidade.
"""
entidades_verdadeiras = set()
entidades_preditas = set()
indice_amostra = 0
for preds_lote, labels_lote, mascaras_lote in zip(todos_ids_preditos, todos_ids_rotulos, todas_mascaras):
B = labels_lote.shape[0]
for b in range(B):
mascara_linha = mascaras_lote[b].bool()
labels_linha = labels_lote[b][mascara_linha]
preds_linha = preds_lote[b][mascara_linha]
te = _trans_entidade_para_tupla(labels_linha, id2rotulo)
pe = _trans_entidade_para_tupla(preds_linha, id2rotulo)
entidades_verdadeiras.update({(indice_amostra,) + e for e in te})
entidades_preditas.update({(indice_amostra,) + e for e in pe})
indice_amostra += 1
num_corretos = len(entidades_verdadeiras.intersection(entidades_preditas))
num_verdadeiros = len(entidades_verdadeiras)
num_preditos = len(entidades_preditas)
precisao = num_corretos / num_preditos if num_preditos > 0 else 0.0
revocacao = num_corretos / num_verdadeiros if num_verdadeiros > 0 else 0.0
f1 = 2 * (precisao * revocacao) / (precisao + revocacao) if (precisao + revocacao) > 0 else 0.0
return {"precisao": precisao, "revocacao": revocacao, "f1": f1}
Inferência e Otimização do Modelo
Decodificando a sequência prevista:
O forward do modelo finalmente produz um tensor logits com formato [batch_size, seq_len, num_tags]. Após a operação argmax, obtemos uma sequência de IDs de rótulos, por exemplo: [0, 9, 10, 11, 0, ...].
Esta sequência não é intuitiva por si só. Para realizar avaliação em nível de entidade ou apresentar os resultados da previsão ao usuário, precisamos implementar uma função de decodificação que converta esta sequência numérica em uma lista contendo informações concretas de entidades, por exemplo: [{"texto": "hipertensão arterial", "tipo": "dis", "inicio": 3, "fim": 6}]. O núcleo deste processo de decodificação é analisar os limites e tipos das entidades a partir da sequência de rótulos, usando as regras do esquema de anotação BMES.
Estratégia de decodificação:
A abordagem atual usa um modo "estrito". Qualquer sequência que não siga as especificações (por exemplo, uma entidade com apenas B- sem E-) será diretamente descartada. Esta é a prática mais comum porque garante a normalidade das entidades de saída.
Em alguns cenários de negócios específicos, uma estratégia mais "permissiva" pode ser usada. Por exemplo, se o modelo previr uma sequência B-M-O, podemos escolher outputting a parte B-M como uma entidade em vez de descartá-la completamente. A escolha desta estratégia depende do diferentes énfasis em "revocação" e "precisão" na aplicação específica, precisando ser determinada conforme as necessidades reais.
Fluxo Principle do PrevisaoNER
O objetivo do método __init__ é carregar e preparar todos os componentes necessários para inferência.
Carregar configuração: Do diretório do modelo, carrega config.json para obter hiperparâmetros do modelo e caminhos de arquivos relacionados.
Carregar vocabulário e mapeamento de rótulos: De acordo com os caminhos no arquivo de configuração, carrega vocabulary.json e tags.json, e constrói o mapeamento id2rotulo.
Carregar tokenizador: Inicializa o TokenizadorCaractere.
Inicializar modelo e carregar pesos:
- Instancia o modelo
RedeBiGRUNERde acordo com a configuração. - Carrega os pesos do modelo
melhor_modelo.pthdo diretório do modelo. Aqui precisamos usarmap_location=self.dispositivopara garantir que o modelo possa ser carregado no dispositivo especificado (CPU ou GPU). - Chama
modelo.to(self.dispositivo)para mover o modelo para o dispositivo especificado. - Chama
modelo.eval()para mudar o modelo para o modo de avaliação, desativando Dropout e BatchNorm que são usados apenas durante o treinamento, garantindo resultados de previsão determinísticos.
# codigo/previsao.py
import torch
import json
import os
import argparse
from src.modelos.modelo_ner import RedeBiGRUNER
from src.tokenizadores.vocabulario import Vocabulario
from src.tokenizadores.tokenizador_caractere import TokenizadorCaractere
from src.utils.leitura_arquivo import carregar_json
class PrevisaoNER:
def __init__(self, diretorio_modelo, dispositivo='cpu'):
self.dispositivo = torch.device(dispositivo)
# --- 1. Carregar arquivo de configuração para obter parâmetros do modelo ---
caminho_config = os.path.join(diretorio_modelo, 'config.json')
self.config = carregar_json(caminho_config)
# --- 2. Carregar vocabulário e mapeamento de rótulos ---
caminho_vocab = os.path.join(self.config["diretorio_dados"], self.config["arquivo_vocabulario"])
caminho_tags = os.path.join(self.config["diretorio_dados"], self.config["arquivo_rotulos"])
self.vocabulario = Vocabulario.carregar_de_arquivo(caminho_vocab)
self.tokenizador = TokenizadorCaractere(self.vocabulario)
mapeamento_tags = carregar_json(caminho_tags)
self.id2rotulo = {v: k for k, v in mapeamento_tags.items()}
# --- 3. Inicializar modelo e carregar pesos ---
self.modelo = RedeBiGRUNER(
tamanho_vocabulario=len(self.vocabulario),
tamanho_oculto=self.config["tamanho_oculto"],
numero_rotulos=len(mapeamento_tags),
numero_camadas_gru=self.config["numero_camadas_gru"]
)
caminho_modelo = os.path.join(diretorio_modelo, 'melhor_modelo.pth')
self.modelo.load_state_dict(torch.load(caminho_modelo, map_location=self.dispositivo)['model_state_dict'])
self.modelo.to(self.dispositivo)
self.modelo.eval()
def prever(self, texto):
tokens = self.tokenizador.texto_para_tokens(texto)
token_ids = self.tokenizador.tokens_para_ids(tokens)
# --- Pré-processamento ---
tensor_token_ids = torch.tensor([token_ids], dtype=torch.long).to(self.dispositivo)
mascara_atencao = torch.ones_like(tensor_token_ids)
# --- Previsão do modelo ---
with torch.no_grad():
logits = self.modelo(tensor_token_ids, mascara_atencao)
# --- Pós-processamento ---
previsoes = torch.argmax(logits, dim=-1).squeeze(0)
rotulos = [self.id2rotulo[id_.item()] for id_ in previsoes]
return self._extrair_entidades(tokens, rotulos)
def _extrair_entidades(self, tokens, rotulos):
entidades = []
entidade_atual = None
for i, rotulo in enumerate(rotulos):
if rotulo.startswith('B-'):
if entidade_atual:
pass
entidade_atual = {"texto": tokens[i], "tipo": rotulo[2:], "inicio": i}
elif rotulo.startswith('M-'):
if entidade_atual and entidade_atual["tipo"] == rotulo[2:]:
entidade_atual["texto"] += tokens[i]
else:
entidade_atual = None
elif rotulo.startswith('E-'):
if entidade_atual and entidade_atual["tipo"] == rotulo[2:]:
entidade_atual["texto"] += tokens[i]
entidade_atual["fim"] = i + 1
entidades.append(entidade_atual)
entidade_atual = None
elif rotulo.startswith('S-'):
entidade_atual = None
entidades.append({"texto": tokens[i], "tipo": rotulo[2:], "inicio": i, "fim": i + 1})
else:
entidade_atual = None
return entidades
def main():
parser = argparse.ArgumentParser(description="Previsão NER")
parser.add_argument("--diretorio_modelo", type=str, required=True, help="Diretório do modelo salvo e configuração.")
parser.add_argument("--texto", type=str, required=True, help="Texto para prever.")
args = parser.parse_args()
previsao = PrevisaoNER(diretorio_modelo=args.diretorio_modelo)
entidades = previsao.prever(args.texto)
print(f"Texto: {args.texto}")
print(f"Entidades: {json.dumps(entidades, ensure_ascii=False, indent=2)}")
if __name__ == "__main__":
main()
Função de Perda Personalizada
Entropia Cruzada Ponderada
O método mais simples é "ponderar". Atribuir um peso maior aos rótulos de entidades escassos (B, M, E, S) e um peso menor ao rótulo abundante de não-entidade (O). Por exemplo, podemos definir o peso da perda de entidade como 10 e o peso da perda de não-entidade como 1. Dessa forma, durante a retropropagação, se o modelo errar um Token de entidade, recebe uma "punição" 10 vezes maior do que se errar um Token de não-entidade, forçando o modelo a prestar mais atenção ao reconhecimento de entidades.
Mineração de Amostras Difíteis
Outra abordagem é "amostragem". Entre as nombreuses amostras de não-entidade, a maioria são "amostras simples" que o modelo pode prever facilmente, contribuindo pouco para a perda e tendo pouco valor de aprendizado repetido. O que realmente tem valor são as "amostras negativas dif ícies" que o modelo tende a errar facilmente, como um Token de não-entidade que o modelo倾向于 previr como entidade.
A mineração de amostras dif ícies funciona assim: ao calcular a perda da parte de não-entidade, em vez de calcular a perda média de todos os Tokens de não-entidade, selecionamos apenas aqueles com os maiores valores de perda (Top-K) para cálculo e retropropagação. Isso equivale a filtrar, do vasto "grupo majoritário", as "amostras mais problemáticas" mais valiosas para aprendizado, melhorando a eficiência e eficácia do treinamento.
# codigo/perda/personalizada.py
import torch
import torch.nn as nn
class PerdaNER(nn.Module):
def __init__(self, tipo_perda='cross_entropy', peso_entidade=10.0, proporcao_negativa_dificil=0.5, indice_ignorar=-100):
super().__init__()
self.tipo_perda = tipo_perda
self.peso_entidade = peso_entidade
self.proporcao_negativa_dificil = proporcao_negativa_dificil
self.funcao_perda_base = nn.CrossEntropyLoss(reduction='none', ignore_index=indice_ignorar)
def forward(self, logits, rotulos):
if self.tipo_perda == 'weighted_ce':
return self._entropia_cruzada_ponderada(logits, rotulos)
elif self.tipo_perda == 'hard_negative_mining':
return self._mineracao_amostras_dificeis(logits, rotulos)
else:
return self.funcao_perda_base(logits, rotulos).mean()
def _entropia_cruzada_ponderada(self, logits, rotulos):
perda_por_token = self.funcao_perda_base(logits, rotulos)
mascara_entidade = (rotulos > 0).float()
mascara_nao_entidade = (rotulos == 0).float()
perda_entidade = torch.sum(perda_por_token * mascara_entidade) / (torch.sum(mascara_entidade) + 1e-8)
perda_nao_entidade = torch.sum(perda_por_token * mascara_nao_entidade) / (torch.sum(mascara_nao_entidade) + 1e-8)
perda_total = self.peso_entidade * perda_entidade + 1.0 * perda_nao_entidade
return perda_total, perda_entidade.detach(), perda_nao_entidade.detach()
def _mineracao_amostras_dificeis(self, logits, rotulos):
perda_por_token = self.funcao_perda_base(logits, rotulos)
mascara_entidade = (rotulos > 0).float()
perda_entidade = torch.sum(perda_por_token * mascara_entidade) / (torch.sum(mascara_entidade) + 1e-8)
mascara_nao_entidade = (rotulos == 0).float()
perda_nao_entidade = perda_por_token * mascara_nao_entidade
num_entidades = torch.sum(mascara_entidade).item()
num_negativas_dificeis = int(num_entidades * self.proporcao_negativa_dificil)
if num_negativas_dificeis == 0:
num_nao_entidades = torch.sum(mascara_nao_entidade).item()
num_negativas_dificeis = int(num_nao_entidades * 0.1)
perdas_topk, _ = torch.topk(perda_nao_entidade.view(-1), k=num_negativas_dificeis)
perda_negativa_dificil = torch.mean(perdas_topk)
perda_total = self.peso_entidade * perda_entidade + 1.0 * perda_negativa_dificil
return perda_total, perda_entidade.detach(), perda_negativa_dificil.detach()