package org.jetlinks.community.elastic.search.service.reactive;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.commons.collections4.CollectionUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xcontent.XContentType;
import org.hswebframework.ezorm.core.param.QueryParam;
import org.hswebframework.utils.StringUtils;
import org.hswebframework.utils.time.DateFormatter;
import org.hswebframework.utils.time.DefaultDateFormatter;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.community.buffer.BufferProperties;
import org.jetlinks.community.buffer.BufferSettings;
import org.jetlinks.community.buffer.MemoryUsage;
import org.jetlinks.community.buffer.PersistenceBuffer;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata;
import org.jetlinks.community.elastic.search.service.ElasticSearchService;
import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter;
import org.jetlinks.community.elastic.search.utils.QueryParamTranslator;
import org.jetlinks.community.utils.ErrorUtils;
import org.jetlinks.community.utils.ObjectMappers;
import org.jetlinks.community.utils.SystemUtils;
import org.jetlinks.core.utils.SerializeUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.DependsOn;
import org.springframework.web.reactive.function.client.WebClientException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

@DependsOn({"reactiveElasticsearchClient"})
@ConfigurationProperties(prefix = "elasticsearch")
/* loaded from: input_file:org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticSearchService.class */
public class ReactiveElasticSearchService implements ElasticSearchService {
    private final ReactiveElasticsearchClient restClient;
    private final ElasticSearchIndexManager indexManager;
    private PersistenceBuffer<Buffer> writer;
    private ElasticSearchBufferProperties buffer;
    private static final Logger log = LoggerFactory.getLogger(ReactiveElasticSearchService.class);
    public static final IndicesOptions indexOptions = IndicesOptions.fromOptions(true, true, false, false);

    /* loaded from: input_file:org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticSearchService$Buffer.class */
    public static class Buffer implements Externalizable, MemoryUsage {
        private static final long serialVersionUID = 1;
        String index;
        String id;
        byte[] payload;

        public static Buffer of(String str, Object obj) {
            Buffer buffer = new Buffer();
            buffer.index = str;
            Map map = obj instanceof Map ? (Map) obj : (Map) FastBeanCopier.copy(obj, HashMap::new, new String[0]);
            Object obj2 = map.get("id");
            buffer.id = obj2 == null ? null : String.valueOf(obj2);
            buffer.payload = ObjectMappers.JSON_MAPPER.writeValueAsBytes(map);
            return buffer;
        }

        void release() {
        }

        @Override // java.io.Externalizable
        public void writeExternal(ObjectOutput objectOutput) throws IOException {
            objectOutput.writeUTF(this.index);
            SerializeUtils.writeNullableUTF(this.id, objectOutput);
            objectOutput.writeInt(this.payload.length);
            objectOutput.write(this.payload);
        }

        @Override // java.io.Externalizable
        public void readExternal(ObjectInput objectInput) throws IOException, ClassNotFoundException {
            this.index = objectInput.readUTF();
            this.id = SerializeUtils.readNullableUTF(objectInput);
            this.payload = new byte[objectInput.readInt()];
            objectInput.readFully(this.payload);
        }

        public int usage() {
            if (this.payload == null) {
                return 64;
            }
            return 64 + this.payload.length;
        }

        public String getIndex() {
            return this.index;
        }

        public String getId() {
            return this.id;
        }

        public byte[] getPayload() {
            return this.payload;
        }
    }

    /* loaded from: input_file:org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticSearchService$BufferConfig.class */
    public static class BufferConfig extends BufferProperties {
        private boolean refreshWhenWrite = false;

        public BufferConfig() {
            setFilePath("./data/elasticsearch-buffer");
            setSize(3000);
        }

        public boolean isRefreshWhenWrite() {
            return this.refreshWhenWrite;
        }

        public void setRefreshWhenWrite(boolean z) {
            this.refreshWhenWrite = z;
        }
    }

    public ReactiveElasticSearchService(ReactiveElasticsearchClient reactiveElasticsearchClient, ElasticSearchIndexManager elasticSearchIndexManager) {
        this(reactiveElasticsearchClient, elasticSearchIndexManager, new ElasticSearchBufferProperties());
    }

    public ReactiveElasticSearchService(ReactiveElasticsearchClient reactiveElasticsearchClient, ElasticSearchIndexManager elasticSearchIndexManager, ElasticSearchBufferProperties elasticSearchBufferProperties) {
        this.restClient = reactiveElasticsearchClient;
        this.indexManager = elasticSearchIndexManager;
        this.buffer = elasticSearchBufferProperties;
        init();
    }

    @Override // org.jetlinks.community.elastic.search.service.ElasticSearchService
    public <T> Flux<T> multiQuery(String[] strArr, Collection<QueryParam> collection, Function<Map<String, Object>, T> function) {
        return this.indexManager.getIndexesMetadata(strArr).flatMap(elasticSearchIndexMetadata -> {
            return Mono.zip(Mono.just(elasticSearchIndexMetadata), getIndexForSearch(elasticSearchIndexMetadata.getIndex()));
        }).take(1L).singleOrEmpty().flatMapMany(tuple2 -> {
            MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
            Flux flatMap = Flux.fromIterable(collection).flatMap(queryParam -> {
                return createSearchRequest(queryParam, strArr);
            });
            multiSearchRequest.getClass();
            return flatMap.doOnNext(multiSearchRequest::add).then(Mono.just(multiSearchRequest)).flatMapMany(multiSearchRequest2 -> {
                return this.restClient.multiSearch(multiSearchRequest2).flatMapMany(multiSearchResponse -> {
                    return Flux.fromArray(multiSearchResponse.getResponses());
                }).flatMap(item -> {
                    if (!item.isFailure()) {
                        return Flux.fromIterable(translate(map -> {
                            return function.apply(((ElasticSearchIndexMetadata) tuple2.getT1()).convertFromElastic(map));
                        }, item.getResponse()));
                    }
                    log.warn(item.getFailureMessage(), item.getFailure());
                    return Mono.empty();
                });
            });
        });
    }

    @Override // org.jetlinks.community.elastic.search.service.ElasticSearchService
    public <T> Flux<T> query(String str, QueryParam queryParam, Function<Map<String, Object>, T> function) {
        return query(new String[]{str}, queryParam, function);
    }

    @Override // org.jetlinks.community.elastic.search.service.ElasticSearchService
    public <T> Flux<T> query(String[] strArr, QueryParam queryParam, Function<Map<String, Object>, T> function) {
        return queryParam.isPaging() ? doQuery(strArr, queryParam).flatMapMany(tuple2 -> {
            return convertQueryResult((List) tuple2.getT1(), (SearchResponse) tuple2.getT2(), function);
        }) : doScrollQuery(strArr, queryParam).flatMap(tuple22 -> {
            return convertQueryHit((List) tuple22.getT1(), (SearchHit) tuple22.getT2(), function);
        });
    }

    @Override // org.jetlinks.community.elastic.search.service.ElasticSearchService
    public <T> Mono<PagerResult<T>> queryPager(String[] strArr, QueryParam queryParam, Function<Map<String, Object>, T> function) {
        return !queryParam.isPaging() ? Mono.zip(count(strArr, queryParam), query(strArr, queryParam, function).collectList(), (l, list) -> {
            return PagerResult.of(l.intValue(), list, queryParam);
        }).switchIfEmpty(Mono.fromSupplier(PagerResult::empty)) : doQuery(strArr, queryParam).flatMap(tuple2 -> {
            return convertQueryResult((List) tuple2.getT1(), (SearchResponse) tuple2.getT2(), function).collectList().filter((v0) -> {
                return CollectionUtils.isNotEmpty(v0);
            }).map(list2 -> {
                return PagerResult.of((int) ((SearchResponse) tuple2.getT2()).getHits().getTotalHits().value, list2, queryParam);
            });
        }).switchIfEmpty(Mono.fromSupplier(PagerResult::empty));
    }

    private <T> Flux<T> convertQueryResult(List<ElasticSearchIndexMetadata> list, SearchResponse searchResponse, Function<Map<String, Object>, T> function) {
        Map map = (Map) list.stream().collect(Collectors.toMap((v0) -> {
            return v0.getIndex();
        }, Function.identity()));
        return Flux.fromIterable(searchResponse.getHits()).mapNotNull(searchHit -> {
            Map<String, Object> sourceAsMap = searchHit.getSourceAsMap();
            sourceAsMap.putIfAbsent("id", searchHit.getId());
            return function.apply(((ElasticSearchIndexMetadata) Optional.ofNullable(map.get(searchHit.getIndex())).orElse(list.get(0))).convertFromElastic(sourceAsMap));
        });
    }

    private <T> Flux<T> convertQueryHit(List<ElasticSearchIndexMetadata> list, SearchHit searchHit, Function<Map<String, Object>, T> function) {
        Map map = (Map) list.stream().collect(Collectors.toMap((v0) -> {
            return v0.getIndex();
        }, Function.identity()));
        return Flux.just(searchHit).mapNotNull(searchHit2 -> {
            Map<String, Object> sourceAsMap = searchHit2.getSourceAsMap();
            sourceAsMap.putIfAbsent("id", searchHit2.getId());
            return function.apply(((ElasticSearchIndexMetadata) Optional.ofNullable(map.get(searchHit2.getIndex())).orElse(list.get(0))).convertFromElastic(sourceAsMap));
        });
    }

    private Mono<Tuple2<List<ElasticSearchIndexMetadata>, SearchResponse>> doQuery(String[] strArr, QueryParam queryParam) {
        return this.indexManager.getIndexesMetadata(strArr).collectList().filter((v0) -> {
            return CollectionUtils.isNotEmpty(v0);
        }).flatMap(list -> {
            Mono<SearchRequest> createSearchRequest = createSearchRequest(queryParam, (List<ElasticSearchIndexMetadata>) list);
            ReactiveElasticsearchClient reactiveElasticsearchClient = this.restClient;
            reactiveElasticsearchClient.getClass();
            return createSearchRequest.flatMap(reactiveElasticsearchClient::searchForPage).map(searchResponse -> {
                return Tuples.of(list, searchResponse);
            });
        });
    }

    private Flux<Tuple2<List<ElasticSearchIndexMetadata>, SearchHit>> doScrollQuery(String[] strArr, QueryParam queryParam) {
        return this.indexManager.getIndexesMetadata(strArr).collectList().filter((v0) -> {
            return CollectionUtils.isNotEmpty(v0);
        }).flatMapMany(list -> {
            Mono doOnNext = createSearchRequest(queryParam.clone().noPaging(), (List<ElasticSearchIndexMetadata>) list).doOnNext(searchRequest -> {
                searchRequest.source().size(getNoPagingPageSize(queryParam));
            });
            ReactiveElasticsearchClient reactiveElasticsearchClient = this.restClient;
            reactiveElasticsearchClient.getClass();
            return doOnNext.flatMapMany(reactiveElasticsearchClient::scroll).map(searchHit -> {
                return Tuples.of(list, searchHit);
            });
        });
    }

    private int getNoPagingPageSize(QueryParam queryParam) {
        return Math.max(10000, queryParam.getPageSize());
    }

    @Override // org.jetlinks.community.elastic.search.service.ElasticSearchService
    public Mono<Long> count(String[] strArr, QueryParam queryParam) {
        QueryParam clone = queryParam.clone();
        clone.setPaging(false);
        return createSearchRequest(clone, strArr).flatMap(this::doCount).defaultIfEmpty(0L);
    }

    @Override // org.jetlinks.community.elastic.search.service.ElasticSearchService
    public Mono<Long> delete(String str, QueryParam queryParam) {
        return getIndexForSearch(str).flatMap(str2 -> {
            return createQueryBuilder(queryParam, str).flatMap(queryBuilder -> {
                return this.restClient.deleteBy(deleteByQueryRequest -> {
                    deleteByQueryRequest.setQuery(queryBuilder).indices(new String[]{str2});
                });
            }).map((v0) -> {
                return v0.getDeleted();
            });
        }).defaultIfEmpty(0L);
    }

    private boolean checkWritable(String str) {
        return true;
    }

    @Override // org.jetlinks.community.elastic.search.service.ElasticSearchService
    public <T> Mono<Void> commit(String str, T t) {
        if (checkWritable(str)) {
            this.writer.write(Buffer.of(str, t));
        }
        return Mono.empty();
    }

    @Override // org.jetlinks.community.elastic.search.service.ElasticSearchService
    public <T> Mono<Void> commit(String str, Collection<T> collection) {
        if (checkWritable(str)) {
            Iterator<T> it = collection.iterator();
            while (it.hasNext()) {
                this.writer.write(Buffer.of(str, it.next()));
            }
        }
        return Mono.empty();
    }

    @Override // org.jetlinks.community.elastic.search.service.ElasticSearchService
    public <T> Mono<Void> commit(String str, Publisher<T> publisher) {
        return !checkWritable(str) ? Mono.empty() : Flux.from(publisher).flatMap(obj -> {
            return commit(str, (String) obj);
        }).then();
    }

    @Override // org.jetlinks.community.elastic.search.service.ElasticSearchService
    public <T> Mono<Void> save(String str, T t) {
        return save(str, (Publisher) Mono.just(t));
    }

    @Override // org.jetlinks.community.elastic.search.service.ElasticSearchService
    public <T> Mono<Void> save(String str, Publisher<T> publisher) {
        return Flux.from(publisher).map(obj -> {
            return Buffer.of(str, obj);
        }).buffer(this.buffer.getSize()).flatMap((v1) -> {
            return doSave(v1);
        }).then();
    }

    @Override // org.jetlinks.community.elastic.search.service.ElasticSearchService
    public <T> Mono<Void> save(String str, Collection<T> collection) {
        return save(str, (Publisher) Flux.fromIterable(collection));
    }

    @PreDestroy
    public void shutdown() {
        this.writer.dispose();
    }

    @PostConstruct
    public void reset() {
        this.writer.settings(bufferSettings -> {
            return bufferSettings.properties(this.buffer);
        });
    }

    private void init() {
        this.writer = new PersistenceBuffer(BufferSettings.create("writer.queue", this.buffer), Buffer::new, this::doSaveBuffer).name("elasticsearch").retryWhenError(th -> {
            return ((th instanceof ElasticsearchException) && ((ElasticsearchException) th).status() == RestStatus.BAD_GATEWAY) || ErrorUtils.hasException(th, new Class[]{WebClientException.class}) || ErrorUtils.hasException(th, new Class[]{IOException.class});
        });
        this.writer.start();
    }

    public Mono<Boolean> doSaveBuffer(Flux<Buffer> flux) {
        return flux.collectList().flatMap((v1) -> {
            return doSave(v1);
        }).subscribeOn(Schedulers.parallel()).map(num -> {
            return Boolean.valueOf(num.intValue() == 0);
        });
    }

    private Mono<String> getIndexForSave(String str) {
        return this.indexManager.getIndexStrategy(str).map(elasticSearchIndexStrategy -> {
            return elasticSearchIndexStrategy.getIndexForSave(str);
        });
    }

    private Mono<String> getIndexForSearch(String str) {
        return this.indexManager.getIndexStrategy(str).map(elasticSearchIndexStrategy -> {
            return elasticSearchIndexStrategy.getIndexForSearch(str);
        });
    }

    protected Mono<Integer> doSave(Collection<Buffer> collection) {
        int size = collection.size();
        return Flux.fromIterable(collection).groupBy((v0) -> {
            return v0.getIndex();
        }, Integer.MAX_VALUE).flatMap(groupedFlux -> {
            return getIndexForSave((String) groupedFlux.key()).flatMapMany(str -> {
                return groupedFlux.map(buffer -> {
                    try {
                        IndexRequest id = buffer.id != null ? new IndexRequest(str).id(buffer.id) : new IndexRequest(str);
                        if (getRestClient().serverVersion().before(Version.V_7_0_0)) {
                            id.type("_doc");
                        }
                        id.source(buffer.payload, XContentType.JSON);
                        IndexRequest indexRequest = id;
                        buffer.release();
                        return indexRequest;
                    } catch (Throwable th) {
                        buffer.release();
                        throw th;
                    }
                });
            });
        }).collectList().filter((v0) -> {
            return CollectionUtils.isNotEmpty(v0);
        }).flatMap(list -> {
            BulkRequest bulkRequest = new BulkRequest();
            bulkRequest.timeout(TimeValue.timeValueSeconds(9L));
            if (this.buffer.isRefreshWhenWrite()) {
                bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
            }
            bulkRequest.getClass();
            list.forEach(bulkRequest::add);
            return this.restClient.bulk(bulkRequest);
        }).doOnError(th -> {
            SystemUtils.printError("保存ElasticSearch数据失败:\n%s", () -> {
                return new Object[]{StringUtils.throwable2String(th)};
            });
        }).map(bulkResponse -> {
            if (bulkResponse.hasFailures()) {
                return 0;
            }
            return Integer.valueOf(size);
        });
    }

    private <T> List<T> translate(Function<Map<String, Object>, T> function, SearchResponse searchResponse) {
        return (List) Arrays.stream(searchResponse.getHits().getHits()).map(searchHit -> {
            Map sourceAsMap = searchHit.getSourceAsMap();
            if (org.springframework.util.StringUtils.isEmpty(sourceAsMap.get("id"))) {
                sourceAsMap.put("id", searchHit.getId());
            }
            return function.apply(sourceAsMap);
        }).collect(Collectors.toList());
    }

    private Mono<Long> doCount(SearchRequest searchRequest) {
        return this.restClient.count(searchRequest);
    }

    protected Mono<SearchRequest> createSearchRequest(QueryParam queryParam, String... strArr) {
        return this.indexManager.getIndexesMetadata(strArr).collectList().filter((v0) -> {
            return CollectionUtils.isNotEmpty(v0);
        }).flatMap(list -> {
            return createSearchRequest(queryParam, (List<ElasticSearchIndexMetadata>) list);
        });
    }

    protected Mono<SearchRequest> createSearchRequest(QueryParam queryParam, List<ElasticSearchIndexMetadata> list) {
        SearchSourceBuilder convertSearchSourceBuilder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, list.get(0));
        return Flux.fromIterable(list).flatMap(elasticSearchIndexMetadata -> {
            return getIndexForSearch(elasticSearchIndexMetadata.getIndex());
        }).collectList().map(list2 -> {
            return new SearchRequest((String[]) list2.toArray(new String[0])).source(convertSearchSourceBuilder).indicesOptions(indexOptions);
        });
    }

    protected Mono<QueryBuilder> createQueryBuilder(QueryParam queryParam, String str) {
        return this.indexManager.getIndexMetadata(str).map(elasticSearchIndexMetadata -> {
            return QueryParamTranslator.createQueryBuilder(queryParam, elasticSearchIndexMetadata);
        }).switchIfEmpty(Mono.fromSupplier(() -> {
            return QueryParamTranslator.createQueryBuilder(queryParam, null);
        }));
    }

    public ReactiveElasticsearchClient getRestClient() {
        return this.restClient;
    }

    public ElasticSearchIndexManager getIndexManager() {
        return this.indexManager;
    }

    public ElasticSearchBufferProperties getBuffer() {
        return this.buffer;
    }

    public void setBuffer(ElasticSearchBufferProperties elasticSearchBufferProperties) {
        this.buffer = elasticSearchBufferProperties;
    }

    static {
        DateFormatter.supportFormatter.add(new DefaultDateFormatter(Pattern.compile("[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.+"), "yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
        DateFormatter.supportFormatter.add(new DefaultDateFormatter(Pattern.compile("[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}.+"), "yyyy-MM-dd HH:mm:ss.SSS"));
    }
}
