在Elasticsearch7.15版本之后,Elasticsearch官方将它的高级客户端RestHighLevelClient标记为弃用状态。同时推出了全新的Java API客户端Elasticsearch Java API Client,该客户端也将在Elasticsearch8.0及以后版本中成为官方推荐使用的客户端。
Elasticsearch Java API Client 支持除 Vector tile search API 和 Find structure API 之外的所有 Elasticsearch API。且支持所有API数据类型,并且不再有原始JsonValue属性。它是针对Elasticsearch8.0及之后版本的客户端,目前Elasticsearch已经更新至8.9.0,所以我们需要学习新的Elasticsearch Java API Client的使用方法。
关于es在windows下的具体安装部署,这里不再赘述,请自行搜索,或者参考我的这篇文章《ELK在windows下的安装部署》。
代码仓库gitee地址: elasticsearch: elasticsearch在springboot中集成案例
对您有帮助的话,请点赞支持下,创作不易!
1.引入es相关依赖
<!-- Elasticsearch8.1版本(Java API Client)--> <dependency> <groupId>co.elastic.clients</groupId> <artifactId>elasticsearch-java</artifactId> <version>8.1.3</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.13.4.1</version> </dependency>
2.bootstrap.yml配置(我用的spring cloud )
server:
  port: 9999
spring:
  elasticsearch:
    uris: localhost:9200
3.es的客户端配置类(提供了同步和异步两种客户端,可根据实际需求使用)
package com.example.es.config;
import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import lombok.Setter;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
/**
 * Spring configuration that exposes the synchronous and asynchronous
 * Elasticsearch Java API clients as beans.
 *
 * <p>Both clients are built on one shared {@link RestClientTransport} so that only a
 * single low-level {@link RestClient} (and connection pool) is created.
 *
 * <p>NOTE(review): the underlying {@link RestClient} is never closed by this class;
 * consider registering a shutdown hook / destroy callback for it.
 */
@Configuration
public class ESClientConfig {

    /**
     * Comma-separated list of Elasticsearch nodes, read from
     * {@code spring.elasticsearch.uris}. Each entry may be {@code host:port},
     * {@code scheme://host:port} or a bare {@code host}.
     */
    @Setter
    @Value("${spring.elasticsearch.uris}")
    private String hosts;

    // Timeout settings (milliseconds) applied to every request.
    public static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 10000;
    public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 300000;
    public static final int DEFAULT_CONNECT_REQUEST_TIMEOUT_MILLIS = 1000;

    /** Port used when a configured entry does not specify one. */
    private static final int DEFAULT_PORT = 9200;

    /** Lazily created transport shared by the sync and async clients. */
    private RestClientTransport transport;

    /**
     * Blocking (synchronous) client.
     *
     * @return a client backed by the shared transport
     */
    @Bean
    public ElasticsearchClient elasticsearchClient() {
        return new ElasticsearchClient(getTransport());
    }

    /**
     * Non-blocking (asynchronous) client.
     *
     * @return a client backed by the shared transport
     */
    @Bean
    public ElasticsearchAsyncClient elasticsearchAsyncClient() {
        return new ElasticsearchAsyncClient(getTransport());
    }

    /**
     * Returns the shared transport, creating it on first use.
     */
    private RestClientTransport getTransport() {
        if (transport == null) {
            RestClient restClient = getRestClient(toHttpHost());
            transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
        }
        return transport;
    }

    /**
     * Builds the low-level REST client with the configured timeouts.
     *
     * @param httpHosts nodes to connect to
     */
    private RestClient getRestClient(HttpHost[] httpHosts) {
        return RestClient.builder(httpHosts)
                .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
                        .setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_MILLIS)
                        .setSocketTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS)
                        .setConnectionRequestTimeout(DEFAULT_CONNECT_REQUEST_TIMEOUT_MILLIS))
                .build();
    }

    /**
     * Parses the configured {@link #hosts} string into an array of {@link HttpHost}.
     * Blank entries (e.g. produced by a doubled comma) are ignored.
     *
     * @throws IllegalStateException if the property is blank, contains no usable
     *                               entry, or an entry cannot be parsed
     */
    private HttpHost[] toHttpHost() {
        if (!StringUtils.hasText(hosts)) {
            throw new IllegalStateException("invalid elasticsearch configuration. spring.elasticsearch.uris不能为空!");
        }
        String[] entries = hosts.split(",");
        HttpHost[] parsed = new HttpHost[entries.length];
        int count = 0;
        for (String entry : entries) {
            String candidate = entry.trim();
            if (!candidate.isEmpty()) {
                parsed[count++] = parseHost(candidate);
            }
        }
        if (count == 0) {
            throw new IllegalStateException("invalid elasticsearch configuration. spring.elasticsearch.uris不能为空!");
        }
        if (count == parsed.length) {
            return parsed;
        }
        // Some entries were blank: return an array without trailing nulls.
        HttpHost[] compact = new HttpHost[count];
        System.arraycopy(parsed, 0, compact, 0, count);
        return compact;
    }

    /**
     * Parses a single entry. {@link HttpHost#create(String)} understands an optional
     * scheme and port, so {@code http://host:9200}, {@code host:9200} and {@code host}
     * are all accepted; the scheme defaults to http and the port to {@link #DEFAULT_PORT}.
     */
    private static HttpHost parseHost(String entry) {
        final HttpHost host;
        try {
            host = HttpHost.create(entry);
        } catch (IllegalArgumentException e) {
            throw new IllegalStateException(
                    "invalid elasticsearch configuration. spring.elasticsearch.uris: " + entry, e);
        }
        if (host.getPort() >= 0) {
            return host;
        }
        return new HttpHost(host.getHostName(), DEFAULT_PORT, host.getSchemeName());
    }
}
4.测试索引的增删改查(JUnit)
package com.example.es;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
import co.elastic.clients.elasticsearch.indices.DeleteIndexResponse;
import co.elastic.clients.elasticsearch.indices.GetIndexResponse;
import co.elastic.clients.transport.endpoints.BooleanResponse;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.io.IOException;

/**
 * Smoke tests that verify the Elasticsearch client configuration is wired correctly
 * by exercising the index-level APIs against the configured cluster.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class ESClientConfigTest {

    /** Index used by every test in this class. */
    private static final String INDEX = "uservo";

    @Autowired
    private ElasticsearchClient client;

    /**
     * Creates the index and prints whether the request was acknowledged.
     */
    @Test
    public void createIndex() throws IOException {
        CreateIndexResponse created = client.indices().create(req -> req.index(INDEX));
        System.out.println(created.acknowledged());
    }

    /**
     * Fetches the index definition and prints the raw response.
     */
    @Test
    public void queryIndex() throws IOException {
        GetIndexResponse found = client.indices().get(req -> req.index(INDEX));
        System.out.println(found);
    }

    /**
     * Prints whether the index exists.
     */
    @Test
    public void indexExi() throws IOException {
        BooleanResponse present = client.indices().exists(req -> req.index(INDEX));
        System.out.println(present.value());
    }

    /**
     * Deletes the index and prints whether the request was acknowledged.
     */
    @Test
    public void deleteIndex() throws IOException {
        DeleteIndexResponse removed = client.indices().delete(req -> req.index(INDEX));
        System.out.println(removed.acknowledged());
    }
}
5.基础的公用es服务接口
package com.example.es.service;
import co.elastic.clients.elasticsearch._types.Result;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.IndexResponse;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
/**
 * Shared service interface for document-level Elasticsearch operations
 * (create, bulk create, get, partial update, delete, bulk delete).
 */
public interface ESDocumentService {
/**
 * Indexes a single document using the fluent (lambda) DSL.
 * Per the original author's note the target index is created automatically when it
 * does not exist (NOTE(review): this depends on the cluster's auto-create setting).
 *
 * @param idxName  index name
 * @param idxId    id to store the document under (a document id, despite the name)
 * @param document document object to index
 * @return the index response returned by the client
 */
<T> IndexResponse createByFluentDSL(String idxName, String idxId, T document) throws Exception;
/**
 * Indexes a single document using an explicitly built request (builder pattern).
 * Per the original author's note the target index is created automatically when it
 * does not exist (NOTE(review): this depends on the cluster's auto-create setting).
 *
 * @param idxName  index name
 * @param idxId    id to store the document under (a document id, despite the name)
 * @param document document object to index
 * @return the index response returned by the client
 */
<T> IndexResponse createByBuilderPattern(String idxName, String idxId, T document) throws Exception;
/**
 * Indexes a single document supplied as a raw JSON string.
 * Per the original author's note the target index is created automatically when it
 * does not exist (NOTE(review): this depends on the cluster's auto-create setting).
 *
 * @param idxName     index name
 * @param idxId       id to store the document under (a document id, despite the name)
 * @param jsonContent JSON text of the document
 * @return the index response returned by the client
 */
IndexResponse createByJson(String idxName, String idxId, String jsonContent) throws Exception;
/**
 * Indexes a single document asynchronously.
 * Per the original author's note the target index is created automatically when it
 * does not exist (NOTE(review): this depends on the cluster's auto-create setting).
 *
 * @param idxName  index name
 * @param idxId    id to store the document under (a document id, despite the name)
 * @param document document object to index
 * @param action   callback receiving the response and/or the failure; callers should
 *                 expect the response to be absent when a failure is reported
 */
<T> void createAsync(String idxName, String idxId, T document, BiConsumer<IndexResponse, Throwable> action);
/**
 * Indexes several documents with one bulk request.
 *
 * @param idxName   index name
 * @param documents documents to index
 * @return the bulk response
 */
<T> BulkResponse bulkCreate(String idxName, List<T> documents) throws Exception;
/**
 * Looks up a document by id and maps its source to the given type.
 *
 * @param idxName index name
 * @param docId   document id
 * @param tClass  type to deserialize the source into
 * @return the document; the implementation in this file returns {@code null} when
 *         the document is not found
 */
<T> T getById(String idxName, String docId ,Class<T> tClass) throws IOException;
/**
 * Partially updates a document by id with the given field values.
 *
 * @param idxName index name
 * @param docId   document id
 * @param tClass  document type used to read the update response
 * @param map     field name to new value, e.g. {@code age -> 35}
 * @return the result reported by the update response
 */
<T> Result updateById(String idxName, String docId, Class<T> tClass, Map<String,Object> map) throws IOException;
/**
 * Looks up a document by id and returns its source as a Jackson {@link ObjectNode}.
 *
 * @param idxName index name
 * @param docId   document id
 * @return the document source; the implementation in this file returns {@code null}
 *         when the document is not found
 */
ObjectNode getObjectNodeById(String idxName, String docId) throws IOException;
/**
 * Deletes a document by id.
 *
 * @param idxName index name
 * @param docId   document id
 * @return a Boolean status derived by the implementation from the delete response
 */
Boolean deleteById(String idxName, String docId) throws IOException;
/**
 * Deletes several documents with one bulk request.
 *
 * @param idxName index name
 * @param docIds  ids of the documents to delete
 * @return the bulk response
 */
BulkResponse bulkDeleteByIds(String idxName, List<String> docIds) throws Exception;
}
6.es服务实现类
package com.example.es.service;
import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.Result;
import co.elastic.clients.elasticsearch.core.*;
import com.example.es.entity.ESDocument;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.io.StringReader;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
/**
 * Default {@link ESDocumentService} implementation backed by the Elasticsearch
 * Java API Client (one synchronous and one asynchronous client).
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class ESDocumentServiceImpl implements ESDocumentService {

    /** Synchronous client. */
    private final ElasticsearchClient elasticsearchClient;

    /** Asynchronous client. */
    private final ElasticsearchAsyncClient elasticsearchAsyncClient;

    /**
     * Indexes a document using the fluent (lambda) DSL.
     *
     * @param idxName  index name
     * @param idxId    id to store the document under
     * @param document document object
     */
    @Override
    public <T> IndexResponse createByFluentDSL(String idxName, String idxId, T document) throws Exception {
        return elasticsearchClient.index(idx -> idx
                .index(idxName)
                .id(idxId)
                .document(document));
    }

    /**
     * Indexes a document using an explicitly built {@link IndexRequest}.
     *
     * @param idxName  index name
     * @param idxId    id to store the document under
     * @param document document object
     */
    @Override
    public <T> IndexResponse createByBuilderPattern(String idxName, String idxId, T document) throws Exception {
        // Keep the builder typed on T instead of erasing the document type to Object.
        IndexRequest.Builder<T> indexReqBuilder = new IndexRequest.Builder<>();
        indexReqBuilder.index(idxName);
        indexReqBuilder.id(idxId);
        indexReqBuilder.document(document);
        return elasticsearchClient.index(indexReqBuilder.build());
    }

    /**
     * Indexes a document supplied as a raw JSON string.
     *
     * @param idxName     index name
     * @param idxId       id to store the document under
     * @param jsonContent JSON text of the document
     */
    @Override
    public IndexResponse createByJson(String idxName, String idxId, String jsonContent) throws Exception {
        return elasticsearchClient.index(i -> i
                .index(idxName)
                .id(idxId)
                .withJson(new StringReader(jsonContent)));
    }

    /**
     * Indexes a document asynchronously and hands the outcome to {@code action}.
     *
     * @param idxName  index name
     * @param idxId    id to store the document under
     * @param document document object
     * @param action   completion callback; the response is {@code null} when the
     *                 throwable is non-null
     */
    @Override
    public <T> void createAsync(String idxName, String idxId, T document, BiConsumer<IndexResponse, Throwable> action) {
        elasticsearchAsyncClient.index(idx -> idx
                .index(idxName)
                .id(idxId)
                .document(document)
        ).whenComplete(action);
    }

    /**
     * Indexes all given documents with a single bulk request.
     *
     * <p>Documents extending {@link ESDocument} with a non-null id are stored under
     * that id. Any other document is indexed without an explicit id, so Elasticsearch
     * generates one (previously such input failed with a {@link ClassCastException}
     * or {@link NullPointerException}).
     *
     * @param idxName   index name
     * @param documents documents to index; must not be null or empty
     * @throws IllegalArgumentException if {@code documents} is null or empty
     */
    @Override
    public <T> BulkResponse bulkCreate(String idxName, List<T> documents) throws Exception {
        if (documents == null || documents.isEmpty()) {
            // A bulk request without operations cannot be built; fail with a clear message.
            throw new IllegalArgumentException("documents must not be null or empty");
        }
        BulkRequest.Builder br = new BulkRequest.Builder();
        for (T document : documents) {
            final String docId = resolveId(document);
            br.operations(op -> op.index(idx -> {
                idx.index(idxName).document(document);
                if (docId != null) {
                    idx.id(docId);
                }
                return idx;
            }));
        }
        return elasticsearchClient.bulk(br.build());
    }

    /**
     * Partially updates a document.
     *
     * <p>Example: a map containing {@code "age" -> 35} sets the age field to 35.
     *
     * @param idxName index name
     * @param docId   document id
     * @param tClass  document type used to read the update response
     * @param map     field name to new value
     * @return the result reported by the update response
     */
    @Override
    public <T> Result updateById(String idxName, String docId, Class<T> tClass, Map<String,Object> map) throws IOException {
        UpdateResponse<T> response = elasticsearchClient.update(e -> e.index(idxName).id(docId).doc(map), tClass);
        return response.result();
    }

    /**
     * Fetches a document by id.
     *
     * @param idxName index name
     * @param docId   document id
     * @param tClass  type to deserialize the source into
     * @return the document, or {@code null} when it is not found
     */
    @Override
    public <T> T getById(String idxName, String docId,Class<T> tClass) throws IOException {
        GetResponse<T> response = elasticsearchClient.get(g -> g
                        .index(idxName)
                        .id(docId),
                tClass);
        return response.found() ? response.source() : null;
    }

    /**
     * Fetches a document by id as a Jackson {@link ObjectNode}.
     *
     * @param idxName index name
     * @param docId   document id
     * @return the document source, or {@code null} when it is not found
     */
    @Override
    public ObjectNode getObjectNodeById(String idxName, String docId) throws IOException {
        GetResponse<ObjectNode> response = elasticsearchClient.get(g -> g
                        .index(idxName)
                        .id(docId),
                ObjectNode.class);
        return response.found() ? response.source() : null;
    }

    /**
     * Deletes a single document.
     *
     * @param idxName index name
     * @param docId   document id
     * @return {@code true} when the response reports the document as deleted,
     *         {@code false} otherwise (e.g. it did not exist)
     */
    @Override
    public Boolean deleteById(String idxName, String docId) throws IOException {
        DeleteResponse delete = elasticsearchClient.delete(d -> d
                .index(idxName)
                .id(docId));
        // forcedRefresh() only tells whether a refresh was forced (and may be null);
        // the outcome of the delete itself is carried by result().
        return delete.result() == Result.Deleted;
    }

    /**
     * Deletes all documents with the given ids using a single bulk request.
     *
     * @param idxName index name
     * @param docIds  ids to delete; must not be null or empty
     * @throws IllegalArgumentException if {@code docIds} is null or empty
     */
    @Override
    public BulkResponse bulkDeleteByIds(String idxName, List<String> docIds) throws Exception {
        if (docIds == null || docIds.isEmpty()) {
            // A bulk request without operations cannot be built; fail with a clear message.
            throw new IllegalArgumentException("docIds must not be null or empty");
        }
        BulkRequest.Builder br = new BulkRequest.Builder();
        // Add one delete operation per id.
        for (String id : docIds) {
            br.operations(op -> op
                    .delete(d -> d
                            .index(idxName)
                            .id(id)));
        }
        return elasticsearchClient.bulk(br.build());
    }

    /**
     * Returns the explicit id to index {@code document} under, or {@code null} when
     * the document carries none and Elasticsearch should generate one.
     */
    private static String resolveId(Object document) {
        if (document instanceof ESDocument) {
            Long id = ((ESDocument) document).getId();
            return id == null ? null : id.toString();
        }
        return null;
    }
}
7.ESDocument基类
package com.example.es.entity;
import lombok.Data;
/**
 * Base class for documents stored in Elasticsearch; it carries the numeric id that
 * {@code ESDocumentServiceImpl.bulkCreate} reads through {@code getId()} to choose
 * the document id.
 *
 * @author hulei
 * @date 2023/8/26 19:11
 */
@Data
public class ESDocument {
// NOTE(review): the field is public although Lombok's @Data already generates
// getId()/setId(); consider making it private once no caller accesses it directly.
public Long id;
}
8.测试实体类,用于生成文档
package com.example.es.entity;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.util.Date;
/**
 * Sample user entity used to generate test documents for the "uservo" index.
 * Equality and hash code include the inherited id ({@code callSuper = true}).
 *
 * @author hulei
 * @date 2023/8/26 18:37
 */
@EqualsAndHashCode(callSuper = true)
@Data
public class UserVo extends ESDocument {
// User name; the controller's search endpoints query this field.
private String userName;
// Age of the user.
private int age;
// E-mail address.
private String email;
// Version number; the controller's updateById endpoint overwrites this field.
private int version;
// Height (unit not specified in the source).
private Double height;
// Creation timestamp.
private Date createTime;
// Last-update timestamp.
private Date updateTime;
}
9.controller层调用类
package com.example.es.controller;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.Result;
import co.elastic.clients.elasticsearch._types.SortOrder;
import co.elastic.clients.elasticsearch._types.query_dsl.*;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.IndexResponse;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.elasticsearch.core.search.HitsMetadata;
import co.elastic.clients.elasticsearch.core.search.TrackHits;
import com.alibaba.fastjson.JSON;
import com.example.es.entity.PageData;
import com.example.es.entity.UserVo;
import com.example.es.service.ESDocumentService;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
/**
* elasticsearch 测试
* @author hulei
* @date 2023/8/27 16:27
*/
@RestController
@RequestMapping("/ES")
@RequiredArgsConstructor
public class ESController {
private final ESDocumentService documentDemoService;
private final ElasticsearchClient elasticsearchClient;
private final static String INDEX_NAME = "uservo";
@GetMapping("/getById")
public UserVo getById(@RequestParam("id") long id) throws Exception {
UserVo userVo = documentDemoService.getById(INDEX_NAME, Long.toString(id), UserVo.class);
System.out.println("object ->" + userVo);
return userVo;
}
@PostMapping("/updateById")
public Result updateById(@RequestParam("id") String id) throws IOException {
Map<String,Object> map = new HashMap<>(){{
put("version",288765);
}};
return documentDemoService.updateById(INDEX_NAME,id,UserVo.class,map);
}
/**
* 排序查询,范围查询,浅分页查询
*/
@GetMapping("/searchRequest")
public Object searchRequest(@RequestParam("userName") String userName) throws IOException {
List<UserVo> userList = new ArrayList<>();
// 构建查询条件列表
List<Query> queryList = new ArrayList<>();
// MatchPhrasePrefixQuery
// MatchPhraseQuery精确匹配
//此方法可以实现字段的模糊查询,其他MatchPhraseQuery,MatchAllQuery,MatchQuery,MatchBoolPrefixQuery,MatchNoneQuery查询的具体用法含义请自行百度
Query byName = MatchPhrasePrefixQuery.of(m-> m
.field("userName")
.query(userName))._toQuery();
queryList.add(byName);
/*
RangeQuery 年龄大小范围查询
Query ageStart = RangeQuery.of(r -> r
.field("age")
.gte(JsonData.of(10))
)._toQuery();
<p>
Query ageEnd = RangeQuery.of(r -> r
.field("age")
.lte(JsonData.of(90))
)._toQuery();
queryList.add(ageStart);
queryList.add(ageEnd);
*/
//查询后显示的字段,写几个显示几个
String[] sources = new String[]{"userName","age","id","email","version","height","createTime","updateTime"};
SearchResponse<UserVo> response = elasticsearchClient.search(s -> s
.index(INDEX_NAME)
.query(q -> q.bool(b -> b.must(queryList)))
.sort(sort -> sort.field(f -> f.field("id").order(SortOrder.Asc)))
.source(sc -> sc
.filter(f -> f
.includes(Arrays.asList(sources))
)
)
//分页,注意大数据量的查询用此不合适
.from(0)
.size(10000)//注意此种方式最多查10000条,超过就报错
,UserVo.class
);
List<Hit<UserVo>> hits = response.hits().hits();
for (Hit<UserVo> hit: hits) {
userList.add(hit.source());
}
return userList;
}
/**
* 查询所有,但是只能查询前一万条数据(我造了一亿数据)
*/
@GetMapping("/queryAll")
@SuppressWarnings("unchecked")
public <T> PageData<T> queryAll() throws IOException {
List<T> userList = new LinkedList<>();
SearchResponse<UserVo> searchResponse = elasticsearchClient.search(searchRequestBuilder ->{
searchRequestBuilder
.index(INDEX_NAME)
.sort(sort -> sort.field(f -> f.field("id").order(SortOrder.Asc)))
.from(0)
.size(10000)//注意此种方式最多查10000条,超过就报错,如果不写from,size就默认只查10条
;
return searchRequestBuilder;
} , UserVo.class);
System.out.println(searchResponse);
System.out.println("花费的时长:" + searchResponse.took());
HitsMetadata<UserVo> hitsMetadata = searchResponse.hits();
System.out.println(hitsMetadata.total());
assert hitsMetadata.total() != null;
System.out.println("符合条件的总文档数量:" + hitsMetadata.total().value());
List<Hit<UserVo>> hitList = searchResponse.hits().hits(); //注意:第一个hits() 与 第二个hits()的区别
for (Hit<UserVo> hit : hitList) {
assert hit.source() != null;
userList.add((T) hit.source());
}
return new PageData<>(userList, hitsMetadata.total().value());
}
@GetMapping("/fuzzQuery")
@SuppressWarnings("unchecked")
public <T> PageData<T> fuzzQuery(@RequestParam("userName") String userName) throws IOException {
List<T> userList = new LinkedList<>();
//模糊查询
SearchResponse<UserVo> searchResponse = elasticsearchClient.search(srBuilder -> srBuilder
.index(INDEX_NAME)
// 模糊查询Fuzzy
.query(queryBuilder -> queryBuilder
.fuzzy(fuzzyQueryBuilder -> fuzzyQueryBuilder
.field("userName")
.value(userName)
.fuzziness("2"))//fuzziness代表可以与关键词有误差的字数,可选值为0、1、2这三项
)
.from(0)
.size(10000)
, UserVo.class);
HitsMetadata<UserVo> hitsMetadata = searchResponse.hits();
List<Hit<UserVo>> hitList = hitsMetadata.hits(); //注意:第一个hits() 与 第二个hits()的区别
for (Hit<UserVo> hit : hitList) {
assert hit.source() != null;
userList.add((T) hit.source());
}
assert hitsMetadata.total() != null;
return new PageData<>(userList, hitsMetadata.total().value());
}
/**
* 批量新增数据(这里手动模拟插入了一亿条数据,点了很多次,每次生成3000000条数据,多了会报错
* start范围从1,3000001,6000001,9000001,12000001,15000001,18000001,21000001,24000001,27000001,30000001,33000001,36000001,39000001,42000001,
* 45000001,48000001,51000001,54000001,57000001,60000001,63000001,66000001,69000001,72000001,75000001,78000001,81000001,84000001,87000001,
* 90000001,93000001,96000001,99000001
* 开始每次增加300万
* 好测试数据查询速度)
*/
@PostMapping("/bulkCreate")
public void bulkCreate(@RequestParam("start") int start) {
List<CompletableFuture<Void>> futureList = new ArrayList<>();
for (int j = 1; j <= 10 ; j++){
// 构造文档集合
int finalJ = j;
int finalJ1 = j;
CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
List<Object> list = new ArrayList<>();
for (int i = start+300000*(finalJ1 -1); i <= (start == 1 ? 0 : start-1)+300000 * finalJ; i++) {
UserVo userVO = new UserVo();
userVO.setId((long) i);
userVO.setUserName("胡磊batch" + i);
userVO.setAge(i);
userVO.setEmail("ss.com");
userVO.setCreateTime(new Date());
userVO.setUpdateTime(new Date());
userVO.setHeight(12D);
list.add(userVO);
}
// 批量新增
BulkResponse response;
try {
response = documentDemoService.bulkCreate(INDEX_NAME, list);
} catch (Exception e) {
throw new RuntimeException(e);
}
List<BulkResponseItem> items = response.items();
for (BulkResponseItem item : items) {
System.out.println("BulkResponseItem.toString() -> " + item.toString());
}
});
futureList.add(future);
}
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
}
/**
* 此种方式若发现没有索引,会自动创建一个索引
*/
@PostMapping("/createByFluentDSL")
public void createByFluentDSL() throws Exception {
// 构建文档数据
UserVo userVO = new UserVo();
userVO.setId(1L);
userVO.setUserName("胡磊batch1");
userVO.setAge(29);
userVO.setCreateTime(new Date());
userVO.setUpdateTime(new Date());
userVO.setEmail("ss.com");
userVO.setVersion(1);
userVO.setHeight(12D);
// 新增一个文档
IndexResponse response = documentDemoService.createByFluentDSL(INDEX_NAME, userVO.getId().toString(), userVO);
System.out.println("response.forcedRefresh() -> " + response.forcedRefresh());
System.out.println("response.toString() -> " + response);
}
@PostMapping("/createByJson")
public void createByJson() throws Exception {
// 构建文档数据
UserVo userVO = new UserVo();
userVO.setId(1L);
userVO.setUserName("胡磊batch1");
userVO.setAge(29);
userVO.setCreateTime(new Date());
userVO.setUpdateTime(new Date());
userVO.setEmail("ss.com");
userVO.setVersion(1);
userVO.setHeight(12D);
// 新增一个文档
IndexResponse response = documentDemoService.createByJson(INDEX_NAME, userVO.getId().toString(), JSON.toJSONString(userVO));
System.out.println("response.toString() -> " + response.toString());
}
@PostMapping("/createAsync")
public void createAsync() {
// 构建文档数据
UserVo userVO = new UserVo();
userVO.setId(1L);
userVO.setUserName("胡磊batch1");
userVO.setAge(29);
userVO.setCreateTime(new Date());
userVO.setUpdateTime(new Date());
userVO.setEmail("ss.com");
userVO.setVersion(1);
userVO.setHeight(12D);
documentDemoService.createAsync(INDEX_NAME, userVO.getId().toString(), userVO, (indexResponse, throwable) -> {
// throwable必须为空
// 验证结果
System.out.println("response.toString() -> " + indexResponse.toString());
});
}
}
版权归原作者 熊出没 所有, 如有侵权,请联系我们删除。