Este guia detalha a configuração e o uso básico do Elasticsearch em uma aplicação Spring Boot, focando em funcionalidades essenciais para gerenciar índices e dados.
Dependências e Configuração do Projeto
Configuração do Maven (pom.xml)
Adicione as seguintes dependências ao seu arquivo pom.xml para integrar o Spring Boot com o Elasticsearch e Swagger:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version> <!-- Verifique a versão compatível -->
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version> <!-- Verifique a versão compatível -->
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.6.2</version>
<exclusions>
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</exclusion>
<exclusion>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>7.6.2</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.6.2</version>
</dependency>
<!-- Outras dependências necessárias, como Lombok e Jackson -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version> <!-- Verifique a versão compatível -->
</dependency>
Configuração da Aplicação (application-dev.yml)
Defina os parâmetros de conexão do Elasticsearch em seu arquivo de propriedades. Adapte para ambientes de produção conforme necessário.
elasticsearch:
host: 192.168.1.1:9200,192.1.2.133:9200,192.168.1.3:9200
cluster-name: meu-cluster-elasticsearch
username:
password:
synonym:
path: http://192.168.1.1:9048/servico/sinonimos
Reflexão: Ao armazenar um documento de artigo com campos como título, conteúdo e contagem de leituras no Elasticsearch, como você configuraria um analisador específico para o texto e permitiria a filtragem por contagem de leituras?
Anotações Personalizadas para Mapeamento
Anotação @FieldInfo
Crie anotações customizadas para definir o tipo de dado e o analisador para campos específicos em suas entidades. Isso simplifica a configuração do mapeamento do Elasticsearch.
import java.lang.annotation.*;
@Target({ElementType.FIELD, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface FieldInfo {
/**
* Define o tipo do campo no Elasticsearch. Padrão: "keyword".
* @return O tipo do campo.
*/
String type() default "keyword";
/**
* Define o analisador a ser utilizado.
* 0: not_analyzed (obsoleto em versões mais recentes, considere 'keyword')
* 1: ik_smart (analisador IK para texto em chinês, modo inteligente)
* 2: ik_max_word (analisador IK para texto em chinês, modo de palavras máximas)
* 3: ik-index (analisador customizado IK para indexação)
* @return O código do analisador.
*/
int participle() default 0;
}
Classe FieldMapping
Uma classe simples para encapsular o nome do campo, seu tipo e a configuração do analisador.
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
public class FieldMapping {
private String field;
private String type;
private Integer participle;
public FieldMapping(String field, String type, Integer participle) {
this.field = field;
this.type = type;
this.participle = participle;
}
}
Utilitário FieldMappingUtil
Uma classe utilitária para extrair as configurações de @FieldInfo de uma classe Java e transformá-las em uma lista de FieldMapping.
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class FieldMappingUtil {
/**
* Obtém as informações de mapeamento de campos para uma dada classe.
* @param clazz A classe a ser inspecionada.
* @return Uma lista de objetos FieldMapping.
*/
public static List<FieldMapping> getFieldInfo(Class<?> clazz) {
return getFieldInfo(clazz, null);
}
/**
* Obtém as informações de mapeamento de campos para uma dada classe e campo específico.
* @param clazz A classe a ser inspecionada.
* @param fieldName O nome do campo específico (pode ser null para todos os campos).
* @return Uma lista de objetos FieldMapping.
*/
public static List<FieldMapping> getFieldInfo(Class<?> clazz, String fieldName) {
Field[] fields = clazz.getDeclaredFields();
List<FieldMapping> fieldMappingList = new ArrayList<>();
for (Field field : fields) {
if (field.isAnnotationPresent(FieldInfo.class)) {
FieldInfo fieldInfo = field.getAnnotation(FieldInfo.class);
String name = field.getName();
// Ignora campos específicos se um nome de campo for fornecido e não corresponder
if (fieldName != null && !name.equals(fieldName)) {
continue;
}
fieldMappingList.add(new FieldMapping(name, fieldInfo.type(), fieldInfo.participle()));
}
}
return fieldMappingList;
}
}
Configuração do Elasticsearch e Swagger
Configuração do Elasticsearch (ElasticsearchConfig.java)
Esta classe configura o cliente de alto nível para interagir com o cluster Elasticsearch, definindo hosts, autenticação e timeouts.
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
import java.util.Arrays;
import java.util.Objects;
@Configuration
public class ElasticsearchConfig {
private static final String HTTP_SCHEME = "http";
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfig.class);
@Value("${elasticsearch.host}")
private String[] hosts;
@Value("${elasticsearch.username}")
private String username;
@Value("${elasticsearch.password}")
private String password;
@Bean
public RestClientBuilder restClientBuilder() {
HttpHost[] httpHosts = Arrays.stream(hosts)
.map(this::createHttpHost)
.filter(Objects::nonNull)
.toArray(HttpHost[]::new);
LOGGER.info("Configurando hosts Elasticsearch: {}", Arrays.toString(httpHosts));
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
if (StringUtils.hasText(username) && StringUtils.hasText(password)) {
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
}
return RestClient.builder(httpHosts)
.setHttpClientConfigCallback(httpClientBuilder ->
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
.setMaxConnPerRoute(100)
.setMaxConnTotal(100)
)
.setRequestConfigCallback(requestConfigBuilder -> {
requestConfigBuilder.setConnectTimeout(60000); // Timeout de conexão
requestConfigBuilder.setSocketTimeout(60000); // Timeout de socket
requestConfigBuilder.setConnectionRequestTimeout(60000); // Timeout para requisitar conexão
return requestConfigBuilder;
});
}
private HttpHost createHttpHost(String address) {
if (StringUtils.isEmpty(address)) {
return null;
}
String[] parts = address.split(":");
if (parts.length == 2) {
try {
String ip = parts[0];
Integer port = Integer.valueOf(parts[1]);
return new HttpHost(ip, port, HTTP_SCHEME);
} catch (NumberFormatException e) {
LOGGER.error("Porta inválida no endereço Elasticsearch: {}", address, e);
return null;
}
} else {
LOGGER.error("Formato de endereço Elasticsearch inválido: {}", address);
return null;
}
}
@Bean(name = "restHighLevelClient")
public RestHighLevelClient restHighLevelClient(@Autowired RestClientBuilder restClientBuilder) {
return new RestHighLevelClient(restClientBuilder);
}
}
Configuração do Swagger (SwaggerConfig.java)
Configura o Swagger para documentar as APIs expostas, facilitando o entendimento e teste dos endpoints.
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Bean
public Docket apiDocket() {
return new Docket(DocumentationType.SWAGGER_2)
.apiInfo(apiInfo())
.select()
.apis(RequestHandlerSelectors.basePackage("seu.pacote.controller")) // Adapte ao seu pacote de controllers
.paths(PathSelectors.any())
.build();
}
private ApiInfo apiInfo() {
return new ApiInfoBuilder()
.title("Documentação da API Elasticsearch")
.description("API para gerenciamento de dados no Elasticsearch.")
.version("1.0.0")
.contact(new Contact("Nome do Desenvolvedor", "https://meu-site.com", "meu-email@example.com"))
.build();
}
}
Utilitário Principle do Elasticsearch (ElasticsearchUtil.java)
Esta classe fornece métodos para interagir com o Elasticsearch, incluindo operações de criação, atualização, exclusão e busca de dados, tanto individualmente quanto em lote.
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.*;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Slf4j
@Component
public class ElasticsearchUtil {
private static final String PRIMARY_KEY_NAME = "id"; // Chave primária padrão
@Value("${elasticsearch.synonym.path}")
private String synonymPath;
@Autowired
private RestHighLevelClient restHighLevelClient;
/**
* Cria ou atualiza um índice com mapeamento e configurações.
* @param indexName Nome do índice.
* @param indexAlias Alias do índice.
* @param clazz Classe que define o mapeamento.
* @return true se o índice foi criado/atualizado com sucesso, false caso contrário.
* @throws IOException Se ocorrer um erro de I/O.
*/
public boolean createOrUpdateIndex(String indexName, String indexAlias, Class<?> clazz) throws IOException {
if (isIndexExists(indexName)) {
log.info("Índice {} já existe.", indexName);
// Em um cenário real, poderia haver lógica para atualizar o mapeamento aqui,
// mas para simplificar, apenas retornamos true se já existir.
return true;
}
List<FieldMapping> fieldMappings = FieldMappingUtil.getFieldInfo(clazz);
XContentBuilder mappingBuilder = buildMapping(fieldMappings);
XContentBuilder settingsBuilder = buildSettings();
XContentBuilder aliasBuilder = buildAlias(indexAlias);
CreateIndexRequest request = new CreateIndexRequest(indexName);
request.settings(settingsBuilder);
request.mapping(mappingBuilder);
request.alias(aliasBuilder);
CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
if (response.isAcknowledged()) {
log.info("Índice {} criado com sucesso com alias {}.", indexName, indexAlias);
return true;
} else {
log.error("Falha ao criar o índice {}.", indexName);
return false;
}
}
/**
* Constrói o mapeamento do índice a partir da lista de FieldMapping.
* @param fieldMappings Lista de configurações de campo.
* @return Um XContentBuilder contendo o mapeamento.
* @throws IOException Se ocorrer um erro.
*/
private XContentBuilder buildMapping(List<FieldMapping> fieldMappings) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject()
.field("dynamic", "true") // Permite campos não mapeados explicitamente
.startObject("properties");
for (FieldMapping fm : fieldMappings) {
builder.startObject(fm.getField())
.field("type", fm.getType());
// Configuração do analisador baseada no valor de 'participle'
if (fm.getParticiple() != null) {
switch (fm.getParticiple()) {
case 1: // ik_smart
builder.field("analyzer", "ik_smart");
break;
case 2: // ik_max_word
builder.field("analyzer", "ik_max_word");
break;
case 3: // customizado (ex: com sinônimos e pinyin)
builder.field("analyzer", "ik_index_synonym");
builder.field("search_analyzer", "ik_search_synonym");
// Campos adicionais para suporte a pinyin, se necessário
builder.startObject("fields")
.startObject("pinyin")
.field("type", fm.getType()) // Mantém o tipo original
.field("analyzer", "ik_search_pinyin")
.endObject()
.endObject();
break;
default: // Não analisado ou tipo padrão
// Para campos de data, especificar o formato
if ("date".equalsIgnoreCase(fm.getType())) {
builder.field("format", "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis");
}
break;
}
}
// Configuração específica para campos de data
if ("date".equalsIgnoreCase(fm.getType()) && (fm.getParticiple() == null || fm.getParticiple() == 0)) {
builder.field("format", "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis");
}
builder.endObject();
}
builder.endObject().endObject();
return builder;
}
/**
* Constrói as configurações de análise (settings) para o índice.
* Inclui analisadores customizados como 'ik_smart', 'ik_max_word', sinônimos e pinyin.
* @return Um XContentBuilder contendo as configurações.
* @throws IOException Se ocorrer um erro.
*/
private XContentBuilder buildSettings() throws IOException {
return XContentFactory.jsonBuilder().startObject()
.field("number_of_shards", 3) // Número de shards
.field("number_of_replicas", 1) // Número de réplicas
.field("refresh_interval", "120s") // Intervalo de refresh
.startObject("analysis")
.startObject("analyzer")
.startObject("ik_index_synonym")
.field("type", "custom")
.field("tokenizer", "ik_max_word")
.field("filter", new String[]{"synonym_filter"})
.endObject()
.startObject("ik_search_synonym")
.field("type", "custom")
.field("tokenizer", "ik_smart")
.field("filter", new String[]{"synonym_filter"})
.endObject()
.startObject("ik_search_pinyin")
.field("type", "custom")
.field("tokenizer", "ik_smart")
.field("filter", new String[]{"pinyin_filter", "lowercase", "asciifolding"})
.endObject()
.endObject()
.startObject("filter")
.startObject("synonym_filter")
.field("type", "dynamic_synonym")
.field("synonyms_path", synonymPath) // Caminho para o arquivo de sinônimos
.field("dynamic_reload", true)
.field("interval", 120) // Intervalo de recarregamento em segundos
.endObject()
.startObject("pinyin_filter")
.field("type", "pinyin")
.field("keep_first_letter", false) // Não manter primeira letra
.field("keep_separate_first_letter", false)
.field("keep_full_pinyin", true) // Manter pinyin completo
.field("keep_original", false)
.field("limit_first_letter_length", 16)
.field("lowercase", true) // Converte para minúsculas
.field("remove_duplicated_term", true)
.endObject()
.endObject()
.endObject().endObject();
}
/**
* Constrói o alias do índice.
* @param alias O nome do alias.
* @return Um XContentBuilder contendo o alias.
* @throws IOException Se ocorrer um erro.
*/
private XContentBuilder buildAlias(String alias) throws IOException {
return XContentFactory.jsonBuilder().startObject()
.field(alias).endObject().endObject();
}
/**
* Verifica se um índice existe.
* @param indexName Nome do índice.
* @return true se o índice existe, false caso contrário.
*/
public boolean isIndexExists(String indexName) {
try {
GetIndexRequest request = new GetIndexRequest(indexName);
request.local(false);
request.humanReadable(false);
return restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error("Erro ao verificar a existência do índice {}: {}", indexName, e.getMessage());
return false;
}
}
/**
* Salva ou atualiza um único documento no Elasticsearch.
* @param indexName Nome do índice.
* @param id O ID do documento.
* @param jsonData O documento em formato JSON.
* @return true se a operação foi bem-sucedida, false caso contrário.
* @throws IOException Se ocorrer um erro de I/O.
*/
public boolean saveData(String indexName, String id, String jsonData) throws IOException {
IndexRequest request = new IndexRequest(indexName)
.id(id)
.source(jsonData, XContentType.JSON)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); // Atualiza imediatamente
IndexResponse response = restHighLevelClient.index(request, RequestOptions.DEFAULT);
if (IndexResponse.Result.CREATED.equals(response.getResult())) {
log.info("Documento com ID {} criado no índice {}.", id, indexName);
return true;
} else if (IndexResponse.Result.UPDATED.equals(response.getResult())) {
log.info("Documento com ID {} atualizado no índice {}.", id, indexName);
return true;
} else {
log.error("Falha ao salvar/atualizar documento com ID {} no índice {}. Resultado: {}", id, indexName, response.getResult());
return false;
}
}
/**
* Salva ou atualiza múltiplos documentos em lote.
* @param indexName Nome do índice.
* @param jsonDataList Lista de documentos em formato JSON (array JSON).
* @return true se a operação em lote foi bem-sucedida, false caso contrário.
* @throws IOException Se ocorrer um erro de I/O.
*/
public boolean saveDataBatch(String indexName, String jsonDataList) throws IOException {
if (!StringUtils.hasText(jsonDataList)) {
log.warn("Nenhum dado fornecido para o lote no índice {}.", indexName);
return false;
}
BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
JSONArray jsonArray = JSONArray.parseArray(jsonDataList);
if (jsonArray == null || jsonArray.isEmpty()) {
log.warn("Array JSON inválido ou vazio fornecido para o lote no índice {}.", indexName);
return false;
}
for (Object obj : jsonArray) {
if (obj instanceof JSONObject) {
JSONObject jsonObject = (JSONObject) obj;
String id = jsonObject.getString(PRIMARY_KEY_NAME);
if (id == null) {
log.error("Documento no lote do índice {} não possui o campo chave '{}'. Documento: {}", indexName, PRIMARY_KEY_NAME, jsonObject.toJSONString());
continue; // Pula este documento
}
IndexRequest indexRequest = new IndexRequest(indexName)
.id(id)
.source(jsonObject.toJSONString(), XContentType.JSON);
bulkRequest.add(indexRequest);
} else {
log.warn("Elemento inválido encontrado no array JSON do lote: {}", obj);
}
}
if (bulkRequest.numberOfActions() == 0) {
log.warn("Nenhuma ação de indexação válida foi adicionada ao lote para o índice {}.", indexName);
return false;
}
BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
if (bulkResponse.hasFailures()) {
for (BulkItemResponse item : bulkResponse.getItems()) {
log.error("Falha no item do lote (ID: {}): {}", item.getId(), item.getFailureMessage());
}
return false;
} else {
log.info("Lote de {} documentos processado com sucesso no índice {}. Criados: {}, Atualizados: {}",
bulkResponse.getItems().length, indexName,
countResultType(bulkResponse, DocWriteResponse.Result.CREATED),
countResultType(bulkResponse, DocWriteResponse.Result.UPDATED));
return true;
}
}
private long countResultType(BulkResponse bulkResponse, DocWriteResponse.Result type) {
return java.util.Arrays.stream(bulkResponse.getItems())
.filter(item -> item.getResponse() != null && item.getResponse().getResult().equals(type))
.count();
}
/**
* Atualiza um documento existente. Se o documento não existir, ele será criado (upsert).
* @param indexName Nome do índice.
* @param id O ID do documento a ser atualizado.
* @param updateJson O JSON contendo os campos a serem atualizados.
* @return true se a atualização foi bem-sucedida ou o documento foi criado, false caso contrário.
*/
public boolean updateData(String indexName, String id, String updateJson) {
UpdateRequest updateRequest = new UpdateRequest(indexName, id)
.doc(updateJson, XContentType.JSON)
.docAsUpsert(true) // Cria o documento se não existir
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
try {
UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
log.info("Documento ID {} no índice {} atualizado. Resultado: {}", id, indexName, updateResponse.getResult());
return !DocWriteResponse.Result.ERROR.equals(updateResponse.getResult());
} catch (IOException e) {
log.error("Erro ao atualizar documento ID {} no índice {}: {}", id, indexName, e.getMessage());
return false;
}
}
/**
* Atualiza múltiplos documentos em lote. Similar a saveDataBatch, mas usa UpdateRequest.
* @param indexName Nome do índice.
* @param jsonDataList Lista de documentos a serem atualizados (cada um deve ter um ID).
* @return true se a operação em lote foi bem-sucedida, false caso contrário.
* @throws IOException Se ocorrer um erro de I/O.
*/
public boolean updateDataBatch(String indexName, String jsonDataList) throws IOException {
if (!StringUtils.hasText(jsonDataList)) {
log.warn("Nenhum dado fornecido para atualização em lote no índice {}.", indexName);
return false;
}
BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
JSONArray jsonArray = JSONArray.parseArray(jsonDataList);
if (jsonArray == null || jsonArray.isEmpty()) {
log.warn("Array JSON inválido ou vazio fornecido para atualização em lote no índice {}.", indexName);
return false;
}
for (Object obj : jsonArray) {
if (obj instanceof JSONObject) {
JSONObject jsonObject = (JSONObject) obj;
String id = jsonObject.getString(PRIMARY_KEY_NAME);
if (id == null) {
log.error("Documento no lote de atualização do índice {} não possui o campo chave '{}'. Documento: {}", indexName, PRIMARY_KEY_NAME, jsonObject.toJSONString());
continue;
}
UpdateRequest updateRequest = new UpdateRequest(indexName, id)
.doc(jsonObject.toJSONString(), XContentType.JSON)
.docAsUpsert(true); // Atualiza ou insere se não existir
bulkRequest.add(updateRequest);
} else {
log.warn("Elemento inválido encontrado no array JSON de atualização em lote: {}", obj);
}
}
if (bulkRequest.numberOfActions() == 0) {
log.warn("Nenhuma ação de atualização válida foi adicionada ao lote para o índice {}.", indexName);
return false;
}
BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
if (bulkResponse.hasFailures()) {
for (BulkItemResponse item : bulkResponse.getItems()) {
log.error("Falha na atualização em lote (ID: {}): {}", item.getId(), item.getFailureMessage());
}
return false;
} else {
log.info("Lote de {} documentos atualizados com sucesso no índice {}. Criados: {}, Atualizados: {}",
bulkResponse.getItems().length, indexName,
countResultType(bulkResponse, DocWriteResponse.Result.CREATED),
countResultType(bulkResponse, DocWriteResponse.Result.UPDATED));
return true;
}
}
/**
* Exclui um documento do Elasticsearch.
* @param indexName Nome do índice.
* @param id O ID do documento a ser excluído.
* @return true se o documento foi excluído com sucesso, false se não foi encontrado ou ocorreu um erro.
*/
public boolean deleteData(String indexName, String id) {
DeleteRequest deleteRequest = new DeleteRequest(indexName, id)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
try {
DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
if (DocWriteResponse.Result.NOT_FOUND.equals(deleteResponse.getResult())) {
log.warn("Documento com ID {} não encontrado no índice {}.", id, indexName);
return false;
} else {
log.info("Documento com ID {} excluído com sucesso do índice {}.", id, indexName);
return true;
}
} catch (IOException e) {
log.error("Erro ao excluir documento ID {} do índice {}: {}", id, indexName, e.getMessage());
return false;
}
}
/**
* Exclui múltiplos documentos em lote.
* @param indexName Nome do índice.
* @param ids Lista de IDs dos documentos a serem excluídos.
* @return true se a operação em lote foi bem-sucedida, false caso contrário.
*/
public boolean deleteDataBatch(String indexName, List<String> ids) {
if (CollectionUtils.isEmpty(ids)) {
log.warn("Nenhum ID fornecido para exclusão em lote no índice {}.", indexName);
return false;
}
BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (String id : ids) {
bulkRequest.add(new DeleteRequest(indexName, id));
}
try {
BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
if (bulkResponse.hasFailures()) {
for (BulkItemResponse item : bulkResponse.getItems()) {
if (item.getFailure() != null) {
log.error("Falha na exclusão em lote (ID: {}): {}", item.getId(), item.getFailureMessage());
}
}
return false;
} else {
log.info("Lote de {} documentos excluídos com sucesso do índice {}.", ids.size(), indexName);
return true;
}
} catch (IOException e) {
log.error("Erro na exclusão em lote do índice {}: {}", indexName, e.getMessage());
return false;
}
}
/**
* Exclui todos os documentos de um índice.
* @param indexName Nome do índice.
* @return true se a operação foi bem-sucedida, false caso contrário.
*/
public boolean deleteAll(String indexName) {
if (!isIndexExists(indexName)) {
log.warn("Índice {} não existe, exclusão de todos os documentos não necessária.", indexName);
return false;
}
DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(indexName)
.setQuery(org.elasticsearch.index.query.QueryBuilders.matchAllQuery())
.setConflicts("proceed") // Continua mesmo se houver conflitos
.setRefreshPerGroup(true) // Refresca após cada grupo de documentos
.setBatchSize(10000); // Processa em lotes de 10000
try {
BulkByScrollResponse bulkResponse = restHighLevelClient.deleteByQuery(deleteRequest, RequestOptions.DEFAULT);
log.info("Todos os {} documentos do índice {} foram excluídos. Tempo: {}ms",
bulkResponse.getTotal(), indexName, bulkResponse.getTook().getMillis());
return true;
} catch (IOException e) {
log.error("Erro ao excluir todos os documentos do índice {}: {}", indexName, e.getMessage());
return false;
}
}
/**
* Exclui um índice inteiro do Elasticsearch.
* @param indexName Nome do índice a ser excluído.
* @return true se o índice foi excluído com sucesso, false caso contrário.
*/
public boolean deleteIndex(String indexName) {
if (!isIndexExists(indexName)) {
log.warn("Índice {} não existe, exclusão não necessária.", indexName);
return false;
}
try {
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
// Opções para expandir nomes de índices (LENIENT_EXPAND_OPEN é comum)
deleteIndexRequest.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
AcknowledgedResponse response = restHighLevelClient.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
if (response.isAcknowledged()) {
log.info("Índice {} excluído com sucesso.", indexName);
return true;
} else {
log.error("Falha ao excluir o índice {}.", indexName);
return false;
}
} catch (IOException e) {
log.error("Erro ao excluir o índice {}: {}", indexName, e.getMessage());
return false;
}
}
/**
* Busca um documento pelo ID.
* @param indexName Nome do índice.
* @param id O ID do documento.
* @return O documento em formato JSON string, ou null se não encontrado ou erro.
*/
public String getDataById(String indexName, String id) {
if (!isIndexExists(indexName)) {
log.warn("Índice {} não encontrado. Impossível buscar documento {}.", indexName, id);
return null;
}
GetRequest getRequest = new GetRequest(indexName, id);
try {
GetResponse response = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
if (response.isExists()) {
return response.getSourceAsString();
} else {
log.warn("Documento com ID {} não encontrado no índice {}.", id, indexName);
return null;
}
} catch (IOException e) {
log.error("Erro ao buscar documento ID {} do índice {}: {}", id, indexName, e.getMessage());
return null;
}
}
// Métodos assíncronos (exemplo)
/**
* Salva dados em lote de forma assíncrona.
* @param indexName Nome do índice.
* @param jsonDataList Lista de documentos em formato JSON.
* @return true se a solicitação assíncrona foi enviada, false caso contrário.
*/
public boolean saveDataBatchAsync(String indexName, String jsonDataList) {
if (!StringUtils.hasText(jsonDataList)) {
log.warn("Nenhum dado fornecido para o lote assíncrono no índice {}.", indexName);
return false;
}
BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.NONE); // Menos agressivo para assíncrono
JSONArray jsonArray = JSONArray.parseArray(jsonDataList);
if (jsonArray == null || jsonArray.isEmpty()) {
log.warn("Array JSON inválido ou vazio fornecido para o lote assíncrono no índice {}.", indexName);
return false;
}
for (Object obj : jsonArray) {
if (obj instanceof JSONObject) {
JSONObject jsonObject = (JSONObject) obj;
String id = jsonObject.getString(PRIMARY_KEY_NAME);
if (id == null) {
log.error("Documento no lote assíncrono do índice {} não possui o campo chave '{}'. Documento: {}", indexName, PRIMARY_KEY_NAME, jsonObject.toJSONString());
continue;
}
IndexRequest indexRequest = new IndexRequest(indexName)
.id(id)
.source(jsonObject.toJSONString(), XContentType.JSON);
bulkRequest.add(indexRequest);
} else {
log.warn("Elemento inválido encontrado no array JSON do lote assíncrono: {}", obj);
}
}
if (bulkRequest.numberOfActions() == 0) {
log.warn("Nenhuma ação de indexação válida foi adicionada ao lote assíncrono para o índice {}.", indexName);
return false;
}
ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {
if (response.hasFailures()) {
for (BulkItemResponse item : response.getItems()) {
log.error("Falha no item do lote assíncrono (ID: {}): {}", item.getId(), item.getFailureMessage());
}
} else {
log.info("Lote assíncrono processado com sucesso no índice {}. Itens: {}", indexName, response.getItems().length);
}
}
@Override
public void onFailure(Exception e) {
log.error("Erro no processamento do lote assíncrono para o índice {}: {}", indexName, e.getMessage());
}
};
restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener);
log.info("Solicitação de processamento em lote assíncrono enviada para o índice {}.", indexName);
return true;
}
}
Utilitário de Gerenciamento de Índices (FieldUtil.java)
Uma classe para mapear nomes de índices a classes de entidades específicas, permitindo que o utilitário Elasticsearch saiba qual mapeamento usar.
import java.util.HashMap;
import java.util.Map;
public class FieldUtil {
public static final String MESSAGE_INDEX = "message_index"; // Exemplo de nome de índice
// Mapa para associar nomes de índices a classes de entidades
private static final Map<String, Class<?>> indexClassMap = new HashMap<>(16);
// Bloco estático para inicializar o mapa
static {
// Associe o nome do índice à sua classe de entidade correspondente
indexClassMap.put(MESSAGE_INDEX, MessageEntity.class); // Substitua MessageEntity pela sua classe real
// Adicione outras associações conforme necessário
// indexClassMap.put("articles", Article.class);
// indexClassMap.put("orders", Order.class);
}
/**
* Obtém a classe de entidade associada a um determinado nome de índice.
* @param indexName O nome do índice.
* @return A classe de entidade associada ou Object.class se não for encontrada.
*/
public static Class<?> getClassForIndex(final String indexName) {
return indexClassMap.getOrDefault(indexName, Object.class);
}
}
// Exemplo de classe de entidade (substitua pela sua implementação)
class MessageEntity {
// Defina os campos aqui com as anotações @FieldInfo apropriadas
@FieldInfo(type = "keyword")
private String id;
@FieldInfo(type = "text", participle = 3) // Exemplo com analisador customizado
private String content;
@FieldInfo(type = "date")
private java.util.Date timestamp;
// ... outros campos
}
Exemplo de Controle REST (ElasticsearchController.java)
Exemplo de um controller Spring Boot que expõe endpoints para interagir com as funcionalidades do Elasticsearch.
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.io.IOException;
import java.util.List;
@RestController
@RequestMapping("/api/es")
@Api(tags = "API de Gerenciamento Elasticsearch")
public class ElasticsearchController {
@Autowired
private ElasticsearchUtil elasticsearchUtil;
@PostMapping("/index/{indexName}")
@ApiOperation("Cria ou atualiza um índice Elasticsearch.")
public ResponseEntity<?> createOrUpdateIndex(
@ApiParam(value = "Nome do índice", required = true) @PathVariable String indexName,
@ApiParam(value = "Alias do índice", required = true) @RequestParam String alias,
@ApiParam(value = "Classe de entidade para mapeamento (usará FieldUtil)", required = true) @RequestParam String entityClassKey) {
Class<?> entityClass = FieldUtil.getClassForIndex(entityClassKey);
try {
boolean success = elasticsearchUtil.createOrUpdateIndex(indexName, alias, entityClass);
return success ? ResponseEntity.ok("Índice criado/atualizado com sucesso.") : ResponseEntity.internalServerError().body("Falha ao criar/atualizar índice.");
} catch (IOException e) {
return ResponseEntity.internalServerError().body("Erro de I/O ao criar/atualizar índice: " + e.getMessage());
}
}
@PostMapping("/doc/{indexName}")
@ApiOperation("Salva ou atualiza um único documento.")
public ResponseEntity<?> saveDocument(
@ApiParam(value = "Nome do índice", required = true) @PathVariable String indexName,
@ApiParam(value = "ID do documento", required = true) @RequestParam String id,
@ApiParam(value = "Documento em formato JSON", required = true) @RequestBody String jsonData) {
try {
boolean success = elasticsearchUtil.saveData(indexName, id, jsonData);
return success ? ResponseEntity.ok("Documento salvo/atualizado com sucesso.") : ResponseEntity.internalServerError().body("Falha ao salvar/atualizar documento.");
} catch (IOException e) {
return ResponseEntity.internalServerError().body("Erro de I/O ao salvar documento: " + e.getMessage());
}
}
@PostMapping("/docs/batch/{indexName}")
@ApiOperation("Salva ou atualiza múltiplos documentos em lote (síncrono).")
public ResponseEntity<?> saveDocumentsBatch(
@ApiParam(value = "Nome do índice", required = true) @PathVariable String indexName,
@ApiParam(value = "Lista de documentos em formato JSON array", required = true) @RequestBody String jsonDataList) {
try {
boolean success = elasticsearchUtil.saveDataBatch(indexName, jsonDataList);
return success ? ResponseEntity.ok("Documentos salvos/atualizados em lote com sucesso.") : ResponseEntity.internalServerError().body("Falha ao salvar/atualizar documentos em lote.");
} catch (IOException e) {
return ResponseEntity.internalServerError().body("Erro de I/O ao salvar documentos em lote: " + e.getMessage());
}
}
@PostMapping("/docs/batch/async/{indexName}")
@ApiOperation("Salva ou atualiza múltiplos documentos em lote (assíncrono).")
public ResponseEntity<?> saveDocumentsBatchAsync(
@ApiParam(value = "Nome do índice", required = true) @PathVariable String indexName,
@ApiParam(value = "Lista de documentos em formato JSON array", required = true) @RequestBody String jsonDataList) {
boolean success = elasticsearchUtil.saveDataBatchAsync(indexName, jsonDataList);
return success ? ResponseEntity.ok("Solicitação de salvamento em lote assíncrono enviada.") : ResponseEntity.internalServerError().body("Falha ao enviar solicitação de salvamento em lote assíncrono.");
}
@PutMapping("/doc/{indexName}")
@ApiOperation("Atualiza um documento (ou cria se não existir).")
public ResponseEntity<?> updateDocument(
@ApiParam(value = "Nome do índice", required = true) @PathVariable String indexName,
@ApiParam(value = "ID do documento", required = true) @RequestParam String id,
@ApiParam(value = "Campos para atualizar em formato JSON", required = true) @RequestBody String updateJson) {
boolean success = elasticsearchUtil.updateData(indexName, id, updateJson);
return success ? ResponseEntity.ok("Documento atualizado/criado com sucesso.") : ResponseEntity.internalServerError().body("Falha ao atualizar/criar documento.");
}
@PutMapping("/docs/batch/{indexName}")
@ApiOperation("Atualiza múltiplos documentos em lote (cria se não existir).")
public ResponseEntity<?> updateDocumentsBatch(
@ApiParam(value = "Nome do índice", required = true) @PathVariable String indexName,
@ApiParam(value = "Lista de documentos para atualizar em formato JSON array", required = true) @RequestBody String jsonDataList) {
try {
boolean success = elasticsearchUtil.updateDataBatch(indexName, jsonDataList);
return success ? ResponseEntity.ok("Documentos atualizados/criados em lote com sucesso.") : ResponseEntity.internalServerError().body("Falha ao atualizar/criar documentos em lote.");
} catch (IOException e) {
return ResponseEntity.internalServerError().body("Erro de I/O ao atualizar documentos em lote: " + e.getMessage());
}
}
@DeleteMapping("/doc/{indexName}/{id}")
@ApiOperation("Exclui um único documento.")
public ResponseEntity<?> deleteDocument(
@ApiParam(value = "Nome do índice", required = true) @PathVariable String indexName,
@ApiParam(value = "ID do documento a excluir", required = true) @PathVariable String id) {
boolean success = elasticsearchUtil.deleteData(indexName, id);
return success ? ResponseEntity.ok("Documento excluído com sucesso.") : ResponseEntity.internalServerError().body("Falha ao excluir documento (pode não ter sido encontrado).");
}
@DeleteMapping("/docs/batch/{indexName}")
@ApiOperation("Exclui múltiplos documentos em lote.")
public ResponseEntity<?> deleteDocumentsBatch(
@ApiParam(value = "Nome do índice", required = true) @PathVariable String indexName,
@ApiParam(value = "Lista de IDs a excluir", required = true) @RequestBody List<String> ids) {
boolean success = elasticsearchUtil.deleteDataBatch(indexName, ids);
return success ? ResponseEntity.ok("Documentos excluídos em lote com sucesso.") : ResponseEntity.internalServerError().body("Falha ao excluir documentos em lote.");
}
@DeleteMapping("/index/{indexName}")
@ApiOperation("Exclui todos os documentos de um índice (mantém o índice).")
public ResponseEntity<?> deleteAllDocumentsFromIndex(
@ApiParam(value = "Nome do índice", required = true) @PathVariable String indexName) {
boolean success = elasticsearchUtil.deleteAll(indexName);
return success ? ResponseEntity.ok("Todos os documentos do índice excluídos com sucesso.") : ResponseEntity.internalServerError().body("Falha ao excluir todos os documentos do índice.");
}
@DeleteMapping("/manage/index/{indexName}")
@ApiOperation("Exclui um índice inteiro (incluindo mapeamento e dados).")
public ResponseEntity<?> deleteIndex(
@ApiParam(value = "Nome do índice a excluir", required = true) @PathVariable String indexName) {
boolean success = elasticsearchUtil.deleteIndex(indexName);
return success ? ResponseEntity.ok("Índice excluído com sucesso.") : ResponseEntity.internalServerError().body("Falha ao excluir índice.");
}
@GetMapping("/doc/{indexName}/{id}")
@ApiOperation("Busca um documento pelo ID.")
public ResponseEntity<?> getDocumentById(
@ApiParam(value = "Nome do índice", required = true) @PathVariable String indexName,
@ApiParam(value = "ID do documento", required = true) @PathVariable String id) {
String document = elasticsearchUtil.getDataById(indexName, id);
return document != null ? ResponseEntity.ok(document) : ResponseEntity.notFound().build();
}
// Adicione mais endpoints conforme necessário para buscas complexas, agregações, etc.
}