Configuração e Uso Básico do Elasticsearch com Spring Boot

Este guia detalha a configuração e o uso básico do Elasticsearch em uma aplicação Spring Boot, focando em funcionalidades essenciais para gerenciar índices e dados.

Dependências e Configuração do Projeto

Configuração do Maven (pom.xml)

Adicione as seguintes dependências ao seu arquivo pom.xml para integrar o Spring Boot com o Elasticsearch e Swagger:


       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-web</artifactId>
       </dependency>
       <dependency>
           <groupId>io.springfox</groupId>
           <artifactId>springfox-swagger2</artifactId>
           <version>2.9.2</version> <!-- Verifique a versão compatível -->
       </dependency>
       <dependency>
           <groupId>io.springfox</groupId>
           <artifactId>springfox-swagger-ui</artifactId>
           <version>2.9.2</version> <!-- Verifique a versão compatível -->
       </dependency>
       <dependency>
           <groupId>org.elasticsearch.client</groupId>
           <artifactId>elasticsearch-rest-high-level-client</artifactId>
           <version>7.6.2</version>
           <exclusions>
               <exclusion>
                   <groupId>org.elasticsearch</groupId>
                   <artifactId>elasticsearch</artifactId>
               </exclusion>
               <exclusion>
                   <groupId>org.elasticsearch.client</groupId>
                   <artifactId>elasticsearch-rest-client</artifactId>
               </exclusion>
           </exclusions>
       </dependency>
       <dependency>
           <groupId>org.elasticsearch.client</groupId>
           <artifactId>elasticsearch-rest-client</artifactId>
           <version>7.6.2</version>
       </dependency>
       <dependency>
           <groupId>org.elasticsearch</groupId>
           <artifactId>elasticsearch</artifactId>
           <version>7.6.2</version>
       </dependency>
       <!-- Outras dependências necessárias, como Lombok e Jackson -->
       <dependency>
           <groupId>org.projectlombok</groupId>
           <artifactId>lombok</artifactId>
           <optional>true</optional>
       </dependency>
       <dependency>
           <groupId>com.alibaba</groupId>
           <artifactId>fastjson</artifactId>
           <version>1.2.76</version> <!-- Verifique a versão compatível -->
       </dependency>
   

Configuração da Aplicação (application-dev.yml)

Defina os parâmetros de conexão do Elasticsearch em seu arquivo de propriedades. Adapte para ambientes de produção conforme necessário.


elasticsearch:
 host: 192.168.1.1:9200,192.1.2.133:9200,192.168.1.3:9200
 cluster-name: meu-cluster-elasticsearch
 username:
 password:
 synonym:
   path: http://192.168.1.1:9048/servico/sinonimos
   

Reflexão: Ao armazenar um documento de artigo com campos como título, conteúdo e contagem de leituras no Elasticsearch, como você configuraria um analisador específico para o texto e permitiria a filtragem por contagem de leituras?

Anotações Personalizadas para Mapeamento

Anotação @FieldInfo

Crie anotações customizadas para definir o tipo de dado e o analisador para campos específicos em suas entidades. Isso simplifica a configuração do mapeamento do Elasticsearch.


import java.lang.annotation.*;

@Target({ElementType.FIELD, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface FieldInfo {

   /**
    * Define o tipo do campo no Elasticsearch. Padrão: "keyword".
    * @return O tipo do campo.
    */
   String type() default "keyword";

   /**
    * Define o analisador a ser utilizado.
    * 0: not_analyzed (obsoleto em versões mais recentes, considere 'keyword')
    * 1: ik_smart (analisador IK para texto em chinês, modo inteligente)
    * 2: ik_max_word (analisador IK para texto em chinês, modo de palavras máximas)
    * 3: ik-index (analisador customizado IK para indexação)
    * @return O código do analisador.
    */
   int participle() default 0;
}
   

Classe FieldMapping

Uma classe simples para encapsular o nome do campo, seu tipo e a configuração do analisador.


import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
public class FieldMapping {
   private String field;
   private String type;
   private Integer participle;

   public FieldMapping(String field, String type, Integer participle) {
       this.field = field;
       this.type = type;
       this.participle = participle;
   }
}
   

Utilitário FieldMappingUtil

Uma classe utilitária para extrair as configurações de @FieldInfo de uma classe Java e transformá-las em uma lista de FieldMapping.


import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class FieldMappingUtil {

   /**
    * Obtém as informações de mapeamento de campos para uma dada classe.
    * @param clazz A classe a ser inspecionada.
    * @return Uma lista de objetos FieldMapping.
    */
   public static List<FieldMapping> getFieldInfo(Class<?> clazz) {
       return getFieldInfo(clazz, null);
   }

   /**
    * Obtém as informações de mapeamento de campos para uma dada classe e campo específico.
    * @param clazz A classe a ser inspecionada.
    * @param fieldName O nome do campo específico (pode ser null para todos os campos).
    * @return Uma lista de objetos FieldMapping.
    */
   public static List<FieldMapping> getFieldInfo(Class<?> clazz, String fieldName) {
       Field[] fields = clazz.getDeclaredFields();
       List<FieldMapping> fieldMappingList = new ArrayList<>();

       for (Field field : fields) {
           if (field.isAnnotationPresent(FieldInfo.class)) {
               FieldInfo fieldInfo = field.getAnnotation(FieldInfo.class);
               String name = field.getName();
               // Ignora campos específicos se um nome de campo for fornecido e não corresponder
               if (fieldName != null && !name.equals(fieldName)) {
                   continue;
               }
               fieldMappingList.add(new FieldMapping(name, fieldInfo.type(), fieldInfo.participle()));
           }
       }
       return fieldMappingList;
   }
}
   

Configuração do Elasticsearch e Swagger

Configuração do Elasticsearch (ElasticsearchConfig.java)

Esta classe configura o cliente de alto nível para interagir com o cluster Elasticsearch, definindo hosts, autenticação e timeouts.


import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
import java.util.Arrays;
import java.util.Objects;

@Configuration
public class ElasticsearchConfig {

   private static final String HTTP_SCHEME = "http";
   private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfig.class);

   @Value("${elasticsearch.host}")
   private String[] hosts;

   @Value("${elasticsearch.username}")
   private String username;

   @Value("${elasticsearch.password}")
   private String password;

   @Bean
   public RestClientBuilder restClientBuilder() {
       HttpHost[] httpHosts = Arrays.stream(hosts)
               .map(this::createHttpHost)
               .filter(Objects::nonNull)
               .toArray(HttpHost[]::new);

       LOGGER.info("Configurando hosts Elasticsearch: {}", Arrays.toString(httpHosts));

       final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
       if (StringUtils.hasText(username) && StringUtils.hasText(password)) {
           credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
       }

       return RestClient.builder(httpHosts)
               .setHttpClientConfigCallback(httpClientBuilder ->
                       httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
                               .setMaxConnPerRoute(100)
                               .setMaxConnTotal(100)
               )
               .setRequestConfigCallback(requestConfigBuilder -> {
                   requestConfigBuilder.setConnectTimeout(60000); // Timeout de conexão
                   requestConfigBuilder.setSocketTimeout(60000); // Timeout de socket
                   requestConfigBuilder.setConnectionRequestTimeout(60000); // Timeout para requisitar conexão
                   return requestConfigBuilder;
               });
   }

   private HttpHost createHttpHost(String address) {
       if (StringUtils.isEmpty(address)) {
           return null;
       }
       String[] parts = address.split(":");
       if (parts.length == 2) {
           try {
               String ip = parts[0];
               Integer port = Integer.valueOf(parts[1]);
               return new HttpHost(ip, port, HTTP_SCHEME);
           } catch (NumberFormatException e) {
               LOGGER.error("Porta inválida no endereço Elasticsearch: {}", address, e);
               return null;
           }
       } else {
           LOGGER.error("Formato de endereço Elasticsearch inválido: {}", address);
           return null;
       }
   }

   @Bean(name = "restHighLevelClient")
   public RestHighLevelClient restHighLevelClient(@Autowired RestClientBuilder restClientBuilder) {
       return new RestHighLevelClient(restClientBuilder);
   }
}
   

Configuração do Swagger (SwaggerConfig.java)

Configura o Swagger para documentar as APIs expostas, facilitando o entendimento e teste dos endpoints.


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@Configuration
@EnableSwagger2
public class SwaggerConfig {

   @Bean
   public Docket apiDocket() {
       return new Docket(DocumentationType.SWAGGER_2)
               .apiInfo(apiInfo())
               .select()
               .apis(RequestHandlerSelectors.basePackage("seu.pacote.controller")) // Adapte ao seu pacote de controllers
               .paths(PathSelectors.any())
               .build();
   }

   private ApiInfo apiInfo() {
       return new ApiInfoBuilder()
               .title("Documentação da API Elasticsearch")
               .description("API para gerenciamento de dados no Elasticsearch.")
               .version("1.0.0")
               .contact(new Contact("Nome do Desenvolvedor", "https://meu-site.com", "meu-email@example.com"))
               .build();
   }
}
   

Utilitário Principle do Elasticsearch (ElasticsearchUtil.java)

Esta classe fornece métodos para interagir com o Elasticsearch, incluindo operações de criação, atualização, exclusão e busca de dados, tanto individualmente quanto em lote.


import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.*;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@Slf4j
@Component
public class ElasticsearchUtil {

   private static final String PRIMARY_KEY_NAME = "id"; // Chave primária padrão

   @Value("${elasticsearch.synonym.path}")
   private String synonymPath;

   @Autowired
   private RestHighLevelClient restHighLevelClient;

   /**
    * Cria ou atualiza um índice com mapeamento e configurações.
    * @param indexName Nome do índice.
    * @param indexAlias Alias do índice.
    * @param clazz Classe que define o mapeamento.
    * @return true se o índice foi criado/atualizado com sucesso, false caso contrário.
    * @throws IOException Se ocorrer um erro de I/O.
    */
   public boolean createOrUpdateIndex(String indexName, String indexAlias, Class<?> clazz) throws IOException {
       if (isIndexExists(indexName)) {
           log.info("Índice {} já existe.", indexName);
           // Em um cenário real, poderia haver lógica para atualizar o mapeamento aqui,
           // mas para simplificar, apenas retornamos true se já existir.
           return true;
       }

       List<FieldMapping> fieldMappings = FieldMappingUtil.getFieldInfo(clazz);
       XContentBuilder mappingBuilder = buildMapping(fieldMappings);
       XContentBuilder settingsBuilder = buildSettings();
       XContentBuilder aliasBuilder = buildAlias(indexAlias);

       CreateIndexRequest request = new CreateIndexRequest(indexName);
       request.settings(settingsBuilder);
       request.mapping(mappingBuilder);
       request.alias(aliasBuilder);

       CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);

       if (response.isAcknowledged()) {
           log.info("Índice {} criado com sucesso com alias {}.", indexName, indexAlias);
           return true;
       } else {
           log.error("Falha ao criar o índice {}.", indexName);
           return false;
       }
   }

   /**
    * Constrói o mapeamento do índice a partir da lista de FieldMapping.
    * @param fieldMappings Lista de configurações de campo.
    * @return Um XContentBuilder contendo o mapeamento.
    * @throws IOException Se ocorrer um erro.
    */
   private XContentBuilder buildMapping(List<FieldMapping> fieldMappings) throws IOException {
       XContentBuilder builder = XContentFactory.jsonBuilder().startObject()
               .field("dynamic", "true") // Permite campos não mapeados explicitamente
               .startObject("properties");

       for (FieldMapping fm : fieldMappings) {
           builder.startObject(fm.getField())
                  .field("type", fm.getType());

           // Configuração do analisador baseada no valor de 'participle'
           if (fm.getParticiple() != null) {
               switch (fm.getParticiple()) {
                   case 1: // ik_smart
                       builder.field("analyzer", "ik_smart");
                       break;
                   case 2: // ik_max_word
                       builder.field("analyzer", "ik_max_word");
                       break;
                   case 3: // customizado (ex: com sinônimos e pinyin)
                       builder.field("analyzer", "ik_index_synonym");
                       builder.field("search_analyzer", "ik_search_synonym");
                       // Campos adicionais para suporte a pinyin, se necessário
                       builder.startObject("fields")
                              .startObject("pinyin")
                              .field("type", fm.getType()) // Mantém o tipo original
                              .field("analyzer", "ik_search_pinyin")
                              .endObject()
                              .endObject();
                       break;
                   default: // Não analisado ou tipo padrão
                       // Para campos de data, especificar o formato
                       if ("date".equalsIgnoreCase(fm.getType())) {
                          builder.field("format", "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis");
                       }
                       break;
               }
           }
            // Configuração específica para campos de data
            if ("date".equalsIgnoreCase(fm.getType()) && (fm.getParticiple() == null || fm.getParticiple() == 0)) {
                 builder.field("format", "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis");
            }

           builder.endObject();
       }

       builder.endObject().endObject();
       return builder;
   }


   /**
    * Constrói as configurações de análise (settings) para o índice.
    * Inclui analisadores customizados como 'ik_smart', 'ik_max_word', sinônimos e pinyin.
    * @return Um XContentBuilder contendo as configurações.
    * @throws IOException Se ocorrer um erro.
    */
   private XContentBuilder buildSettings() throws IOException {
       return XContentFactory.jsonBuilder().startObject()
               .field("number_of_shards", 3) // Número de shards
               .field("number_of_replicas", 1) // Número de réplicas
               .field("refresh_interval", "120s") // Intervalo de refresh
               .startObject("analysis")
                   .startObject("analyzer")
                       .startObject("ik_index_synonym")
                           .field("type", "custom")
                           .field("tokenizer", "ik_max_word")
                           .field("filter", new String[]{"synonym_filter"})
                       .endObject()
                       .startObject("ik_search_synonym")
                           .field("type", "custom")
                           .field("tokenizer", "ik_smart")
                           .field("filter", new String[]{"synonym_filter"})
                       .endObject()
                       .startObject("ik_search_pinyin")
                            .field("type", "custom")
                            .field("tokenizer", "ik_smart")
                            .field("filter", new String[]{"pinyin_filter", "lowercase", "asciifolding"})
                       .endObject()
                   .endObject()
                   .startObject("filter")
                       .startObject("synonym_filter")
                           .field("type", "dynamic_synonym")
                           .field("synonyms_path", synonymPath) // Caminho para o arquivo de sinônimos
                           .field("dynamic_reload", true)
                           .field("interval", 120) // Intervalo de recarregamento em segundos
                       .endObject()
                       .startObject("pinyin_filter")
                           .field("type", "pinyin")
                           .field("keep_first_letter", false) // Não manter primeira letra
                           .field("keep_separate_first_letter", false)
                           .field("keep_full_pinyin", true) // Manter pinyin completo
                           .field("keep_original", false)
                           .field("limit_first_letter_length", 16)
                           .field("lowercase", true) // Converte para minúsculas
                           .field("remove_duplicated_term", true)
                       .endObject()
                   .endObject()
               .endObject().endObject();
   }

   /**
    * Constrói o alias do índice.
    * @param alias O nome do alias.
    * @return Um XContentBuilder contendo o alias.
    * @throws IOException Se ocorrer um erro.
    */
   private XContentBuilder buildAlias(String alias) throws IOException {
       return XContentFactory.jsonBuilder().startObject()
               .field(alias).endObject().endObject();
   }


   /**
    * Verifica se um índice existe.
    * @param indexName Nome do índice.
    * @return true se o índice existe, false caso contrário.
    */
   public boolean isIndexExists(String indexName) {
       try {
           GetIndexRequest request = new GetIndexRequest(indexName);
           request.local(false);
           request.humanReadable(false);
           return restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
       } catch (IOException e) {
           log.error("Erro ao verificar a existência do índice {}: {}", indexName, e.getMessage());
           return false;
       }
   }

   /**
    * Salva ou atualiza um único documento no Elasticsearch.
    * @param indexName Nome do índice.
    * @param id O ID do documento.
    * @param jsonData O documento em formato JSON.
    * @return true se a operação foi bem-sucedida, false caso contrário.
    * @throws IOException Se ocorrer um erro de I/O.
    */
   public boolean saveData(String indexName, String id, String jsonData) throws IOException {
       IndexRequest request = new IndexRequest(indexName)
               .id(id)
               .source(jsonData, XContentType.JSON)
               .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); // Atualiza imediatamente

       IndexResponse response = restHighLevelClient.index(request, RequestOptions.DEFAULT);

       if (IndexResponse.Result.CREATED.equals(response.getResult())) {
           log.info("Documento com ID {} criado no índice {}.", id, indexName);
           return true;
       } else if (IndexResponse.Result.UPDATED.equals(response.getResult())) {
           log.info("Documento com ID {} atualizado no índice {}.", id, indexName);
           return true;
       } else {
           log.error("Falha ao salvar/atualizar documento com ID {} no índice {}. Resultado: {}", id, indexName, response.getResult());
           return false;
       }
   }

   /**
    * Salva ou atualiza múltiplos documentos em lote.
    * @param indexName Nome do índice.
    * @param jsonDataList Lista de documentos em formato JSON (array JSON).
    * @return true se a operação em lote foi bem-sucedida, false caso contrário.
    * @throws IOException Se ocorrer um erro de I/O.
    */
   public boolean saveDataBatch(String indexName, String jsonDataList) throws IOException {
       if (!StringUtils.hasText(jsonDataList)) {
           log.warn("Nenhum dado fornecido para o lote no índice {}.", indexName);
           return false;
       }

       BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
       JSONArray jsonArray = JSONArray.parseArray(jsonDataList);

       if (jsonArray == null || jsonArray.isEmpty()) {
            log.warn("Array JSON inválido ou vazio fornecido para o lote no índice {}.", indexName);
            return false;
       }

       for (Object obj : jsonArray) {
           if (obj instanceof JSONObject) {
               JSONObject jsonObject = (JSONObject) obj;
               String id = jsonObject.getString(PRIMARY_KEY_NAME);
               if (id == null) {
                   log.error("Documento no lote do índice {} não possui o campo chave '{}'. Documento: {}", indexName, PRIMARY_KEY_NAME, jsonObject.toJSONString());
                   continue; // Pula este documento
               }
               IndexRequest indexRequest = new IndexRequest(indexName)
                       .id(id)
                       .source(jsonObject.toJSONString(), XContentType.JSON);
               bulkRequest.add(indexRequest);
           } else {
                log.warn("Elemento inválido encontrado no array JSON do lote: {}", obj);
           }
       }

       if (bulkRequest.numberOfActions() == 0) {
            log.warn("Nenhuma ação de indexação válida foi adicionada ao lote para o índice {}.", indexName);
            return false;
       }

       BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);

       if (bulkResponse.hasFailures()) {
           for (BulkItemResponse item : bulkResponse.getItems()) {
               log.error("Falha no item do lote (ID: {}): {}", item.getId(), item.getFailureMessage());
           }
           return false;
       } else {
           log.info("Lote de {} documentos processado com sucesso no índice {}. Criados: {}, Atualizados: {}",
                   bulkResponse.getItems().length, indexName,
                   countResultType(bulkResponse, DocWriteResponse.Result.CREATED),
                   countResultType(bulkResponse, DocWriteResponse.Result.UPDATED));
           return true;
       }
   }

   private long countResultType(BulkResponse bulkResponse, DocWriteResponse.Result type) {
       return java.util.Arrays.stream(bulkResponse.getItems())
               .filter(item -> item.getResponse() != null && item.getResponse().getResult().equals(type))
               .count();
   }


   /**
    * Atualiza um documento existente. Se o documento não existir, ele será criado (upsert).
    * @param indexName Nome do índice.
    * @param id O ID do documento a ser atualizado.
    * @param updateJson O JSON contendo os campos a serem atualizados.
    * @return true se a atualização foi bem-sucedida ou o documento foi criado, false caso contrário.
    */
   public boolean updateData(String indexName, String id, String updateJson) {
       UpdateRequest updateRequest = new UpdateRequest(indexName, id)
               .doc(updateJson, XContentType.JSON)
               .docAsUpsert(true) // Cria o documento se não existir
               .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

       try {
           UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
           log.info("Documento ID {} no índice {} atualizado. Resultado: {}", id, indexName, updateResponse.getResult());
           return !DocWriteResponse.Result.ERROR.equals(updateResponse.getResult());
       } catch (IOException e) {
           log.error("Erro ao atualizar documento ID {} no índice {}: {}", id, indexName, e.getMessage());
           return false;
       }
   }

   /**
    * Atualiza múltiplos documentos em lote. Similar a saveDataBatch, mas usa UpdateRequest.
    * @param indexName Nome do índice.
    * @param jsonDataList Lista de documentos a serem atualizados (cada um deve ter um ID).
    * @return true se a operação em lote foi bem-sucedida, false caso contrário.
    * @throws IOException Se ocorrer um erro de I/O.
    */
   public boolean updateDataBatch(String indexName, String jsonDataList) throws IOException {
        if (!StringUtils.hasText(jsonDataList)) {
           log.warn("Nenhum dado fornecido para atualização em lote no índice {}.", indexName);
           return false;
       }

       BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
       JSONArray jsonArray = JSONArray.parseArray(jsonDataList);

       if (jsonArray == null || jsonArray.isEmpty()) {
            log.warn("Array JSON inválido ou vazio fornecido para atualização em lote no índice {}.", indexName);
            return false;
       }

       for (Object obj : jsonArray) {
            if (obj instanceof JSONObject) {
               JSONObject jsonObject = (JSONObject) obj;
               String id = jsonObject.getString(PRIMARY_KEY_NAME);
                if (id == null) {
                   log.error("Documento no lote de atualização do índice {} não possui o campo chave '{}'. Documento: {}", indexName, PRIMARY_KEY_NAME, jsonObject.toJSONString());
                   continue;
               }
               UpdateRequest updateRequest = new UpdateRequest(indexName, id)
                       .doc(jsonObject.toJSONString(), XContentType.JSON)
                       .docAsUpsert(true); // Atualiza ou insere se não existir
               bulkRequest.add(updateRequest);
            } else {
                 log.warn("Elemento inválido encontrado no array JSON de atualização em lote: {}", obj);
            }
       }

        if (bulkRequest.numberOfActions() == 0) {
            log.warn("Nenhuma ação de atualização válida foi adicionada ao lote para o índice {}.", indexName);
            return false;
       }

       BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);

       if (bulkResponse.hasFailures()) {
           for (BulkItemResponse item : bulkResponse.getItems()) {
               log.error("Falha na atualização em lote (ID: {}): {}", item.getId(), item.getFailureMessage());
           }
           return false;
       } else {
           log.info("Lote de {} documentos atualizados com sucesso no índice {}. Criados: {}, Atualizados: {}",
                   bulkResponse.getItems().length, indexName,
                   countResultType(bulkResponse, DocWriteResponse.Result.CREATED),
                   countResultType(bulkResponse, DocWriteResponse.Result.UPDATED));
           return true;
       }
   }


   /**
    * Exclui um documento do Elasticsearch.
    * @param indexName Nome do índice.
    * @param id O ID do documento a ser excluído.
    * @return true se o documento foi excluído com sucesso, false se não foi encontrado ou ocorreu um erro.
    */
   public boolean deleteData(String indexName, String id) {
       DeleteRequest deleteRequest = new DeleteRequest(indexName, id)
               .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

       try {
           DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
           if (DocWriteResponse.Result.NOT_FOUND.equals(deleteResponse.getResult())) {
               log.warn("Documento com ID {} não encontrado no índice {}.", id, indexName);
               return false;
           } else {
               log.info("Documento com ID {} excluído com sucesso do índice {}.", id, indexName);
               return true;
           }
       } catch (IOException e) {
           log.error("Erro ao excluir documento ID {} do índice {}: {}", id, indexName, e.getMessage());
           return false;
       }
   }

   /**
    * Exclui múltiplos documentos em lote.
    * @param indexName Nome do índice.
    * @param ids Lista de IDs dos documentos a serem excluídos.
    * @return true se a operação em lote foi bem-sucedida, false caso contrário.
    */
   public boolean deleteDataBatch(String indexName, List<String> ids) {
       if (CollectionUtils.isEmpty(ids)) {
           log.warn("Nenhum ID fornecido para exclusão em lote no índice {}.", indexName);
           return false;
       }

       BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
       for (String id : ids) {
           bulkRequest.add(new DeleteRequest(indexName, id));
       }

       try {
           BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
           if (bulkResponse.hasFailures()) {
               for (BulkItemResponse item : bulkResponse.getItems()) {
                   if (item.getFailure() != null) {
                       log.error("Falha na exclusão em lote (ID: {}): {}", item.getId(), item.getFailureMessage());
                   }
               }
               return false;
           } else {
               log.info("Lote de {} documentos excluídos com sucesso do índice {}.", ids.size(), indexName);
               return true;
           }
       } catch (IOException e) {
           log.error("Erro na exclusão em lote do índice {}: {}", indexName, e.getMessage());
           return false;
       }
   }

   /**
    * Exclui todos os documentos de um índice.
    * @param indexName Nome do índice.
    * @return true se a operação foi bem-sucedida, false caso contrário.
    */
   public boolean deleteAll(String indexName) {
       if (!isIndexExists(indexName)) {
           log.warn("Índice {} não existe, exclusão de todos os documentos não necessária.", indexName);
           return false;
       }

       DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(indexName)
               .setQuery(org.elasticsearch.index.query.QueryBuilders.matchAllQuery())
               .setConflicts("proceed") // Continua mesmo se houver conflitos
               .setRefreshPerGroup(true) // Refresca após cada grupo de documentos
               .setBatchSize(10000); // Processa em lotes de 10000

       try {
           BulkByScrollResponse bulkResponse = restHighLevelClient.deleteByQuery(deleteRequest, RequestOptions.DEFAULT);
           log.info("Todos os {} documentos do índice {} foram excluídos. Tempo: {}ms",
                   bulkResponse.getTotal(), indexName, bulkResponse.getTook().getMillis());
           return true;
       } catch (IOException e) {
           log.error("Erro ao excluir todos os documentos do índice {}: {}", indexName, e.getMessage());
           return false;
       }
   }

    /**
    * Exclui um índice inteiro do Elasticsearch.
    * @param indexName Nome do índice a ser excluído.
    * @return true se o índice foi excluído com sucesso, false caso contrário.
    */
   public boolean deleteIndex(String indexName) {
       if (!isIndexExists(indexName)) {
           log.warn("Índice {} não existe, exclusão não necessária.", indexName);
           return false;
       }

       try {
           DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
           // Opções para expandir nomes de índices (LENIENT_EXPAND_OPEN é comum)
           deleteIndexRequest.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);

           AcknowledgedResponse response = restHighLevelClient.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);

           if (response.isAcknowledged()) {
               log.info("Índice {} excluído com sucesso.", indexName);
               return true;
           } else {
               log.error("Falha ao excluir o índice {}.", indexName);
               return false;
           }
       } catch (IOException e) {
           log.error("Erro ao excluir o índice {}: {}", indexName, e.getMessage());
           return false;
       }
   }


   /**
    * Busca um documento pelo ID.
    * @param indexName Nome do índice.
    * @param id O ID do documento.
    * @return O documento em formato JSON string, ou null se não encontrado ou erro.
    */
   public String getDataById(String indexName, String id) {
       if (!isIndexExists(indexName)) {
           log.warn("Índice {} não encontrado. Impossível buscar documento {}.", indexName, id);
           return null;
       }

       GetRequest getRequest = new GetRequest(indexName, id);
       try {
           GetResponse response = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
           if (response.isExists()) {
               return response.getSourceAsString();
           } else {
               log.warn("Documento com ID {} não encontrado no índice {}.", id, indexName);
               return null;
           }
       } catch (IOException e) {
           log.error("Erro ao buscar documento ID {} do índice {}: {}", id, indexName, e.getMessage());
           return null;
       }
   }

   // Métodos assíncronos (exemplo)
   /**
    * Salva dados em lote de forma assíncrona.
    * @param indexName Nome do índice.
    * @param jsonDataList Lista de documentos em formato JSON.
    * @return true se a solicitação assíncrona foi enviada, false caso contrário.
    */
   public boolean saveDataBatchAsync(String indexName, String jsonDataList) {
        if (!StringUtils.hasText(jsonDataList)) {
           log.warn("Nenhum dado fornecido para o lote assíncrono no índice {}.", indexName);
           return false;
       }

       BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.NONE); // Menos agressivo para assíncrono
       JSONArray jsonArray = JSONArray.parseArray(jsonDataList);

       if (jsonArray == null || jsonArray.isEmpty()) {
            log.warn("Array JSON inválido ou vazio fornecido para o lote assíncrono no índice {}.", indexName);
            return false;
       }

       for (Object obj : jsonArray) {
           if (obj instanceof JSONObject) {
               JSONObject jsonObject = (JSONObject) obj;
               String id = jsonObject.getString(PRIMARY_KEY_NAME);
                if (id == null) {
                   log.error("Documento no lote assíncrono do índice {} não possui o campo chave '{}'. Documento: {}", indexName, PRIMARY_KEY_NAME, jsonObject.toJSONString());
                   continue;
               }
               IndexRequest indexRequest = new IndexRequest(indexName)
                       .id(id)
                       .source(jsonObject.toJSONString(), XContentType.JSON);
               bulkRequest.add(indexRequest);
           } else {
                log.warn("Elemento inválido encontrado no array JSON do lote assíncrono: {}", obj);
           }
       }

       if (bulkRequest.numberOfActions() == 0) {
            log.warn("Nenhuma ação de indexação válida foi adicionada ao lote assíncrono para o índice {}.", indexName);
            return false;
       }

       ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
           @Override
           public void onResponse(BulkResponse response) {
               if (response.hasFailures()) {
                   for (BulkItemResponse item : response.getItems()) {
                       log.error("Falha no item do lote assíncrono (ID: {}): {}", item.getId(), item.getFailureMessage());
                   }
               } else {
                   log.info("Lote assíncrono processado com sucesso no índice {}. Itens: {}", indexName, response.getItems().length);
               }
           }

           @Override
           public void onFailure(Exception e) {
               log.error("Erro no processamento do lote assíncrono para o índice {}: {}", indexName, e.getMessage());
           }
       };

       restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener);
       log.info("Solicitação de processamento em lote assíncrono enviada para o índice {}.", indexName);
       return true;
   }
}
   

Utilitário de Gerenciamento de Índices (FieldUtil.java)

Uma classe para mapear nomes de índices a classes de entidades específicas, permitindo que o utilitário Elasticsearch saiba qual mapeamento usar.


import java.util.HashMap;
import java.util.Map;

public class FieldUtil {

   public static final String MESSAGE_INDEX = "message_index"; // Exemplo de nome de índice

   // Mapa para associar nomes de índices a classes de entidades
   private static final Map<String, Class<?>> indexClassMap = new HashMap<>(16);

   // Bloco estático para inicializar o mapa
   static {
       // Associe o nome do índice à sua classe de entidade correspondente
       indexClassMap.put(MESSAGE_INDEX, MessageEntity.class); // Substitua MessageEntity pela sua classe real
       // Adicione outras associações conforme necessário
       // indexClassMap.put("articles", Article.class);
       // indexClassMap.put("orders", Order.class);
   }

   /**
    * Obtém a classe de entidade associada a um determinado nome de índice.
    * @param indexName O nome do índice.
    * @return A classe de entidade associada ou Object.class se não for encontrada.
    */
   public static Class<?> getClassForIndex(final String indexName) {
       return indexClassMap.getOrDefault(indexName, Object.class);
   }
}

// Exemplo de classe de entidade (substitua pela sua implementação)
class MessageEntity {
   // Defina os campos aqui com as anotações @FieldInfo apropriadas
   @FieldInfo(type = "keyword")
   private String id;

   @FieldInfo(type = "text", participle = 3) // Exemplo com analisador customizado
   private String content;

   @FieldInfo(type = "date")
   private java.util.Date timestamp;
   // ... outros campos
}
   

Exemplo de Controle REST (ElasticsearchController.java)

Exemplo de um controller Spring Boot que expõe endpoints para interagir com as funcionalidades do Elasticsearch.


import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.io.IOException;
import java.util.List;

@RestController
@RequestMapping("/api/es")
@Api(tags = "API de Gerenciamento Elasticsearch")
public class ElasticsearchController {

   @Autowired
   private ElasticsearchUtil elasticsearchUtil;

   @PostMapping("/index/{indexName}")
   @ApiOperation("Cria ou atualiza um índice Elasticsearch.")
   public ResponseEntity<?> createOrUpdateIndex(
           @ApiParam(value = "Nome do índice", required = true) @PathVariable String indexName,
           @ApiParam(value = "Alias do índice", required = true) @RequestParam String alias,
           @ApiParam(value = "Classe de entidade para mapeamento (usará FieldUtil)", required = true) @RequestParam String entityClassKey) {
       Class<?> entityClass = FieldUtil.getClassForIndex(entityClassKey);
       try {
           boolean success = elasticsearchUtil.createOrUpdateIndex(indexName, alias, entityClass);
           return success ? ResponseEntity.ok("Índice criado/atualizado com sucesso.") : ResponseEntity.internalServerError().body("Falha ao criar/atualizar índice.");
       } catch (IOException e) {
           return ResponseEntity.internalServerError().body("Erro de I/O ao criar/atualizar índice: " + e.getMessage());
       }
   }

   @PostMapping("/doc/{indexName}")
   @ApiOperation("Salva ou atualiza um único documento.")
   public ResponseEntity<?> saveDocument(
           @ApiParam(value = "Nome do índice", required = true) @PathVariable String indexName,
           @ApiParam(value = "ID do documento", required = true) @RequestParam String id,
           @ApiParam(value = "Documento em formato JSON", required = true) @RequestBody String jsonData) {
       try {
           boolean success = elasticsearchUtil.saveData(indexName, id, jsonData);
           return success ? ResponseEntity.ok("Documento salvo/atualizado com sucesso.") : ResponseEntity.internalServerError().body("Falha ao salvar/atualizar documento.");
       } catch (IOException e) {
           return ResponseEntity.internalServerError().body("Erro de I/O ao salvar documento: " + e.getMessage());
       }
   }

   @PostMapping("/docs/batch/{indexName}")
   @ApiOperation("Salva ou atualiza múltiplos documentos em lote (síncrono).")
   public ResponseEntity<?> saveDocumentsBatch(
           @ApiParam(value = "Nome do índice", required = true) @PathVariable String indexName,
           @ApiParam(value = "Lista de documentos em formato JSON array", required = true) @RequestBody String jsonDataList) {
       try {
           boolean success = elasticsearchUtil.saveDataBatch(indexName, jsonDataList);
           return success ? ResponseEntity.ok("Documentos salvos/atualizados em lote com sucesso.") : ResponseEntity.internalServerError().body("Falha ao salvar/atualizar documentos em lote.");
       } catch (IOException e) {
           return ResponseEntity.internalServerError().body("Erro de I/O ao salvar documentos em lote: " + e.getMessage());
       }
   }

    @PostMapping("/docs/batch/async/{indexName}")
   @ApiOperation("Salva ou atualiza múltiplos documentos em lote (assíncrono).")
   public ResponseEntity<?> saveDocumentsBatchAsync(
           @ApiParam(value = "Nome do índice", required = true) @PathVariable String indexName,
           @ApiParam(value = "Lista de documentos em formato JSON array", required = true) @RequestBody String jsonDataList) {
       boolean success = elasticsearchUtil.saveDataBatchAsync(indexName, jsonDataList);
       return success ? ResponseEntity.ok("Solicitação de salvamento em lote assíncrono enviada.") : ResponseEntity.internalServerError().body("Falha ao enviar solicitação de salvamento em lote assíncrono.");
   }


   @PutMapping("/doc/{indexName}")
   @ApiOperation("Atualiza um documento (ou cria se não existir).")
   public ResponseEntity<?> updateDocument(
           @ApiParam(value = "Nome do índice", required = true) @PathVariable String indexName,
           @ApiParam(value = "ID do documento", required = true) @RequestParam String id,
           @ApiParam(value = "Campos para atualizar em formato JSON", required = true) @RequestBody String updateJson) {
       boolean success = elasticsearchUtil.updateData(indexName, id, updateJson);
       return success ? ResponseEntity.ok("Documento atualizado/criado com sucesso.") : ResponseEntity.internalServerError().body("Falha ao atualizar/criar documento.");
   }

    @PutMapping("/docs/batch/{indexName}")
   @ApiOperation("Atualiza múltiplos documentos em lote (cria se não existir).")
   public ResponseEntity<?> updateDocumentsBatch(
           @ApiParam(value = "Nome do índice", required = true) @PathVariable String indexName,
           @ApiParam(value = "Lista de documentos para atualizar em formato JSON array", required = true) @RequestBody String jsonDataList) {
       try {
           boolean success = elasticsearchUtil.updateDataBatch(indexName, jsonDataList);
           return success ? ResponseEntity.ok("Documentos atualizados/criados em lote com sucesso.") : ResponseEntity.internalServerError().body("Falha ao atualizar/criar documentos em lote.");
       } catch (IOException e) {
           return ResponseEntity.internalServerError().body("Erro de I/O ao atualizar documentos em lote: " + e.getMessage());
       }
   }

   @DeleteMapping("/doc/{indexName}/{id}")
   @ApiOperation("Exclui um único documento.")
   public ResponseEntity<?> deleteDocument(
           @ApiParam(value = "Nome do índice", required = true) @PathVariable String indexName,
           @ApiParam(value = "ID do documento a excluir", required = true) @PathVariable String id) {
       boolean success = elasticsearchUtil.deleteData(indexName, id);
       return success ? ResponseEntity.ok("Documento excluído com sucesso.") : ResponseEntity.internalServerError().body("Falha ao excluir documento (pode não ter sido encontrado).");
   }

   @DeleteMapping("/docs/batch/{indexName}")
   @ApiOperation("Exclui múltiplos documentos em lote.")
   public ResponseEntity<?> deleteDocumentsBatch(
           @ApiParam(value = "Nome do índice", required = true) @PathVariable String indexName,
           @ApiParam(value = "Lista de IDs a excluir", required = true) @RequestBody List<String> ids) {
       boolean success = elasticsearchUtil.deleteDataBatch(indexName, ids);
       return success ? ResponseEntity.ok("Documentos excluídos em lote com sucesso.") : ResponseEntity.internalServerError().body("Falha ao excluir documentos em lote.");
   }

   @DeleteMapping("/index/{indexName}")
   @ApiOperation("Exclui todos os documentos de um índice (mantém o índice).")
   public ResponseEntity<?> deleteAllDocumentsFromIndex(
           @ApiParam(value = "Nome do índice", required = true) @PathVariable String indexName) {
       boolean success = elasticsearchUtil.deleteAll(indexName);
       return success ? ResponseEntity.ok("Todos os documentos do índice excluídos com sucesso.") : ResponseEntity.internalServerError().body("Falha ao excluir todos os documentos do índice.");
   }

    @DeleteMapping("/manage/index/{indexName}")
   @ApiOperation("Exclui um índice inteiro (incluindo mapeamento e dados).")
   public ResponseEntity<?> deleteIndex(
           @ApiParam(value = "Nome do índice a excluir", required = true) @PathVariable String indexName) {
       boolean success = elasticsearchUtil.deleteIndex(indexName);
       return success ? ResponseEntity.ok("Índice excluído com sucesso.") : ResponseEntity.internalServerError().body("Falha ao excluir índice.");
   }


   @GetMapping("/doc/{indexName}/{id}")
   @ApiOperation("Busca um documento pelo ID.")
   public ResponseEntity<?> getDocumentById(
           @ApiParam(value = "Nome do índice", required = true) @PathVariable String indexName,
           @ApiParam(value = "ID do documento", required = true) @PathVariable String id) {
       String document = elasticsearchUtil.getDataById(indexName, id);
       return document != null ? ResponseEntity.ok(document) : ResponseEntity.notFound().build();
   }

   // Adicione mais endpoints conforme necessário para buscas complexas, agregações, etc.
}
   

Tags: elasticsearch Spring Boot java rest client search engine

Publicado em 6-30 19:29