package com.artfess.es.unit; import com.artfess.base.util.JsonUtil; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.core.CountRequest; import org.elasticsearch.client.core.CountResponse; import org.elasticsearch.common.text.Text; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.script.Script; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder; import org.elasticsearch.search.fetch.subphase.highlight.HighlightField; import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.ScoreSortBuilder; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.search.suggest.SuggestBuilder; import org.elasticsearch.search.suggest.SuggestBuilders; import org.elasticsearch.search.suggest.completion.CompletionSuggestion; import org.elasticsearch.search.suggest.completion.CompletionSuggestionBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.StreamSupport; /** * @author Administrator */ @Service public class ElasticSearchApplicationImpl implements ElasticSearchApplication { private static final Logger log = LoggerFactory.getLogger(ElasticSearchApplicationImpl.class); private final RestHighLevelClient restHighLevelClient; @Autowired public ElasticSearchApplicationImpl(RestHighLevelClient restHighLevelClient) { this.restHighLevelClient = restHighLevelClient; } @Override public boolean existsIndex(String indexName, String id) { if(StringUtils.isEmpty(indexName) || StringUtils.isEmpty(id)){ return false; } GetRequest getRequest = new GetRequest(indexName,id); try { return restHighLevelClient.exists(getRequest, RequestOptions.DEFAULT); } catch (IOException e) { log.error("检查id:{}是否存在于indexName:{}出现异常", id,indexName, e); return false; } } @Override public boolean insertDocument(String indexName, String id, T t) { IndexRequest indexRequest = new IndexRequest(indexName); //是否创建index,如果index已存在,有了这句话,将报错 // indexRequest.create(true); //如果有id就已现存的id值进行索引,如果对应的id在索引库中已存在,将进行修改数据 if(!(StringUtils.isEmpty(id))){ indexRequest.id(id); } indexRequest.source(JsonUtil.toJsonString(t), XContentType.JSON); try { restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT); return true; } catch (IOException e) { log.error("索引数据至[{}]失败,对象至:{}" ,indexName, JsonUtil.toJsonString(t), e); return false; } } @Override public boolean insertDocument(String indexName, List list) throws IOException { BulkRequest bulkRequest = new BulkRequest(); String id; //后续的es版本值支持json与SMILE for (T item : list) { ObjectMapper mapper = JsonUtil.getMapper(); JsonNode json = JsonUtil.toJsonNode(item); String s = mapper.writeValueAsString(item); id = json.get("id").asText(); if(StringUtils.isEmpty(id)){ bulkRequest.add( new IndexRequest(indexName) .source(json, XContentType.JSON)); }else{ bulkRequest.add( new IndexRequest(indexName).id(id) .source(json, XContentType.JSON)); } } try { restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT); return true; } catch (IOException e) { log.error("批量索引数据至[{}]失败" ,indexName, e); return false; } } @Override public boolean insertDocument(String indexName, String pipeLine, List list) throws IOException { BulkRequest bulkRequest = new BulkRequest(); String id; //后续的es版本值支持json与SMILE for (T item : list) { ObjectMapper mapper = JsonUtil.getMapper(); JsonNode json = JsonUtil.toJsonNode(item); String s = mapper.writeValueAsString(item); id = json.get("id").asText(); if(StringUtils.isEmpty(id)){ bulkRequest.add( new IndexRequest(indexName).setPipeline(pipeLine) .source(json, XContentType.JSON)); }else{ bulkRequest.add( new IndexRequest(indexName).id(id).setPipeline(pipeLine) .source(json, XContentType.JSON)); } } try { restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT); return true; } catch (IOException e) { log.error("批量索引数据至[{}]失败" ,indexName, e); return false; } } @Override public void updateById(String indexName, String id, Map map) { UpdateRequest updateRequest = new UpdateRequest(indexName,id); updateRequest.doc(map); try { restHighLevelClient.update(updateRequest,RequestOptions.DEFAULT); } catch (IOException e) { log.error("数据修改出现异常,id:{}", id, e); } } @Override public void updateById(String indexName, List ids, Map map) { BulkRequest bulkRequest = new BulkRequest(); for (String id:ids) { bulkRequest.add(new UpdateRequest(indexName,id).doc(map)); } try { restHighLevelClient.bulk(bulkRequest,RequestOptions.DEFAULT); } catch (IOException e) { log.error("数据批量修改出现异常!"); } } @Override public Long updateByQueryBuilder(String indexName, QueryBuilder queryBuilder, Map map) { UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(indexName); updateByQueryRequest.setQuery(queryBuilder); if(map.isEmpty()){ return 0L; } SimpleDateFormat sm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Iterator iterator = map.keySet().iterator(); StringBuilder str = new StringBuilder(); String key; Object value; while (iterator.hasNext()){ key = iterator.next(); value = map.get(key); if(value != null && value instanceof Date){ str.append(";ctx._source."+key + "='"+ sm.format(map.get(key))+"'"); }else{ str.append(";ctx._source."+key + "='"+map.get(key)+"'"); } } updateByQueryRequest.setScript(new Script(str.substring(1))); try { BulkByScrollResponse re = restHighLevelClient.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT); return re.getDeleted(); } catch (IOException e) { log.error("数据删除出现异常!"); return 0L; } } @Override public void updateByQueryBuilderOfAsync(String indexName, QueryBuilder queryBuilder, Map map, ActionListener actionListener) { UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(indexName); updateByQueryRequest.setQuery(queryBuilder); if(map.isEmpty()){ return ; } SimpleDateFormat sm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Iterator iterator = map.keySet().iterator(); StringBuilder str = new StringBuilder(); String key; Object value; while (iterator.hasNext()){ key = iterator.next(); value = map.get(key); if(value != null && value instanceof Date){ str.append(";ctx._source."+key + "='"+ sm.format(map.get(key))+"'"); }else{ str.append(";ctx._source."+key + "='"+map.get(key)+"'"); } } updateByQueryRequest.setScript(new Script(str.substring(1))); restHighLevelClient.updateByQueryAsync(updateByQueryRequest,RequestOptions.DEFAULT,actionListener); } @Override public void delById(String indexName, String id) { DeleteRequest deleteRequest = new DeleteRequest(indexName,id); try { restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT); } catch (IOException e) { log.error("数据删除出现异常,id:{}", id, e); } } @Override public void delByIds(String indexName, List ids) { BulkRequest bulkRequest = new BulkRequest(); for (String id : ids ) { bulkRequest.add(new DeleteRequest(indexName,id)); } try { restHighLevelClient.bulk(bulkRequest,RequestOptions.DEFAULT); } catch (IOException e) { log.error("数据批量删除出现异常,id:{}", String.join(",",ids), e); } } @Override public Long delByQuery(String indexName, QueryBuilder queryBuilder) { DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indexName); //设置查询条件 deleteByQueryRequest.setQuery(queryBuilder); //设置版本冲突时继续执行,默认为终止 deleteByQueryRequest.setConflicts("proceed"); try { BulkByScrollResponse bulkByScrollResponse = restHighLevelClient.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT); return bulkByScrollResponse.getDeleted(); } catch (IOException e) { log.error("数据批量删除出现异常!", e); return 0L; } } @Override public T queryById(String indexName, String id, Class c) { GetRequest getRequest = new GetRequest(indexName,id); try { GetResponse getResponse = restHighLevelClient.get(getRequest,RequestOptions.DEFAULT); if(getResponse.isSourceEmpty()){ return null; } getResponse.getSourceAsMap().put("id",id); return JsonUtil.getMapper().readValue(getResponse.getSourceAsString(),c); } catch (IOException e) { log.error("数据查询出现异常,id:{}", id, e); return null; } } @Override public GetResponse queryGetResponseById(String indexName, String id) { GetRequest getRequest = new GetRequest(indexName,id); try { return restHighLevelClient.get(getRequest, RequestOptions.DEFAULT); } catch (IOException e) { log.error("数据查询出现异常,id:{}", id, e); return null; } } @Override public Map queryMapById(String indexName, String id) { GetRequest getRequest = new GetRequest(indexName,id); try { GetResponse getResponse = restHighLevelClient.get(getRequest,RequestOptions.DEFAULT); getResponse.getSourceAsMap().put("id",id); return getResponse.getSourceAsMap(); } catch (IOException e) { log.error("数据查询出现异常,id:{}", id, e); return null; } } @Override public List> queryDataByQueryBuilder(String indexName, QueryBuilder queryBuilder) { SearchRequest searchRequest = new SearchRequest(indexName); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS)); searchRequest.source(searchSourceBuilder.query(queryBuilder)); try { SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); SearchHit[] hits = searchResponse.getHits().getHits(); if(hits.length == 0){ return null; } List> list = new ArrayList<>(); for (SearchHit hit : hits) { list.add(hit.getSourceAsMap()); } return list; } catch (IOException e) { log.error("数据查询出现异常!"); return null; } } @Override public List> queryDataByQueryBuilder(String indexName, BoolQueryBuilder queryBuilder, SortOrder sortOrder, List filedName) { SearchRequest searchRequest = new SearchRequest(indexName); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS)); searchRequest.source(searchSourceBuilder.query(queryBuilder)); //根据用户指定的字段以及排序规则进行排序 if(filedName != null){ for (String fieldName:filedName) { searchSourceBuilder.sort(new FieldSortBuilder(fieldName).order(sortOrder)); } } try { SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); SearchHit[] hits = searchResponse.getHits().getHits(); if(hits.length == 0){ return null; } List> list = new ArrayList<>(); for (SearchHit hit : hits) { list.add(hit.getSourceAsMap()); } return list; } catch (IOException e) { log.error("数据查询出现异常!"); return null; } } /** * 查询指定索引库中符合条件的数据 分页 * @param indexName 索引名称 * @param queryBuilder 数据查询条件构造 * @param pageNum 页号 * @param pageSize 页大小 * @return map对象——数据与总页数 */ @Override public Map queryDataByQueryBuilder(String indexName,QueryBuilder queryBuilder, Integer pageNum, Integer pageSize) { pageNum = pageNum -1; if (pageNum <= 0){ pageNum = 0; } SearchRequest searchRequest = new SearchRequest(indexName); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); //分页 searchSourceBuilder.from(pageNum*pageSize); searchSourceBuilder.size(pageSize); searchSourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS)); searchRequest.source(searchSourceBuilder.query(queryBuilder)); try { SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); SearchHit[] hits = searchResponse.getHits().getHits(); Map map = new HashMap<>(); //总数 long total = searchResponse.getHits().getTotalHits().value; long pages = total % pageSize == 0 ? (total / pageSize) : (total / pageSize + 1); // // if(hits.length == 0){ // Map sourceAsMap = new HashMap<>(); // // return sourceAsMap; // } List> list = new ArrayList<>(); for (SearchHit hit : hits) { Map sourceAsMap = hit.getSourceAsMap(); sourceAsMap.put("id",hit.getId()); list.add(sourceAsMap); } map.put("current",pageNum+1); map.put("size",pageSize); map.put("pages",pages); map.put("records",list); map.put("total",total); return map; } catch (IOException e) { log.error("数据查询出现异常!"); return null; } } /** * 查询指定索引库中符合条件的数据 分页 * @param indexName 索引名称 * @param queryBuilder 数据查询条件构造 * @param pageNum 页号 * @param pageSize 页大小 * @param order 排序 * @return map对象——数据与总页数 */ @Override public Map queryDataByQueryBuilder(String indexName, QueryBuilder queryBuilder,String name, SortOrder order, Integer pageNum, Integer pageSize) { pageNum = pageNum -1; if (pageNum <= 0){ pageNum = 0; } SearchRequest searchRequest = new SearchRequest(indexName); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); //分页 searchSourceBuilder.from(pageNum*pageSize); searchSourceBuilder.size(pageSize); searchSourceBuilder.sort(name,order); searchSourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS)); searchRequest.source(searchSourceBuilder.query(queryBuilder)); try { SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); SearchHit[] hits = searchResponse.getHits().getHits(); Map map = new HashMap<>(); //总数 long total = searchResponse.getHits().getTotalHits().value; long pages = total % pageSize == 0 ? (total / pageSize) : (total / pageSize + 1); List> list = new ArrayList<>(); for (SearchHit hit : hits) { Map sourceAsMap = hit.getSourceAsMap(); sourceAsMap.put("id",hit.getId()); list.add(sourceAsMap); } map.put("current",pageNum+1); map.put("size",pageSize); map.put("pages",pages); map.put("records",list); map.put("total",total); return map; } catch (IOException e) { log.error("数据查询出现异常!"); e.printStackTrace(); return null; } } public Map> getTermsAggTwoLevel(QueryBuilder queryBuilder, String field1, String field2, String index) throws IOException { Map> groupMap = new HashMap<>(); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(queryBuilder); searchSourceBuilder.size(0); AggregationBuilder agg1 = AggregationBuilders.terms("agg1").field(field1); AggregationBuilder agg2 = AggregationBuilders.terms("agg2").field(field2); agg1.subAggregation(agg2); searchSourceBuilder.aggregation(agg1); SearchRequest searchRequest = new SearchRequest(index); searchRequest.source(searchSourceBuilder); SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); Terms terms1 = searchResponse.getAggregations().get("agg1"); Terms terms2; for (Terms.Bucket bucket1 : terms1.getBuckets()) { terms2 = bucket1.getAggregations().get("agg2"); Map map2 = new HashMap<>(); for (Terms.Bucket bucket2 : terms2.getBuckets()) { map2.put(bucket2.getKey().toString(), bucket2.getDocCount()); } groupMap.put(bucket1.getKey().toString(), map2); } return groupMap; } @Override public Map queryForFullText(String indexName, SearchSourceBuilder searchSourceBuilder, int pageNum, int pageSize, List fieldNameForSort, SortOrder sortOrder, String[] heightFields) { Map map = new HashMap<>(); List> list = new ArrayList<>(); SearchRequest searchRequest = new SearchRequest(indexName); if(searchSourceBuilder == null){ searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(QueryBuilders.matchAllQuery()); } //分页 if(pageSize != 0){ searchSourceBuilder.from(pageNum * pageSize); searchSourceBuilder.size(pageSize); } //排序 searchSourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC)); //根据用户指定的字段以及排序规则进行排序 if(fieldNameForSort != null){ for (String fieldName:fieldNameForSort) { searchSourceBuilder.sort(new FieldSortBuilder(fieldName).order(sortOrder)); } } boolean flag = false; //设置高亮显示 if(heightFields != null && heightFields.length != 0){ flag = true; HighlightBuilder highlightBuilder = new HighlightBuilder(); for (String str:heightFields) { if(!StringUtils.isEmpty(str)){ /* //设置高亮字段 HighlightBuilder.Field highlightTitle = new HighlightBuilder.Field("love"); //设置高亮显示器 / Elasticsearch支持三种highlight,默认unified // unified,plain和fvh(fast vector highlighter) highlightTitle.highlighterType("unified"); //突出显示的片段的大小(以字符为单位)默认为100 highlightTitle.fragmentSize(100); //要返回的最大片段数。 // 如果片段数设置为0,则不返回任何片段。而是突出显示并返回整个字段内容。 // 如果number_of_fragments 为0,fragment_size则忽略。 默认为5 highlightTitle.numOfFragments(5); highlightBuilder.field(highlightTitle); //设置边界字符的举例(默认的边界字符包括 . , ! ? \t \n) highlightBuilder.boundaryMaxScan(20);*/ //与上面注释代码的含义相同 highlightBuilder.boundaryMaxScan(50); highlightBuilder.field(str,400).numOfFragments(1); } } //设置突出显示的html标签,默认为 highlightBuilder.preTags(""); highlightBuilder.postTags(""); searchSourceBuilder.highlighter(highlightBuilder); } searchRequest.source(searchSourceBuilder); try { SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); SearchHits hits = searchResponse.getHits(); Map mapTmep; for (SearchHit hit : hits.getHits()) { mapTmep = hit.getSourceAsMap(); mapTmep.put("id",hit.getId()); mapTmep.put("score",hit.getScore()); if(flag){ Map highlightFields = hit.getHighlightFields(); for (String str:heightFields) { if(highlightFields.containsKey(str)){ HighlightField highlight = highlightFields.get(str); Text[] fragments = highlight.fragments(); StringBuilder fragmentString = new StringBuilder(); for (Text text:fragments) { fragmentString.append(text.toString() + "
"); } mapTmep.put(str,fragmentString.toString()); } } } list.add(mapTmep); } //总数 long total = searchResponse.getHits().getTotalHits().value; long pages = total % pageSize == 0 ? (total / pageSize) : (total / pageSize + 1); map.put("current",pageNum+1); map.put("size",pageSize); map.put("pages",pages); map.put("records",list); map.put("total", total); map.put("tookTime",searchResponse.getTook().getMillis()); } catch (IOException e) { log.error("数据检索出现异常", e); return null; } return map; } public SearchResponse queryForAggregation(String indexName, QueryBuilder queryBuilder, AggregationBuilder aggregationBuilder) throws IOException { SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(queryBuilder); //根据用户指定的字段以及排序规则进行排序 // if(fieldNameForSort != null){ // for (String fieldName:fieldNameForSort) { // searchSourceBuilder.sort(new FieldSortBuilder(fieldName).order(order)); // } // } searchSourceBuilder.aggregation(aggregationBuilder); searchSourceBuilder.size(0); SearchRequest searchRequest = new SearchRequest(indexName); searchRequest.source(searchSourceBuilder); SearchResponse searchResponse = this.restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); return searchResponse; } @Override public Long countByQueryBuilder(String indexName, QueryBuilder queryBuilder) { CountRequest countRequest = new CountRequest(indexName); countRequest.query(queryBuilder); try { CountResponse countResponse = this.restHighLevelClient.count(countRequest,RequestOptions.DEFAULT); return countResponse.getCount(); } catch (IOException e) { log.error("数据统计错误!"); } return null; } /** * 推荐候选词搜索 * @param idxName * @param builder * @return */ @Override public List searchCompletionSuggest(String idxName, SearchSourceBuilder builder) { SearchRequest request = new SearchRequest(idxName); request.source(builder); try { SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT); List list = StreamSupport.stream(Spliterators.spliteratorUnknownSize(response.getSuggest().iterator(), Spliterator.ORDERED), false) .flatMap(suggestion -> suggestion.getEntries().get(0).getOptions().stream()) .map((Suggest.Suggestion.Entry.Option option) -> option.getText().toString()) .collect(Collectors.toList()); return list; } catch (Exception e) { throw new RuntimeException(e); } } public List getSuggestSearch(String keyword,String indexName) { //field的名字,前缀(输入的text),以及大小size CompletionSuggestionBuilder suggestionBuilderDistrict = SuggestBuilders.completionSuggestion("配置了type=completion的字段名") .prefix(keyword).size(3); SuggestBuilder suggestBuilder = new SuggestBuilder(); suggestBuilder.addSuggestion("suggestDistrict", suggestionBuilderDistrict);//添加suggest SearchRequest searchRequest = new SearchRequest(indexName); //设置查询builder的index,type,以及建议 SearchRequestBuilder requestBuilder = null; SearchResponse response = requestBuilder.get(); Suggest suggest = response.getSuggest();//suggest实体 Set suggestSet = new HashSet<>();//set int maxSuggest = 0; if (suggest!=null){ Suggest.Suggestion result = suggest.getSuggestion("suggestDistrict");//获取suggest,name任意string for (Object term : result.getEntries()) { if (term instanceof CompletionSuggestion.Entry){ CompletionSuggestion.Entry item = (CompletionSuggestion.Entry) term; if (!item.getOptions().isEmpty()){ //若item的option不为空,循环遍历 for (CompletionSuggestion.Entry.Option option : item.getOptions()) { String tip = option.getText().toString(); if (!suggestSet.contains(tip)){ suggestSet.add(tip); ++maxSuggest; } } } } if (maxSuggest>=5){ break; } } } List suggests = Arrays.asList(suggestSet.toArray(new String[]{})); return suggests; } }