Detectando, Avaliando e Redigindo PII em Logs usando NLP e Correspondência de Padrões - Parte 1

Como utilizar o Elasticsearch e NLP para detectar e avaliar informações pessoais em logs.

Índice

Introdução

Ferramentas e Tecnicas

Detecção por Reconhecimento de Entidades Nomeadas (NER)

Detecção por Correspondência de Padrões

Amostragem de Logs

Pipeline de Ingestão como Processamento Combinável

Construindo o Pipeline de Processamento

Amostragem de Logs + Pipeline de Ingestão Combinável

Carregamento, Configuração e Execução do Pipeline NER

Carregando o Modelo NER

Implantando e Iniciando o Modelo NER

Resultados

Resultados do Modelo NER

Painel de Avaliação de PII

Próximos Passos

Apêndice de Carregamento de Dados

Código

Recarregando os Logs

Introdução:

A presença predominante de logs de alta entropia em sistemas distribuídos aumenta significativamente o risco de vazamento de PII (Personally Identifiable Information - Informações de Identificação Pessoal) para nossos logs, o que pode resultar em problemas de segurança e conformidade. Este blog em duas partes explora tarefas essenciais para identificar e gerenciar esse problema usando o Elastic Stack. Vamos explorar como usar NLP (Natural Language Processing - Processamento de Linguagem Natural) e correspondência de padrões para detectar, avaliar e, quando possível, remover PII dos logs que estão sendo ingeridos no Elasticsearch.

Na parte 1 deste blog, cobriremos:

  • Revisão das técnicas e ferramentas disponíveis para gerenciar PII em logs
  • Compreensão do papel do NLP/NER na detecção de PII
  • Construção de pipelines de processamento combináveis para detectar e avaliar PII
  • Amostragem de logs e execução através do modelo NER
  • Avaliação dos resultados do modelo NER

Na parte 2 deste blog, cobriremos:

  • Uso do NER e processador de redação para editar PII
  • Aplicação de segurança em nível de campo para controlar acesso a dados não editados
  • Aprimoramento de painéis e alertas
  • Considerações de produção e escalabilidade
  • Como executar esses processos em dados de entrada ou históricos

Este é o fluxo geral que construiremos nas 2 partes:

Todo o código deste exercício está disponível em: https://github.com/bvader/elastic-pii.

Ferramentas e Tecnicas

Usaremos quatro funcionalidades genéricas neste exercício.

  • Reconhecimento de Entidades Nomeadas (NER)
  • Detecção por Correspondência de Padrões
  • Amostragem de Logs
  • Pipeline de Ingestão como Processamento Combinável

Reconhecimento de Entidades Nomeadas (NER)

NER é uma subtarefa do processamento de linguagem natural (NLP) que envolve identificar entidades nomeadas em texto não estruturado e categorizá-las em categorias predefinidas, como:

  • Pessoa (Person): Nomes de indivíduos, incluindo celebridades, políticos e figuras históricas.
  • Organização (Organization): Nomes de empresas, instituições e organizações.
  • Localização (Location): Localizações geográficas, incluindo cidades, países e pontos turísticos.
  • Evento (Event): Nomes de eventos, incluindo conferências, reuniões e festivais.

Para nosso caso de uso de PII, selecionaremos o modelo BERT NER básico bert-base-NER, que pode ser baixado do Hugging Face e carregado como modelo treinado no Elasticsearch.

Nota Importante: Modelos NER/NLP são intensivos em CPU e custosos para executar em escala; portanto, queremos usar técnicas de amostragem para entender o risco em nossos logs sem enviar o volume completo de logs através do modelo NER. Discutiremos o desempenho e a escalabilidade do modelo NER na parte 2 do blog.

Detecção por Correspondência de Padrões

Além de usar NER, a correspondência de padrões com expressões regulares também é uma ferramenta poderosa para detectar e editar PII com base em padrões comuns. O processador redact do Elasticsearch foi construído especificamente para esse caso de uso.

Amostragem de Logs

Considerando o impacto de desempenho do NER e o fato de que podemos estar ingerindo grandes volumes de logs no Elasticsearch, faz sentido amostrar os logs de entrada. Construiremos um amostrador de logs simples para lograr isso.

Pipeline de Ingestão como Processamento Combinável

Criaremos múltiplos pipelines, cada um focado em uma funcionalidade específica, e criaremos um pipeline de ingestão principal para coordenar todo o processo.

Construindo o Pipeline de Processamneto

Amostragem de Logs + Pipeline de Ingestão Combinável

A primeira coisa que faremos é configurar um amostrador para amostrar nossos logs. Este pipeline de ingestão usa apenas uma taxa de amostragem entre 0 (sem logs) e 10000 (todos os logs), permitindo taxas de amostragem de até ~0,01%, e marca os logs amostrados com sample.sampled: true. O processamento adicional dos logs será conduzido pelo valor de sample.sampled. O sample.sample_rate pode ser definido aqui ou passado pelo pipeline de orquestração.

Este comando deve ser executado a partir do Kibana -> Dev Tools

Os três blocos de código a seguir podem ser encontrados aqui.

Código do pipeline de amostragem de logs:

# pipeline-sampler-logs - parte 1
DELETE _ingest/pipeline/pipeline-sampler-logs
PUT _ingest/pipeline/pipeline-sampler-logs
{
  "processors": [
    {
      "set": {
        "description": "Define Taxa de Amostragem 0 Nenhum 10000 todos permite precisão de 0.01%",
        "if": "ctx.sample.sample_rate == null",
        "field": "sample.sample_rate",
        "value": 10000
      }
    },
    {
      "set": {
        "description": "Determina se mantém documentos não amostrados",
        "if": "ctx.sample.keep_unsampled == null",
        "field": "sample.keep_unsampled",
        "value": true
      }
    },
    {
      "set": {
        "field": "sample.sampled",
        "value": false
      }
    },
    {
      "script": {
        "source": """ Random r = new Random();
        ctx.sample.random = r.nextInt(params.max); """,
        "params": {
          "max": 10000
        }
      }
    },
    {
      "set": {
        "if": "ctx.sample.random <= ctx.sample.sample_rate",
        "field": "sample.sampled",
        "value": true
      }
    },
    {
      "drop": {
         "description": "Descarta documento não amostrado se aplicável",
        "if": "ctx.sample.keep_unsampled == false && ctx.sample.sampled == false"
      }
    }
  ]
}


Agora, vamos testar o amostrador de logs. Construiremos a primeira parte do pipeline combinável. Enviaremos logs para o data stream logs-generic-default. Com isso em mente, criaremos o pipeline logs@custom ingestion que será invoked automaticamente usando a estrutura de data stream para personalização. Adicionaremos uma camada de abstração adicional para que você possa aplicar esse processamento de PII a outros data streams.

A seguir, criaremos o pipeline process-pii. Este é o pipeline de processamento principal onde orquestraremos os componentes do processamento de PII. Na primeira etapa, simplesmente aplicaremos a lógica de amostragem. Observe que definimos a taxa de amostragem como 100, equivalente a 10% dos logs.

Código do pipeline process-pii:

# Pipeline process-pii - parte 1
DELETE _ingest/pipeline/process-pii
PUT _ingest/pipeline/process-pii
{
  "processors": [
    {
      "set": {
        "description": "Definir como true para habilitar amostragem, caso contrário false",
        "field": "sample.enabled",
        "value": true
      }
    },
    {
      "set": {
        "description": "Definir Taxa de Amostragem 0 Nenhum 10000 todos permite precisão de 0.01%",
        "field": "sample.sample_rate",
        "value": 1000
      }
    },
    {
      "set": "description": "Definir como false se deseja descartar dados não amostrados, útil para reindexação de dados históricos",
        "field": "sample.keep_unsampled",
        "value": true
      }
    },
    {
      "pipeline": {
        "if": "ctx.sample.enabled == true",
        "name": "pipeline-sampler-logs",
        "ignore_failure": true
      }
    }
  ]
}


Por fim, criamos o logs@custom que simplesmente invocará nosso pipeline process-pii com base no data_stream.dataset correto.

Código do pipeline logs@custom

# logs@custom pipeline - parte 1
DELETE _ingest/pipeline/logs@custom
PUT _ingest/pipeline/logs@custom
{
  "processors": [
    {
      "set": {
        "field": "pipelinetoplevel",
        "value": "logs@custom"
      }
    },
        {
      "set": {
        "field": "pipelinetoplevelinfo",
        "value": "{{{data_stream.dataset}}}"
      }
    },
    {
      "pipeline": {
        "description" : "Chama o pipeline process-pii no dataset correto",
        "if": "ctx?.data_stream?.dataset == 'pii'", 
        "name": "process-pii"
      }
    }
  ]
}


Agora, vamos testar para ver como a amostragem está funcionando.

Carregue os dados conforme instruções no Apêndice de Carregamento de Dados. Primeiro, usaremos dados de exemplo, e mais tarde discutiremos como testar com dados de entrada ou históricos no final deste blog.

Se você usar o filtro KQL data_stream.dataset : pii e Breakdown by sample.sampled no Observability -> Logs -> Logs Explorer, deverá ver uma divisão de aproximadamente 10%.

Neste ponto, temos um pipeline de ingestão combinável para "amostrar" logs. Como bônus, você também pode usar este amostrador de logs para qualquer outro caso de uso.

Carregamento, Configuração e Execução do Pipeline NER

Carregando o Modelo NER

Você precisa de um nó de machine learning para executar o modelo NER. Neste exercício, usamos o Elastic Cloud Hosted Deployment na AWS com arquitetura CPU Optimized (ARM). A inferência NER será executada no nó de ML AWS c5d. Haverá opções de GPU no futuro, mas hoje ficaremos com a arquitetura CPU.

Este exercício usará um único c5d com 8 GB de RAM e 4.2 vCPU, podendo chegar a 8.4 vCPU.

Consulte a documentação oficial sobre como importar modelos NLP treinados para o Elasticsearch para instruções completas sobre upload, configuração e implantação de modelos.

A maneira mais rápida de obter o modelo é usando o método Docker Eland.

O seguinte comando carregará o modelo no Elasticsearch, mas não o iniciaremos. Faremos isso na próxima etapa.

docker run -it --rm --network host docker.elastic.co/eland/eland \
  eland_import_hub_model \
  --url https://mydeployment.es.us-west-1.aws.found.io:443/ \
  -u elastic -p password \
  --hub-model-id dslim/bert-base-NER --task-type ner

Implantando e Iniciando o Modelo NER

Em geral, para melhorar o desempenho de ingestão, a taxa de transferência pode ser aumentada adicionando mais alocações à implantação. Para melhorar a velocidade de pesquisa, aumente o número de threads por alocação.

Para dimensionar a ingestão, nos concentraremos em expandir as alocações do modelo implantado. Mais informações sobre este tópico podem ser encontradas aqui. O número de alocações deve ser menor que os processadores de alocação disponíveis em cada nó (núcleos, não vCPU).

Implante e inicie o modelo NER. Usaremos a API de implantação de modelo treinado para fazer isso

Configuraremos o seguinte:

  • 4 alocações para permitir mais ingestão paralela
  • 1 thread por alocação
  • 0 bytes de cache porque esperamos baixa taxa de acerto de cache
  • 8192 na fila
# Inicia o modelo com 4 Alocações x 1 Thread, sem cache, e fila 8192
POST _ml/trained_models/dslim__bert-base-ner/deployment/_start?cache_size=0b&number_of_allocations=4&threads_per_allocation=1&queue_capacity=8192

Você deve receber uma resposta semelhante a esta:

{
  "assignment": {
    "task_parameters": {
      "model_id": "dslim__bert-base-ner",
      "deployment_id": "dslim__bert-base-ner",
      "model_bytes": 430974836,
      "threads_per_allocation": 1,
      "number_of_allocations": 4,
      "queue_capacity": 8192,
      "cache_size": "0",
      "priority": "normal",
      "per_deployment_memory_bytes": 430914596,
      "per_allocation_memory_bytes": 629366952
    },
...
    "assignment_state": "started",
    "start_time": "2024-09-23T21:39:18.476066615Z",
    "max_assigned_allocations": 4
  }
}


O modelo NER foi implantado e iniciado e está pronto para uso.

O seguinte pipeline de ingestão implementa o modelo NER através do processador de inferência.

Há muito código aqui, mas apenas dois itens são de interesse. O restante é lógica condicional para conduzir algum comportamento adicional específico que examinaremos mais detalhadamente no futuro.

  • O processador de inferência invoca o modelo NER pelo ID que carregamos anetriormente e passa o texto a ser analisado, neste caso o campo message, que é o text_field que queremos passar para o modelo NER analisar PII
  • O processador de script percorre o campo message e substitui o PII identificado pelos dados gerados pelo modelo NER por placeholders editados. Isso parece mais complexo do que é porque simplesmente percorre o array de previsões do ML e substitui as previsões na string de mensagem por constantes, armazenando o resultado em um novo campo redact.message. Examinaremos isso mais detalhadamente nas etapas a seguir.

Os três blocos de código a seguir podem ser encontrados aqui.

Pipeline NER PII:

# Pipeline NER
DELETE _ingest/pipeline/logs-ner-pii-processor
PUT _ingest/pipeline/logs-ner-pii-processor
{
  "processors": [
    {
      "set": {
        "description": "Definir como true para realmente redigir, false executará os processadores mas deixará o original",
        "field": "redact.enable",
        "value": true
      }
    },
    {
      "set": {
        "description": "Definir como true para manter resultados de ML para depuração",
        "field": "redact.ner.keep_result",
        "value": true
      }
    },
    {
      "set": {
        "description": "Definir como PER, LOC, ORG para pular, ou NONE para não descartar nenhuma substituição",
        "field": "redact.ner.skip_entity",
        "value": "NONE"
      }
    },
    {
      "set": {
        "description": "Definir como PER, LOC, ORG para pular, ou NONE para não descartar nenhuma substituição",
        "field": "redact.ner.minimum_score",
        "value": 0.0
      }
    },
    {
      "set": {
        "if" : "ctx.redact.message == null",
        "field": "redact.message",
        "copy_from": "message"
      }
    },
    {
      "set": {
        "field": "redact.successful",
        "value": true
      }
    },
    {
      "inference": {
        "model_id": "dslim__bert-base-ner",
        "field_map": {
          "message": "text_field"
        },
        "on_failure": [
          {
            "set": {
              "description": "Define 'error.message'",
              "field": "failure",
              "value": "REDACT_NER_FAILED"
            }
          },
          {
            "set": {
              "field": "redact.successful",
              "value": false
            }
          }
        ]
      }
    },
    {
      "script": {
        "if": "ctx.failure_ner != 'REDACT_NER_FAILED'",
        "lang": "painless",
        "source": """String msg = ctx['message'];
          for (item in ctx['ml']['inference']['entities']) {
          	if ((item['class_name'] != ctx.redact.ner.skip_entity) && 
          	  (item['class_probability'] >= ctx.redact.ner.minimum_score)) {  
          		  msg = msg.replace(item['entity'], '<' + 
          		  'REDACTNER-'+ item['class_name'] + '>')
          	}
          }
          ctx.redact.message = msg""",
        "on_failure": [
          {
            "set": {
              "description": "Define 'error.message'",
              "field": "failure",
              "value": "REDACT_REPLACEMENT_SCRIPT_FAILED",
              "override": false
            }
          },
          {
            "set": {
              "field": "redact.successful",
              "value": false
            }
          }
        ]
      }
    },
    {
      "remove": {
        "if": "ctx.redact.ner.keep_result != true",
        "field": [
          "ml"
        ],
        "ignore_missing": true,
        "ignore_failure": true
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "failure",
        "value": "GENERAL_FAILURE",
        "override": false
      }
    }
  ]
}


Pipeline process-pii atualizado, que agora chama o pipeline NER.

Código do pipeline process-pii:

# Pipeline process-pii atualizado que agora chama o pipeline NER
DELETE _ingest/pipeline/process-pii
PUT _ingest/pipeline/process-pii
{
  "processors": [
    {
      "set": {
        "description": "Definir como true para habilitar amostragem, caso contrário false",
        "field": "sample.enabled",
        "value": true
      }
    },
    {
      "set": {
        "description": "Definir Taxa de Amostragem 0 Nenhum 10000 todos permite precisão de 0.01%",
        "field": "sample.sample_rate",
        "value": 1000
      }
    },
    {
      "set": {
        "description": "Definir como false se deseja descartar dados não amostrados, útil para reindexação de dados históricos",
        "field": "sample.keep_unsampled",
        "value": true
      }
    },
    {
      "pipeline": {
        "if": "ctx.sample.enabled == true",
        "name": "pipeline-sampler-logs",
        "ignore_failure": true
      }
    },
    {
      "pipeline": {
        "if": "ctx.sample.enabled == false || (ctx.sample.enabled == true && ctx.sample.sampled == true)",
        "name": "logs-ner-pii-processor"
      }
    }
  ]
}


Agora recarregue os dados conforme instruções em Recarregar Logs

Resultadso

Vamos ver os resultados após o processamento NER. No Logs Explorer com a barra de consulta KQL, execute a seguinte consulta: data_stream.dataset : pii and ml.inference.entities.class_name : ("PER" and "LOC" and "ORG" )

O Logs Explorer deve parecer assim, abra a mensagem superior para ver os detalhes.

Resultados do Modelo NER

Vamos examinar mais de perto o que esses campos significam.

Campo: ml.inference.entities.class_name Valor de exemplo: [PER, PER, LOC, ORG, ORG] Descrição: Array de classes de entidades nomeadas identificadas pelo modelo NER.

Campo: ml.inference.entities.class_probability Valor de exemplo: [0.999, 0.972, 0.896, 0.506, 0.595] Descrição: class_probability é um valor entre 0 e 1 que indica a probabilidade de um determinado ponto de dados pertencer a uma classe. Quanto maior o número, maior a probabilidade de o ponto de dados pertencer à classe nomeada. Isso é importante porque no próximo blog, podemos deciding o limiar a ser usado para alertas e redação. Você pode ver neste exemplo que ele identificou LOC como ORG, que podemos filtrar/identificar definindo um limiar.

Campo: ml.inference.entities.entity Valor de exemplo: [Paul Buck, Steven Glens, South Amyborough, ME, Costco] Descrição: Array de entidades identificadas, alinhado por posição com class_name e class_probability.

Campo: ml.inference.predicted_value Valor de exemplo: [2024-09-23T14:32:14.608207-07:00Z] log.level=INFO: Pedido #4594 pagamento bem-sucedido (usuário: Paul Buck, david59@burgess.net)). Telefone: 726-632-0527x520, endereço: 3713 Steven Glens, South Amyborough, ME 93580, pedido de: CostcoDescrição: O valor previsto do modelo.

Painel de Avaliação de PII

Vamos dar uma olhada rápida no painel construído para avaliar dados de PII.

Para carregar o painel, vá para Kibana -> Stack Management -> Saved Objects e importe o arquivo pii-dashboard-part-1.ndjson que pode ser encontrado aqui:

https://github.com/bvader/elastic-pii/elastic/pii-dashboard-part-1.ndjson

Mais instruções completas sobre Objetos Salvos do Kibana podem ser encontradas aqui.

Depois de carregar o painel, navegue até ele e selecione o intervalo de tempo correto, você deve ver algo assim. Ele mostra métricas como taxa de amostragem, porcentagem de logs com NER, tendência de pontuação NER, etc. Veremos a avaliação e ação na parte 2 deste blog.

Próximos Passos

Na primeira parte do blog, completamos o seguinte.

  • Revisamos nossas técnicas e ferramentas disponíveis para detecção e avaliação de PII
  • Revisamos o papel do NLP / NER na detecção e avaliação de PII
  • Construímos os pipelines de ingestão combináveis necessários para amostrar logs e executá-los através do modelo NER
  • Revisamos os resultados do NER e nos preparamos para ir para o segundo blog

Na segunda parte deste blog, cobriremos o seguinte:

  • Usando NER e processador de redação para editar PII
  • Aplicando segurança em nível de campo para controlar acesso a dados não editados
  • Aprimorando painéis e alertas
  • Considerações de produção e escalabilidade
  • Como executar esses processos em dados de entrada ou históricos

Apêndice de Carregamento de Dados

Código

O código de carregamento de dados pode ser encontrado aqui:

https://github.com/bvader/elastic-pii

$ git clone https://github.com/bvader/elastic-pii.git


Isso criará 10000 logs aleatórios em um arquivo chamado pii.log, com e sem PII, sem alterar nenhum parâmetro.

Edite load_logs.py e defina o seguinte:

# O usuário do Elastic 
ELASTIC_USER = "elastic"

# Senha para o usuário 'elastic' gerada pelo Elasticsearch
ELASTIC_PASSWORD = "askdjfhasldfkjhasdf"

# Encontrado na página 'Manage Deployment'
ELASTIC_CLOUD_ID = "deployment:sadfjhasfdlkjsdhf3VuZC5pbzo0NDMkYjA0NmQ0YjFiYzg5NDM3ZDgxM2YxM2RhZjQ3OGE3MzIkZGJmNTE0OGEwODEzNGEwN2E3M2YwYjcyZjljYTliZWQ="


Então execute o seguinte comando.

$ python load_logs.py


Recarregando os Logs

Para recarregar os logs, basta executar o comando acima novamente. Você pode executar esse comando múltiplas vezes durante este exercício, e os logs serão recarregados (na verdade, carregados novamente). Os novos logs não entrarão em conflito com as execuções anteriores porque cada execução tem um run.id único, que será exibido no final do processo de carregamento.

$ python load_logs.py


Tags: elasticsearch nlp pii-detection named-entity-recognition data-security

Publicado em 6-18 16:28