0


springboot 2.0集成elasticsearch 7.6.2(集群)

小伙伴们,你们好呀,我是老寇,跟我一起学习es 7.6.2

注:请点击我,获取源码

一、引入依赖配置pom.xml

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-web</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>io.springfox</groupId>
  7. <artifactId>springfox-swagger2</artifactId>
  8. </dependency>
  9. <dependency>
  10. <groupId>io.springfox</groupId>
  11. <artifactId>springfox-swagger-ui</artifactId>
  12. </dependency>
  13. <dependency>
  14. <groupId>org.elasticsearch.client</groupId>
  15. <artifactId>elasticsearch-rest-high-level-client</artifactId>
  16. <version>7.6.2</version>
  17. <exclusions>
  18. <exclusion>
  19. <groupId>org.elasticsearch</groupId>
  20. <artifactId>elasticsearch</artifactId>
  21. </exclusion>
  22. <exclusion>
  23. <groupId>org.elasticsearch.client</groupId>
  24. <artifactId>elasticsearch-rest-client</artifactId>
  25. </exclusion>
  26. </exclusions>
  27. </dependency>
  28. <dependency>
  29. <groupId>org.elasticsearch.client</groupId>
  30. <artifactId>elasticsearch-rest-client</artifactId>
  31. <version>7.6.2</version>
  32. </dependency>
  33. <dependency>
  34. <groupId>org.elasticsearch</groupId>
  35. <artifactId>elasticsearch</artifactId>
  36. <version>7.6.2</version>
  37. </dependency>

二、配置application-dev.yml(生产就克隆application-dev改成生产配置)

  1. elasticsearch:
  2. host: 192.168.1.1:9200,192.1.2.133:9200,192.168.1.3:9200
  3. cluster-name: laokou-elasticsearch
  4. username:
  5. password:
  6. synonym:
  7. path: http://192.168.1.1:9048/laokou-service/synonym

问题思考:比如说,一条文章记录,它有标题,内容,阅读量,在数据存入es时,我需要对es配置分词器,并且能够通过阅读量来筛选数据,你怎么做?

三、配置ES注解

注解可以修饰属性或方法(前提是先配置)

type > 需要在es配置什么类型

participle > 需要配置什么分词器

  1. /**
  2. * @author Kou Shenhai
  3. */
  4. @Target({ElementType.FIELD,ElementType.METHOD})
  5. @Retention(RetentionPolicy.RUNTIME)
  6. @Documented
  7. @Inherited
  8. public @interface FieldInfo {
  9. /**
  10. * 默认 keyword
  11. * @return
  12. */
  13. String type() default "keyword";
  14. /**
  15. * 0 not_analyzed 1 ik_smart 2.ik_max_word 3.ik-index(自定义分词器)
  16. * @return
  17. */
  18. int participle() default 0;
  19. }

拼接属性对应的类型及分词器

  1. /**
  2. * 属性、类型、分词器
  3. * @author Kou Shenhai 2413176044@leimingtech.com
  4. * @version 1.0
  5. * @date 2021/2/9 0009 上午 10:20
  6. */
  7. @Data
  8. @NoArgsConstructor
  9. public class FieldMapping {
  10. private String field;
  11. private String type;
  12. private Integer participle;
  13. public FieldMapping(String field, String type, Integer participle) {
  14. this.field = field;
  15. this.type = type;
  16. this.participle = participle;
  17. }
  18. }

组装每个属性对应的类型及分词器 => List<FieldMapping>

  1. /**
  2. * 每个属性对应的类型及分词器
  3. * @author Kou Shenhai 2413176044@leimingtech.com
  4. * @version 1.0
  5. * @date 2021/1/24 0024 下午 7:51
  6. */
  7. @Slf4j
  8. public class FieldMappingUtil {
  9. public static List<FieldMapping> getFieldInfo(Class clazz) {
  10. return getFieldInfo(clazz, null);
  11. }
  12. public static List<FieldMapping> getFieldInfo(Class clazz, String fieldName) {
  13. //返回class中的所有字段(包括私有字段)
  14. Field[] fields = clazz.getDeclaredFields();
  15. //创建FieldMapping集合
  16. List<FieldMapping> fieldMappingList = new ArrayList<>();
  17. for (Field field : fields) {
  18. //获取字段上的FieldInfo对象
  19. boolean annotationPresent = field.isAnnotationPresent(FieldInfo.class);
  20. if (annotationPresent) {
  21. FieldInfo fieldInfo = field.getAnnotation(FieldInfo.class);
  22. //获取字段名称
  23. String name = field.getName();
  24. fieldMappingList.add(new FieldMapping(name, fieldInfo.type(), fieldInfo.participle()));
  25. } else {
  26. continue;
  27. }
  28. }
  29. return fieldMappingList;
  30. }
  31. }

四、配置es及swagger

  1. /**
  2. * es配置文件
  3. * @author Kou Shenhai 2413176044@leimingtech.com
  4. * @version 1.0
  5. * @date 2020/8/9 0009 下午 2:01
  6. */
  7. @Configuration
  8. public class ElasticsearchConfig {
  9. private static final String HTTP_SCHEME = "http";
  10. private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfig.class);
  11. /**
  12. * 权限验证
  13. */
  14. final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
  15. /**
  16. * es主机
  17. */
  18. @Value("${elasticsearch.host}")
  19. private String[] host;
  20. @Value("${elasticsearch.username}")
  21. private String username;
  22. @Value("${elasticsearch.password}")
  23. private String password;
  24. @Bean
  25. public RestClientBuilder restClientBuilder() {
  26. HttpHost[] hosts = Arrays.stream(host)
  27. .map(this::makeHttpHost)
  28. .filter(Objects::nonNull)
  29. .toArray(HttpHost[]::new);
  30. LOGGER.info("host:{}",Arrays.toString(hosts));
  31. //配置权限验证
  32. credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
  33. return RestClient.builder(hosts).setHttpClientConfigCallback(httpClientBuilder ->
  34. httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
  35. .setMaxConnPerRoute(100)
  36. //最大连接数
  37. .setMaxConnTotal(100)
  38. ).setRequestConfigCallback(builder -> {
  39. builder.setConnectTimeout(-1);
  40. builder.setSocketTimeout(60000);
  41. builder.setConnectionRequestTimeout(-1);
  42. return builder;
  43. });
  44. }
  45. /**
  46. * 处理请求地址
  47. * @param address
  48. * @return
  49. */
  50. private HttpHost makeHttpHost(String address) {
  51. assert StringUtils.isNotEmpty(address);
  52. String[] hostAddress = address.split(":");
  53. if (hostAddress.length == 2) {
  54. String ip = hostAddress[0];
  55. Integer port = Integer.valueOf(hostAddress[1]);
  56. return new HttpHost(ip, port, HTTP_SCHEME);
  57. } else {
  58. return null;
  59. }
  60. }
  61. /**
  62. * 配置highLevelClient bean
  63. * @param restClientBuilder
  64. * @return
  65. */
  66. @Bean(name = "restHighLevelClient")
  67. public RestHighLevelClient restHighLevelClient(@Autowired RestClientBuilder restClientBuilder) {
  68. return new RestHighLevelClient(restClientBuilder);
  69. }
  70. }
  1. /**
  2. * @author Kou Shenhai
  3. */
  4. @Configuration
  5. @EnableSwagger2
  6. public class SwaggerConfig {
  7. @Bean
  8. public Docket createRestApi() {
  9. return new Docket(DocumentationType.SWAGGER_2)
  10. .apiInfo(apiInfo())
  11. .select()
  12. .apis(RequestHandlerSelectors.withMethodAnnotation(ApiOperation.class))
  13. .paths(PathSelectors.any())
  14. .build();
  15. }
  16. private ApiInfo apiInfo() {
  17. return new ApiInfoBuilder()
  18. .title("API文档")
  19. .version("2.0.0")
  20. .description("API文档 - Elasticsearch服务")
  21. //作者信息
  22. .contact(new Contact("寇申海", "https://blog.csdn.net/qq_39893313", "2413176044@qq.com"))
  23. .build();
  24. }
  25. }

五、ES工具类 (索引相关配置不懂的,请查看elasticsearch 7.6.2 - 索引管理)

  1. /**
  2. * Elasticsearch工具类-用于操作ES
  3. * @author Kou Shenhai 2413176044@leimingtech.com
  4. * @version 1.0
  5. * @date 2021/1/24 0024 下午 5:42
  6. */
  7. @Slf4j
  8. @Component
  9. public class ElasticsearchUtil {
  10. private static final String PRIMARY_KEY_NAME = "id";
  11. @Value("${elasticsearch.synonym.path}")
  12. private String synonymPath;
  13. @Autowired
  14. private RestHighLevelClient restHighLevelClient;
  15. /**
  16. * 批量同步数据到ES
  17. * @param indexName 索引名称
  18. * @param indexAlias 别名
  19. * @param jsonDataList 数据列表
  20. * @param clazz 类型
  21. * @return
  22. * @throws IOException
  23. */
  24. public boolean saveDataBatch(String indexName,String indexAlias,String jsonDataList,Class clazz) throws IOException {
  25. //判空
  26. if (StringUtils.isBlank(jsonDataList)) {
  27. return false;
  28. }
  29. if (syncIndex(indexName, indexAlias, clazz)) {
  30. return false;
  31. }
  32. //3.批量操作Request
  33. BulkRequest bulkRequest = packBulkIndexRequest(indexName, jsonDataList);
  34. if (bulkRequest.requests().isEmpty()) {
  35. return false;
  36. }
  37. final BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
  38. if (bulk.hasFailures()) {
  39. for (BulkItemResponse item : bulk.getItems()) {
  40. log.error("索引[{}],主键[{}]更新操作失败,状态为:[{}],错误信息:{}",indexName,item.getId(),item.status(),item.getFailureMessage());
  41. }
  42. return false;
  43. }
  44. // 记录索引新增与修改数量
  45. Integer createdCount = 0;
  46. Integer updatedCount = 0;
  47. for (BulkItemResponse item : bulk.getItems()) {
  48. if (DocWriteResponse.Result.CREATED.equals(item.getResponse().getResult())) {
  49. createdCount++;
  50. } else if (DocWriteResponse.Result.UPDATED.equals(item.getResponse().getResult())){
  51. updatedCount++;
  52. }
  53. }
  54. log.info("索引[{}]批量同步更新成功,共新增[{}]个,修改[{}]个",indexName,createdCount,updatedCount);
  55. return true;
  56. }
  57. /**
  58. * ES修改数据
  59. * @param indexName 索引名称
  60. * @param id 主键
  61. * @param paramJson 参数JSON
  62. * @return
  63. */
  64. public boolean updateData(String indexName,String id,String paramJson) {
  65. UpdateRequest updateRequest = new UpdateRequest(indexName, id);
  66. //如果修改索引中不存在则进行新增
  67. updateRequest.docAsUpsert(true);
  68. //立即刷新数据
  69. updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
  70. updateRequest.doc(paramJson,XContentType.JSON);
  71. try {
  72. UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
  73. log.info("索引[{}],主键:【{}】操作结果:[{}]",indexName,id,updateResponse.getResult());
  74. if (DocWriteResponse.Result.CREATED.equals(updateResponse.getResult())) {
  75. //新增
  76. log.info("索引:【{}】,主键:【{}】新增成功",indexName,id);
  77. return true;
  78. } else if (DocWriteResponse.Result.UPDATED.equals(updateResponse.getResult())) {
  79. //修改
  80. log.info("索引:【{}】,主键:【{}】修改成功",indexName, id);
  81. return true;
  82. } else if (DocWriteResponse.Result.NOOP.equals(updateResponse.getResult())) {
  83. //无变化
  84. log.info("索引:[{}],主键:[{}]无变化",indexName, id);
  85. return true;
  86. }
  87. } catch (IOException e) {
  88. e.printStackTrace();
  89. log.error("索引:[{}],主键:【{}】,更新异常:[{}]",indexName, id,e);
  90. return false;
  91. }
  92. return false;
  93. }
  94. /**
  95. * 批量修改ES
  96. * @param indexName 索引名称
  97. * @param indexAlias 别名
  98. * @param jsonDataList 数据列表
  99. * @param clazz 类型
  100. * @return
  101. * @throws IOException
  102. */
  103. public boolean updateDataBatch(String indexName,String indexAlias, String jsonDataList,Class clazz) throws IOException {
  104. //1.创建索引
  105. boolean createIndexFlag = createIndex(indexName,indexAlias, clazz);
  106. if (!createIndexFlag) {
  107. return false;
  108. }
  109. return this.updateDataBatch(indexName,jsonDataList);
  110. }
  111. /**
  112. * 删除数据
  113. * @param indexName 索引名称
  114. * @param id 主键
  115. * @return
  116. */
  117. public boolean deleteData(String indexName,String id) {
  118. DeleteRequest deleteRequest = new DeleteRequest(indexName);
  119. deleteRequest.id(id);
  120. deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
  121. try {
  122. DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
  123. if (DocWriteResponse.Result.NOT_FOUND.equals(deleteResponse.getResult())) {
  124. log.error("索引:【{}】,主键:【{}】删除失败",indexName, id);
  125. return false;
  126. } else {
  127. log.info("索引【{}】主键【{}】删除成功",indexName, id);
  128. return true;
  129. }
  130. } catch (IOException e) {
  131. e.printStackTrace();
  132. log.error("删除索引【{}】出现异常[{}]",indexName,e);
  133. return false;
  134. }
  135. }
  136. /**
  137. * 批量删除ES
  138. * @param indexName 索引名称
  139. * @param ids id列表
  140. * @return
  141. */
  142. public boolean deleteDataBatch(String indexName,List<String> ids) {
  143. if (CollectionUtils.isEmpty(ids)) {
  144. return false;
  145. }
  146. BulkRequest bulkRequest = packBulkDeleteRequest(indexName, ids);
  147. try {
  148. BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
  149. if (bulk.hasFailures()) {
  150. for (BulkItemResponse item : bulk.getItems()) {
  151. log.error("删除索引:[{}],主键:{}失败,信息:{}",indexName,item.getId(),item.getFailureMessage());
  152. }
  153. return false;
  154. }
  155. //记录索引新增与修改数量
  156. Integer deleteCount = 0;
  157. for (BulkItemResponse item : bulk.getItems()) {
  158. if (DocWriteResponse.Result.DELETED.equals(item.getResponse().getResult())) {
  159. deleteCount++;
  160. }
  161. }
  162. log.info("批量删除索引[{}]成功,共删除[{}]个",indexName,deleteCount);
  163. return true;
  164. } catch (IOException e) {
  165. e.printStackTrace();
  166. log.error("删除索引:【{}】出现异常:{}",indexName,e);
  167. return false;
  168. }
  169. }
  170. /**
  171. * 组装删除操作
  172. * @param indexName 索引名称
  173. * @param ids id列表
  174. * @return
  175. */
  176. private BulkRequest packBulkDeleteRequest(String indexName, List<String> ids) {
  177. BulkRequest bulkRequest = new BulkRequest();
  178. bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
  179. ids.forEach(id -> {
  180. DeleteRequest deleteRequest = new DeleteRequest(indexName);
  181. deleteRequest.id(id);
  182. bulkRequest.add(deleteRequest);
  183. });
  184. return bulkRequest;
  185. }
  186. /**
  187. * 批量修改ES
  188. * @param indexName 索引名称
  189. * @param jsonDataList json数据列表
  190. * @return
  191. */
  192. public boolean updateDataBatch(String indexName, String jsonDataList) {
  193. //判空
  194. if (StringUtils.isBlank(jsonDataList)) {
  195. return false;
  196. }
  197. BulkRequest bulkRequest = packBulkUpdateRequest(indexName, jsonDataList);
  198. if (bulkRequest.requests().isEmpty()) {
  199. return false;
  200. }
  201. try {
  202. //同步执行
  203. BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
  204. if (bulk.hasFailures()) {
  205. for (BulkItemResponse item : bulk.getItems()) {
  206. log.error("索引【{}】,主键[{}]修改操作失败,状态为【{}】,错误信息:{}",indexName,item.status(),item.getFailureMessage());
  207. }
  208. return false;
  209. }
  210. //记录索引新增与修改数量
  211. Integer createCount = 0;
  212. Integer updateCount = 0;
  213. for (BulkItemResponse item : bulk.getItems()) {
  214. if (DocWriteResponse.Result.CREATED.equals(item.getResponse().getResult())) {
  215. createCount++;
  216. } else if (DocWriteResponse.Result.UPDATED.equals(item.getResponse().getResult())) {
  217. updateCount++;
  218. }
  219. }
  220. log.info("索引【{}】批量修改更新成功,共新增[{}]个,修改[{}]个",indexName,createCount,updateCount);
  221. } catch (IOException e) {
  222. e.printStackTrace();
  223. log.error("索引【{}】批量修改更新出现异常",indexName);
  224. return false;
  225. }
  226. return true;
  227. }
  228. /**
  229. * 组装bulkUpdate
  230. * @param indexName 索引名称
  231. * @param jsonDataList 数据列表
  232. * @return
  233. */
  234. private BulkRequest packBulkUpdateRequest(String indexName,String jsonDataList) {
  235. BulkRequest bulkRequest = new BulkRequest();
  236. bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
  237. JSONArray jsonArray = JSONArray.parseArray(jsonDataList);
  238. if (jsonArray.isEmpty()) {
  239. return bulkRequest;
  240. }
  241. jsonArray.forEach(o -> {
  242. Map<String, String> map = (Map<String, String>) o;
  243. UpdateRequest updateRequest = new UpdateRequest(indexName,map.get(PRIMARY_KEY_NAME));
  244. // 修改索引中不存在就新增
  245. updateRequest.docAsUpsert(true);
  246. updateRequest.doc(JSON.toJSONString(o),XContentType.JSON);
  247. bulkRequest.add(updateRequest);
  248. });
  249. return bulkRequest;
  250. }
  251. /**
  252. * 删除索引、新建索引
  253. * @param indexName 索引名称
  254. * @param indexAlias 别名
  255. * @param clazz 类型
  256. * @return
  257. * @throws IOException
  258. */
  259. private boolean syncIndex(String indexName, String indexAlias, Class clazz) throws IOException {
  260. //1.删除索引
  261. boolean deleteAllFlag = deleteIndex(indexName);
  262. if (!deleteAllFlag) {
  263. return true;
  264. }
  265. //2.创建索引
  266. boolean createIndexFlag = createIndex(indexName, indexAlias, clazz);
  267. if (!createIndexFlag) {
  268. return true;
  269. }
  270. return false;
  271. }
  272. /**
  273. * 根据主键查询ES
  274. * @param indexName 索引名称
  275. * @param id 主键
  276. * @return
  277. */
  278. public String getDataById(String indexName,String id) {
  279. //判断索引是否存在
  280. //1.判断索引是否存在
  281. boolean result = isIndexExists(indexName);
  282. if (!result) {
  283. return "";
  284. }
  285. GetRequest getRequest = new GetRequest(indexName, id);
  286. try {
  287. GetResponse getResponse = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
  288. String resultJson = getResponse.getSourceAsString();
  289. log.info("索引【{}】主键【{}】,查询结果:【{}】",indexName,id,resultJson);
  290. return resultJson;
  291. } catch (IOException e) {
  292. e.printStackTrace();
  293. log.error("索引【{}】主键[{}],查询异常:{}",indexName,id,e);
  294. return "";
  295. }
  296. }
  297. /**
  298. * 清空索引内容
  299. * @param indexName 索引名称
  300. * @return
  301. */
  302. public boolean deleteAll(String indexName) {
  303. //1.判断索引是否存在
  304. boolean result = isIndexExists(indexName);
  305. if (!result) {
  306. log.error("索引【{}】不存在,删除失败",indexName);
  307. return false;
  308. }
  309. DeleteRequest deleteRequest = new DeleteRequest(indexName);
  310. deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
  311. try {
  312. DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
  313. if (DocWriteResponse.Result.NOT_FOUND.equals(deleteResponse.getResult())) {
  314. log.error("索引【{}】删除失败",indexName);
  315. return false;
  316. }
  317. log.info("索引【{}】删除成功",indexName);
  318. return true;
  319. } catch (IOException e) {
  320. e.printStackTrace();
  321. log.error("删除索引[{}],出现异常[{}]",indexName,e);
  322. return false;
  323. }
  324. }
  325. /**
  326. * 批量数据保存到ES-异步
  327. * @param indexName 索引名称
  328. * @param indexAlias 索引别名
  329. * @param jsonDataList 数据列表
  330. * @param clazz 类型
  331. * @return
  332. * @throws IOException
  333. */
  334. public boolean saveDataBatchSync(String indexName,String indexAlias,String jsonDataList,Class clazz) throws IOException {
  335. //判空
  336. if (StringUtils.isBlank(jsonDataList)) {
  337. return false;
  338. }
  339. if (syncIndex(indexName, indexAlias, clazz)) {
  340. return false;
  341. }
  342. //3.批量操作Request
  343. BulkRequest bulkRequest = packBulkIndexRequest(indexName, jsonDataList);
  344. if (bulkRequest.requests().isEmpty()) {
  345. return false;
  346. }
  347. //4.异步执行
  348. ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
  349. @Override
  350. public void onResponse(BulkResponse bulkItemResponses) {
  351. if (bulkItemResponses.hasFailures()) {
  352. for (BulkItemResponse item : bulkItemResponses.getItems()) {
  353. log.error("索引【{}】,主键【{}】更新失败,状态【{}】,错误信息:{}",indexName,item.getId(),
  354. item.status(),item.getFailureMessage());
  355. }
  356. }
  357. }
  358. //失败操作
  359. @Override
  360. public void onFailure(Exception e) {
  361. log.error("索引【{}】批量异步更新出现异常:{}",indexName,e);
  362. }
  363. };
  364. restHighLevelClient.bulkAsync(bulkRequest,RequestOptions.DEFAULT,listener);
  365. log.info("索引批量更新索引【{}】中",indexName);
  366. return true;
  367. }
  368. /**
  369. * 删除索引
  370. * @param indexName 索引名称
  371. * @return
  372. * @throws IOException
  373. */
  374. public boolean deleteIndex(String indexName) throws IOException {
  375. //1.判断索引是否存在
  376. boolean result = isIndexExists(indexName);
  377. if (!result) {
  378. log.error("索引【{}】不存在,删除失败",indexName);
  379. return false;
  380. }
  381. //2.删除操作Request
  382. DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
  383. deleteIndexRequest.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
  384. AcknowledgedResponse acknowledgedResponse = restHighLevelClient.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
  385. if (!acknowledgedResponse.isAcknowledged()) {
  386. log.error("索引【{}】删除失败",indexName);
  387. return false;
  388. }
  389. log.info("索引【{}】删除成功",indexName);
  390. return true;
  391. }
  392. /**
  393. * 批量操作的Request
  394. * @param indexName 索引名称
  395. * @param jsonDataList json数据列表
  396. * @return
  397. */
  398. private BulkRequest packBulkIndexRequest(String indexName,String jsonDataList) {
  399. BulkRequest bulkRequest = new BulkRequest();
  400. //IMMEDIATE > 请求向es提交数据,立即进行数据刷新<实时性高,资源消耗高>
  401. //WAIT_UNTIL > 请求向es提交数据,等待数据完成刷新<实时性高,资源消耗低>
  402. //NONE > 默认策略<实时性低>
  403. bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
  404. JSONArray jsonArray = JSONArray.parseArray(jsonDataList);
  405. if (jsonArray.isEmpty()) {
  406. return bulkRequest;
  407. }
  408. //循环数据封装bulkRequest
  409. jsonArray.forEach(obj ->{
  410. final Map<String, String> map = (Map<String, String>) obj;
  411. IndexRequest indexRequest = new IndexRequest(indexName);
  412. indexRequest.source(JSON.toJSONString(obj),XContentType.JSON);
  413. indexRequest.id(map.get(PRIMARY_KEY_NAME));
  414. bulkRequest.add(indexRequest);
  415. });
  416. return bulkRequest;
  417. }
  418. /**
  419. * 创建索引
  420. * @param indexName 索引名称
  421. * @param indexAlias 别名
  422. * @param clazz 类型
  423. * @return
  424. * @throws IOException
  425. */
  426. public boolean createIndex(String indexName,String indexAlias,Class clazz) throws IOException {
  427. //判断索引是否存在
  428. boolean result = isIndexExists(indexName);
  429. if (!result) {
  430. boolean createResult = createIndexAndCreateMapping(indexName,indexAlias, FieldMappingUtil.getFieldInfo(clazz));
  431. if (!createResult) {
  432. log.info("索引【{}】创建失败",indexName);
  433. return false;
  434. }
  435. }
  436. log.info("索引:[{}]创建成功",indexName);
  437. return true;
  438. }
  439. /**
  440. * 数据同步到ES
  441. * @param id 主键
  442. * @param indexName 索引名称
  443. * @param jsonData json数据
  444. * @param clazz 类型
  445. * @return
  446. */
  447. public boolean saveData(String id,String indexName,String indexAlias,String jsonData,Class clazz) throws IOException {
  448. //1.创建索引
  449. boolean createIndexFlag = createIndex(indexName,indexAlias, clazz);
  450. if (!createIndexFlag) {
  451. return false;
  452. }
  453. //2.创建操作Request
  454. IndexRequest indexRequest = new IndexRequest(indexName);
  455. //3.配置相关信息
  456. indexRequest.source(jsonData, XContentType.JSON);
  457. //IMMEDIATE > 立即刷新
  458. indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
  459. indexRequest.id(id);
  460. IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
  461. //4.判断索引是新增还是修改
  462. if (IndexResponse.Result.CREATED.equals(response.getResult())) {
  463. log.info("索引【{}】保存成功",indexName);
  464. return true;
  465. } else if (IndexResponse.Result.UPDATED.equals(response.getResult())) {
  466. log.info("索引【{}】修改成功",indexName);
  467. return true;
  468. }
  469. return false;
  470. }
  471. /**
  472. * 判断索引是否存在
  473. * @param indexName 索引名称
  474. * @return
  475. */
  476. public boolean isIndexExists(String indexName) {
  477. try {
  478. GetIndexRequest getIndexRequest = new GetIndexRequest(indexName);
  479. return restHighLevelClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
  480. }catch (Exception e) {
  481. e.printStackTrace();
  482. }
  483. return false;
  484. }
  485. /**
  486. * 创建索引设置相关配置信息
  487. * @param indexName 索引名称
  488. * @param indexAlias 索引别名
  489. * @param fieldMappingList 数据列表
  490. * @return
  491. * @throws IOException
  492. */
  493. private boolean createIndexAndCreateMapping(String indexName,String indexAlias, List<FieldMapping> fieldMappingList) throws IOException {
  494. //封装es索引的mapping
  495. XContentBuilder mapping = packEsMapping(fieldMappingList, null);
  496. mapping.endObject().endObject();
  497. mapping.close();
  498. //进行索引的创建
  499. CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
  500. //配置分词器
  501. XContentBuilder settings = packSettingMapping();
  502. XContentBuilder aliases = packEsAliases(indexAlias);
  503. log.info("索引配置脚本:{}",settings);
  504. log.info("索引字段内容:{}",mapping);
  505. createIndexRequest.settings(settings);
  506. createIndexRequest.mapping("_doc", mapping);
  507. createIndexRequest.aliases(aliases);
  508. //同步方式创建索引
  509. CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(createIndexRequest,RequestOptions.DEFAULT);
  510. boolean acknowledged = createIndexResponse.isAcknowledged();
  511. if (acknowledged) {
  512. log.info("索引:{}创建成功", indexName);
  513. return true;
  514. } else {
  515. log.error("索引:{}创建失败", indexName);
  516. return false;
  517. }
  518. }
  519. /**
  520. * 配置ES别名
  521. * @author Kou Shenhai
  522. * @param alias 别名
  523. * @return
  524. * @throws IOException
  525. */
  526. private XContentBuilder packEsAliases(String alias) throws IOException{
  527. XContentBuilder aliases = XContentFactory.jsonBuilder().startObject()
  528. .startObject(alias).endObject();
  529. aliases.endObject();
  530. aliases.close();
  531. return aliases;
  532. }
  533. /**
  534. * 配置Mapping
  535. * @param fieldMappingList 组装的实体类信息
  536. * @param mapping
  537. * @return
  538. * @throws IOException
  539. */
  540. private XContentBuilder packEsMapping(List<FieldMapping> fieldMappingList,XContentBuilder mapping) throws IOException {
  541. if (mapping == null) {
  542. //如果对象是空,首次进入,设置开始结点
  543. mapping = XContentFactory.jsonBuilder().startObject()
  544. .field("dynamic",true)
  545. .startObject("properties");
  546. }
  547. //循环实体对象的类型集合封装ES的Mapping
  548. for (FieldMapping fieldMapping : fieldMappingList) {
  549. String field = fieldMapping.getField();
  550. String dataType = fieldMapping.getType();
  551. Integer participle = fieldMapping.getParticiple();
  552. //设置分词规则
  553. if (Constant.NOT_ANALYZED.equals(participle)) {
  554. if (FieldTypeEnum.DATE.getValue().equals(dataType)) {
  555. mapping.startObject(field)
  556. .field("type", dataType)
  557. .field("format","yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis")
  558. .endObject();
  559. } else {
  560. mapping.startObject(field)
  561. .field("type", dataType)
  562. .endObject();
  563. }
  564. } else if (Constant.IK_INDEX.equals(participle)) {
  565. mapping.startObject(field)
  566. .field("type",dataType)
  567. .field("eager_global_ordinals",true)
  568. //fielddata=true 用来解决text字段不能进行聚合操作
  569. .field("fielddata",true)
  570. .field("boost",100.0)
  571. .field("analyzer","ik-index-synonym")
  572. .field("search_analyzer","ik-search-synonym")
  573. .startObject("fields").startObject("data-pinyin")
  574. .field("term_vector", "with_positions_offsets")
  575. .field("analyzer","ik-search-pinyin")
  576. .field("type",dataType)
  577. .field("boost",100.0)
  578. .endObject().endObject()
  579. .endObject();
  580. }
  581. }
  582. return mapping;
  583. }
  584. /**
  585. * 配置Settings
  586. * @return
  587. * @throws IOException
  588. */
  589. private XContentBuilder packSettingMapping() throws IOException {
  590. XContentBuilder setting = XContentFactory.jsonBuilder().startObject()
  591. .startObject("index")
  592. .field("number_of_shards",5)
  593. .field("number_of_replicas",1)
  594. .field("refresh_interval","120s")
  595. .endObject()
  596. .startObject("analysis");
  597. //ik分词 同义词 拼音
  598. setting.startObject("analyzer")
  599. .startObject("ik-search-pinyin")
  600. .field("type","custom")
  601. .field("tokenizer","ik_smart")
  602. .field("char_filter",new String[] {"html_strip"})
  603. .field("filter", new String[]{"laokou-pinyin","word_delimiter","lowercase", "asciifolding"})
  604. .endObject();
  605. setting.startObject("ik-index-synonym")
  606. .field("type","custom")
  607. .field("tokenizer","ik_max_word")
  608. .field("char_filter",new String[] {"html_strip"})
  609. .field("filter", new String[]{"laokou-remote-synonym"})
  610. .endObject();
  611. setting.startObject("ik-search-synonym")
  612. .field("type","custom")
  613. .field("tokenizer","ik_smart")
  614. .field("char_filter",new String[] {"html_strip"})
  615. .field("filter", new String[]{"laokou-remote-synonym"})
  616. .endObject();
  617. setting.endObject();
  618. //设置拼音分词器 同义词分词
  619. setting.startObject("filter")
  620. .startObject("laokou-pinyin")
  621. .field("type", "pinyin")
  622. .field("keep_first_letter", false)
  623. .field("keep_separate_first_letter", false)
  624. .field("keep_full_pinyin", true)
  625. .field("keep_original", false)
  626. .field("keep_joined_full_pinyin",true)
  627. .field("limit_first_letter_length", 16)
  628. .field("lowercase", true)
  629. .field("remove_duplicated_term", true)
  630. .endObject()
  631. .startObject("laokou-remote-synonym")
  632. .field("type","dynamic_synonym")
  633. .field("synonyms_path", synonymPath)
  634. .field("interval",120)
  635. .field("dynamic_reload",true)
  636. .endObject()
  637. .endObject();
  638. setting.endObject().endObject();
  639. setting.close();
  640. return setting;
  641. }
  642. }

问题思考:比如说,我有几条记录,文章记录,聊天记录,订单记录,它们是不同的索引,需要单独建立索引,怎么根据不同的数据类型来创建不同的索引?你会怎么做?

六、索引管理工具类

  1. /**
  2. * 索引管理
  3. * @author Kou Shenhai 2413176044@leimingtech.com
  4. * @version 1.0
  5. * @date 2021/10/31 0031 上午 10:11
  6. */
  7. public class FieldUtil {
  8. public static final String MESSAGE_INDEX = "message";
  9. private static final Map<String,Class<?>> classMap = new HashMap<>(16);
  10. static {
  11. classMap.put(FieldUtil.MESSAGE_INDEX, MessageIndex.class);
  12. }
  13. public static Class<?> getClazz(final String indexName) {
  14. return classMap.getOrDefault(indexName,Object.class);
  15. }
  16. }

七、测试es

  1. /**
  2. * Elasticsearch API 服务
  3. * @author Kou Shenhai 2413176044@leimingtech.com
  4. * @version 1.0
  5. * @date 2021/2/8 0008 下午 6:33
  6. */
  7. @RestController
  8. @RequestMapping("/api")
  9. @Api(tags = "Elasticsearch API 服务")
  10. public class ElasticsearchController {
  11. @Autowired
  12. private ElasticsearchUtil elasticsearchUtil;
  13. @PostMapping("/sync")
  14. @ApiOperation("同步数据到ES")
  15. @CrossOrigin
  16. public void syncIndex(@RequestBody final ElasticsearchModel model) throws IOException {
  17. String id = model.getId();
  18. String indexName = model.getIndexName();
  19. String indexAlias = model.getIndexAlias();
  20. String jsonData = model.getData();
  21. Class<?> clazz = FieldUtil.getClazz(indexAlias);
  22. elasticsearchUtil.saveData(id,indexName,indexAlias,jsonData,clazz);
  23. }
  24. @PostMapping("/batchSync")
  25. @ApiOperation("批量数据保存到ES-异步")
  26. @CrossOrigin
  27. public void batchSyncIndex(@RequestBody final ElasticsearchModel model) throws IOException {
  28. String indexName = model.getIndexName();
  29. String indexAlias = model.getIndexAlias();
  30. String jsonDataList = model.getData();
  31. Class<?> clazz = FieldUtil.getClazz(indexAlias);
  32. elasticsearchUtil.saveDataBatchSync(indexName,indexAlias,jsonDataList,clazz);
  33. }
  34. @PostMapping("/batch")
  35. @ApiOperation("批量同步数据到ES")
  36. @CrossOrigin
  37. public void saveBatchIndex(@RequestBody final ElasticsearchModel model) throws IOException {
  38. String indexName = model.getIndexName();
  39. String indexAlias = model.getIndexAlias();
  40. String jsonDataList = model.getData();
  41. Class<?> clazz = FieldUtil.getClazz(indexAlias);
  42. elasticsearchUtil.saveDataBatch(indexName,indexAlias,jsonDataList,clazz);
  43. }
  44. @GetMapping("/get")
  45. @ApiOperation("根据主键获取ES")
  46. @CrossOrigin
  47. @ApiImplicitParams({
  48. @ApiImplicitParam(name = "indexName",value = "索引名称",required = true,paramType = "query",dataType = "String"),
  49. @ApiImplicitParam(name = "id",value = "主键",required = true,paramType = "query",dataType = "String")
  50. })
  51. public HttpResultUtil<String> getDataById(@RequestParam("indexName")String indexName,@RequestParam("id")String id) {
  52. return new HttpResultUtil<String>().ok(elasticsearchUtil.getDataById(indexName,id));
  53. }
  54. @PutMapping("/batch")
  55. @ApiOperation("批量修改ES")
  56. @CrossOrigin
  57. public void updateDataBatch(@RequestBody final ElasticsearchModel model) throws IOException {
  58. String indexName = model.getIndexName();
  59. String indexAlias = model.getIndexAlias();
  60. String jsonDataList = model.getData();
  61. Class<?> clazz = FieldUtil.getClazz(indexAlias);
  62. elasticsearchUtil.updateDataBatch(indexName,indexAlias,jsonDataList,clazz);
  63. }
  64. @PutMapping("/sync")
  65. @ApiOperation("同步修改ES")
  66. @CrossOrigin
  67. public void updateData(@RequestBody final ElasticsearchModel model) {
  68. String id = model.getId();
  69. String indexName = model.getIndexName();
  70. String paramJson = model.getData();
  71. elasticsearchUtil.updateData(indexName,id,paramJson);
  72. }
  73. @DeleteMapping("/batch")
  74. @ApiOperation("批量删除ES")
  75. @CrossOrigin
  76. public void deleteDataBatch(@RequestParam("indexName")String indexName,@RequestParam("ids")List<String> ids) {
  77. elasticsearchUtil.deleteDataBatch(indexName,ids);
  78. }
  79. @DeleteMapping("/sync")
  80. @ApiOperation("同步删除ES")
  81. @CrossOrigin
  82. public void deleteData(@RequestParam("indexName")String indexName,@RequestParam("id")String id) {
  83. elasticsearchUtil.deleteData(indexName,id);
  84. }
  85. @DeleteMapping("/all")
  86. @ApiOperation("清空ES")
  87. @CrossOrigin
  88. public void deleteAll(@RequestParam("indexName")String indexName) {
  89. elasticsearchUtil.deleteAll(indexName);
  90. }
  91. }

大功告成

补充:可根据自己的业务进行数据分区


本文转载自: https://blog.csdn.net/qq_39893313/article/details/123013305
版权归原作者 k↑ 所有, 如有侵权,请联系我们删除。

“springboot 2.0集成elasticsearch 7.6.2(集群)”的评论:

还没有评论