Explorando Consultas Elasticsearch com Python DSL

Este guia explora o uso do Python Elasticsearch DSL para construir e execuatr consultas complexas no Elasticsearch.

Conectando ao Elasticsearch

Para começar, é necessário estabelecer uma conexão com o cluster Elasticsearch.


import elasticsearch

# Conexão usando lista de hosts e portas
cliente = elasticsearch.Elasticsearch([{"host": "10.44.99.102", "port": 9200}])

# Conexão alternativa usando string de host:porta
cliente = elasticsearch.Elasticsearch(["10.44.99.102:9200"])
 

Realizando Buscas Básicas

A função search permite consultas simples. O parâmetro q define o termo de busca, size controla o número de resultados e from_ especifica o ponto de partida. filter_path pode ser usado para limitar os campos retornados.


# Busca simples com q, size e from_
resultado_busca_simples = cliente.search(index="bank", q="Holmes", size=1, from_=1)

# Busca com filtragem de campos retornados
resultado_filtrado = cliente.search(
   index="bank",
   q=" 39225    5686 ",
   size=1000,
   filter_path=["hits.hits._id", "hits.hits._type"]
)
 

Consultando Índices Específicos

É possível consultar um único índice, múltiplos índices especificados em uma lista ou índices que correspondem a um padrão:

  • index="nome_indice": Consulta um índice específico.
  • index=["indice1", "indice2"]: Consulta múltiplos índices.
  • index=["padrao*"]: Consulta índices que começam com "padrao".

A diretiva doc_type também pode ser especificada dentro da consulta search.

Usando Elasticsearch DSL

A biblioteca elasticsearch_dsl oferece uma interface mais robusta para a construção de consultas.


from elasticsearch_dsl import Search
import logging

# Inicializando uma busca
s = Search(using=cliente, index="situation-event")
s.execute()
logging.warning(s.to_dict())

# Ignorando índices indisponíveis
s_ignora_indisponivel = Search(using=cliente, index="situation-event")
s_ignora_indisponivel = s_ignora_indisponivel.params(ignore_unavailable=True)
 

Consultas por Campo

É possível realizar consultas baseadas em campos específicos, e múltiplas condições podem ser combinadas.


# Consulta por um único campo
s_por_tipo = Search(using=cliente, index="situation-event").query("match", event_type="002")

# Adicionando outra condição de correspondência
s_por_tipo.query("match", event_title="aaa")
resultado_combinado = s_por_tipo.execute()
logging.warning(resultado_combinado.to_dict())
 

Consultas Multi-Campo

A classe MultiMatch ou o construtor Q com o tipo multi_match permitem buscar em múltiplos campos simultaneamente.


from elasticsearch_dsl.query import MultiMatch

# Usando MultiMatch
multi_match_query = MultiMatch(query="aaa", fields=["event_type", "event_title"])
s_multi_match = Search(using=cliente, index="situation-event").query(multi_match_query)
resultado_multi_match = s_multi_match.execute()
logging.warning(resultado_multi_match.to_dict())

# Usando Q para multi_match
from elasticsearch_dsl import Q

q_obj_multi_match = Q("multi_match", query="aaa", fields=["event_type", "event_title"])
s_q_multi_match = Search(using=cliente, index="situation-event").query(q_obj_multi_match)
resultado_q_multi_match = s_q_multi_match.execute()
logging.warning(resultado_q_multi_match.to_dict())
 

Consultas Booleanas com Q

O objeto Q é versátil e permite construir consultas booleanas complexas (must, should, must_not, filter).


# Consulta booleana com must
q_booleana_must = Q("bool", must=[
   Q("match", event_type="002"),
   Q("match", event_title="aaa")
])
s_q_booleana = Search(using=cliente, index="situation-event").query(q_booleana_must)
resultado_q_booleana = s_q_booleana.execute()
logging.warning(resultado_q_booleana.to_dict())

# Combinação com OR (|)
q_or = Q("match", event_type="002") | Q("match", event_type="003")
s_or = Search(using=cliente, index="situation-event").query(q_or).execute()
logging.warning(s_or.to_dict())
# {"query": {"bool": {"should": [{"match": {"event_type": "002"}}, {"match": {"event_type": "003"}}]}}}

# Combinação com AND (&)
q_and = Q("match", event_type="002") & Q("match", event_type="003")
s_and = Search(using=cliente, index="situation-event").query(q_and)
logging.warning(s_and.to_dict())
# {"query": {"bool": {"must": [{"match": {"event_type": "002"}}, {"match": {"event_type": "003"}}]}}}

# Negação com NOT (~)
q_not = ~Q("match", event_type="002")
s_not = Search(using=cliente, index="situation-event").query(q_not).execute()
logging.warning(s_not.to_dict())
# {"query": {"bool": {"must_not": [{"match": {"event_type": "002"}}]}}}
 

Filtrando Resultados

O método filter é usado para aplicar filtros que não afetam a pontuação dos documentos. Diferente de query, filtros são mais eficientes pois podem ser cacheados.

Diferença entre term e match: term realiza uma correspondência exata (não tokenizada), enquanto match realiza uma busca mais flexível, tokenizando o texto e considerando relevância.


import time

# Filtro de intervalo de tempo e consulta por correspondência
s_filtro_tempo = Search(using=cliente, index="situation-event") \
   .filter("range", update_time={"gte": 0, "lt": time.time()}) \
   .query("match", event_type="003")
logging.warning(s_filtro_tempo.to_dict())

# Filtro usando 'terms' para múltiplos valores
s_filtro_termos = Search(using=cliente, index="situation-event").filter("terms", event_type=["002", "003"]).execute()
logging.warning(s_filtro_termos.to_dict())
# {"query": {"bool": {"filter": [{"terms": {"event_type": ["002", "003"]}}]}}}

# Alternativas para aplicar filtros
s_alternativa1 = Search(using=cliente, index="situation-event").filter("terms", event_type=["002", "003"])
logging.warning(s_alternativa1.to_dict())
# {"query": {"bool": {"filter": [{"terms": {"event_type": ["002", "003"]}}]}}}

s_alternativa2 = Search(using=cliente, index="situation-event").query("bool", filter=[Q("terms", event_type=["002", "003"])])
logging.warning(s_alternativa2.to_dict())
# {"query": {"bool": {"filter": [{"terms": {"event_type": ["002", "003"]}}]}}}

s_alternativa3 = Search(using=cliente, index="situation-event").query("bool", filter=[~Q("terms", event_type=["002", "003"])])
logging.warning(s_alternativa3.to_dict())
# {"query": {"bool": {"filter": [{"bool": {"must_not": [{"terms": {"event_type": ["002", "003"]}}]}}]}}}
 

Agregações (Aggregations)

As agregações permitem realizar cálculos e agrupar dados. Elas podem ser encadeadas com consultas e filtros.

  • Bucket Aggregations: Agrupam documentos em "baldes" (buckets) com base em critérios (ex: terms).
  • Metric Aggregations: Calculam métricas sobre os documentos (ex: sum, avg, stats, extended_stats).

# Agregação de bucket de camada única
s_agg_bucket_simples = Search(using=cliente, index="situation-event")
s_agg_bucket_simples.aggs.bucket("por_evento", "terms", field="event_type")
resultado_agg_simples = s_agg_bucket_simples.execute()
logging.warning(resultado_agg_simples.to_dict())
# {"query": {"match_all": {}}, "aggs": {"por_evento": {"terms": {"field": "event_type"}}}}

# Agregação de bucket de dupla camada
s_agg_bucket_duplo = Search(using=cliente, index="situation-event")
s_agg_bucket_duplo.aggs.bucket("por_evento", "terms", field="event_type") \
   .bucket("por_nivel", "terms", field="event_level")
resultado_agg_duplo = s_agg_bucket_duplo.execute()
logging.warning(resultado_agg_duplo.to_dict())
# {"query": {"match_all": {}}, "aggs": {"por_evento": {"terms": {"field": "event_type"}, "aggs": {"por_nivel": {"terms": {"field": "event_level"}}}}}}

# Agregação métrica (stats)
s_agg_metrica_simples = Search(using=cliente, index="situation-event")
s_agg_metrica_simples.aggs.metric("estatisticas_sistema", "stats", field="system_id")
resultado_agg_metrica_simples = s_agg_metrica_simples.execute()
logging.warning(resultado_agg_metrica_simples.to_dict())
# {"query": {"match_all": {}}, "aggs": {"estatisticas_sistema": {"stats": {"field": "system_id"}}}}

# Agregação de bucket com métrica aninhada
s_agg_aninhada = Search(using=cliente, index="situation-event")
s_agg_aninhada.aggs.bucket("por_sistema", "terms", field="system_id") \
   .metric("estatisticas_id_sistema", "stats", field="system_id")
resultado_agg_aninhada = s_agg_aninhada.execute()
logging.warning(resultado_agg_aninhada.to_dict())
# {"query": {"match_all": {}}, "aggs": {"por_sistema": {"terms": {"field": "system_id"}, "aggs": {"estatisticas_id_sistema": {"stats": {"field": "system_id"}}}}}}
 

É crucial executar a consulta com .execute() para obter os resultados. As operações de agregação s.aggs não devem ser atribuídas a variáveis intermediárias antes da execução.

Ordenação e Paginação

A ordenação pode ser definida com o método sort, aceitando campos simples, campos com ordem descendente (prefixo -) ou dicionários para configurações mais detalhadas.


# Exemplo de ordenação
s_ordenado = Search().sort(
   "categoria",              # Ordem ascendente padrão
   "-titulo",                # Ordem descendente
   {"linhas": {"order": "asc", "mode": "avg"}} # Ordenação complexa por campo 'linhas'
)

# Paginação usando slicing
# Retorna documentos do índice 10 ao 19 (total de 10 documentos)
s_paginado = s_ordenado[10:20]
# O DSL gerará automaticamente os parâmetros 'from' e 'size'
# {"from": 10, "size": 10}
 

Métodos de Extensão Úteis

O DSL oferece métodos adicionais para customizar a requisição:

  • .extra(explain=True): Adiciona parâmetros de extensão à requisição.
  • .params(search_type="count"): Define parâmetros gerais da requisição.
  • .source(): Controla quais campos do documento devem ser retornados (incluir, excluir, desabilitar retorno de campos, etc.).
  • Search.from_dict(): Cria uma instância de Search a partir de um dicionário que representa a consulta.
  • .update_from_dict(): Modifica uma consulta existente usando um dicionário.

s = Search()

# Adicionando explicação à consulta
s = s.extra(explain=True)

# Definindo tipo de busca
s = s.params(search_type="count")

# Limitando os campos retornados (apenas 'titulo' e 'corpo')
s = s.source(["titulo", "corpo"])

# Não retornar nenhum campo de documento, apenas metadados
s = s.source(False)

# Incluir 'titulo' e excluir campos que começam com 'usuario.'
s = s.source(include=["titulo"], exclude=["usuario.*"])

# Resetar a seleção de campos
s = s.source(None)

# Criar busca a partir de um dicionário
busca_de_dict = Search.from_dict({"query": {"match": {"titulo": "python"}}})

# Atualizar uma busca existente
s_existente = Search(index="meu_indice")
s_existente.update_from_dict({"query": {"match": {"titulo": "python"}}, "size": 42})
 

Tags: elasticsearch Python dsl query aggregation

Publicado em 6-13 17:21 por Thomas