/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.community.elastic.search.service.reactive;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.commons.collections4.CollectionUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xcontent.XContentType;
import org.hswebframework.ezorm.core.param.QueryParam;
import org.hswebframework.utils.StringUtils;
import org.hswebframework.utils.time.DateFormatter;
import org.hswebframework.utils.time.DefaultDateFormatter;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.community.buffer.BufferProperties;
import org.jetlinks.community.buffer.BufferSettings;
import org.jetlinks.community.buffer.MemoryUsage;
import org.jetlinks.community.buffer.PersistenceBuffer;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata;
import org.jetlinks.community.elastic.search.service.ElasticSearchService;
import org.jetlinks.community.elastic.search.service.reactive.ElasticSearchBufferProperties;
import org.jetlinks.community.elastic.search.service.reactive.ReactiveElasticsearchClient;
import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter;
import org.jetlinks.community.elastic.search.utils.QueryParamTranslator;
import org.jetlinks.community.utils.ErrorUtils;
import org.jetlinks.community.utils.ObjectMappers;
import org.jetlinks.community.utils.SystemUtils;
import org.jetlinks.core.utils.SerializeUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.DependsOn;
import org.springframework.web.reactive.function.client.WebClientException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

@DependsOn(value={"reactiveElasticsearchClient"})
@ConfigurationProperties(prefix="elasticsearch")
public class ReactiveElasticSearchService
implements ElasticSearchService {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(ReactiveElasticSearchService.class);
    /** Reactive client used for the search, scroll, count, delete-by-query and bulk requests of this service. */
    private final ReactiveElasticsearchClient restClient;
    /** Supplies index metadata and the index strategies that map an index name to its save/search name. */
    private final ElasticSearchIndexManager indexManager;
    /**
     * Indices options attached to every {@link SearchRequest} built by this service.
     * NOTE(review): the four flags are presumably ignoreUnavailable, allowNoIndices,
     * expandToOpenIndices and expandToClosedIndices - confirm against IndicesOptions.fromOptions.
     */
    public static final IndicesOptions indexOptions = IndicesOptions.fromOptions((boolean)true, (boolean)true, (boolean)false, (boolean)false);
    /** Write buffer fed by the commit(...) methods; created and started by init(). */
    private PersistenceBuffer<Buffer> writer;
    /** Buffer properties; also provides the batch size for save(...) and the refresh-on-write flag for bulk requests. */
    private ElasticSearchBufferProperties buffer;

    /**
     * Creates the service with a freshly constructed {@link ElasticSearchBufferProperties}.
     *
     * @param restClient   reactive Elasticsearch client
     * @param indexManager index metadata / strategy manager
     */
    public ReactiveElasticSearchService(ReactiveElasticsearchClient restClient, ElasticSearchIndexManager indexManager) {
        this(restClient, indexManager, new ElasticSearchBufferProperties());
    }

    /**
     * Creates the service and immediately builds and starts the write buffer.
     *
     * @param restClient   reactive Elasticsearch client
     * @param indexManager index metadata / strategy manager
     * @param buffer       properties used to configure the write buffer
     */
    public ReactiveElasticSearchService(ReactiveElasticsearchClient restClient,
                                        ElasticSearchIndexManager indexManager,
                                        ElasticSearchBufferProperties buffer) {
        this.buffer = buffer;
        this.indexManager = indexManager;
        this.restClient = restClient;
        // The writer is created from the properties assigned above and started right away.
        init();
    }

    /**
     * Runs several queries against the same indexes through one multi-search request.
     * <p>
     * Only the first metadata entry resolved for {@code index} is used to convert hits.
     * Sub-responses that failed are logged at WARN level and contribute no elements.
     *
     * @param index      index names to search
     * @param queryParam the queries; each one becomes a search request of the multi-search
     * @param mapper     maps a converted hit (source map) to the result type
     * @param <T>        result element type
     * @return the mapped hits of all successful sub-responses; empty when no index metadata is resolved
     */
    @Override
    public <T> Flux<T> multiQuery(String[] index, Collection<QueryParam> queryParam, Function<Map<String, Object>, T> mapper) {
        return indexManager
            .getIndexesMetadata(index)
            .flatMap(meta -> Mono.zip(Mono.just(meta), getIndexForSearch(meta.getIndex())))
            .take(1L)
            .singleOrEmpty()
            .flatMapMany(resolved -> {
                // Hits are converted with the metadata of the first resolved index before mapping.
                Function<Map<String, Object>, T> hitMapper =
                    source -> mapper.apply(resolved.getT1().convertFromElastic(source));
                MultiSearchRequest batch = new MultiSearchRequest();
                return Flux
                    .fromIterable(queryParam)
                    .flatMap(param -> createSearchRequest(param, index))
                    .doOnNext(single -> batch.add(single))
                    .then(Mono.just(batch))
                    .flatMapMany(ready -> restClient
                        .multiSearch(ready)
                        .flatMapMany(response -> Flux.fromArray(response.getResponses()))
                        .flatMap(item -> {
                            if (!item.isFailure()) {
                                return Flux.fromIterable(translate(hitMapper, item.getResponse()));
                            }
                            log.warn(item.getFailureMessage(), (Throwable) item.getFailure());
                            return Mono.<T>empty();
                        }));
            });
    }

    /**
     * Single-index convenience overload; delegates to the multi-index {@code query}.
     *
     * @param index      index name
     * @param queryParam query conditions
     * @param mapper     maps a converted hit to the result type
     * @param <T>        result element type
     * @return the mapped hits
     */
    @Override
    public <T> Flux<T> query(String index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
        String[] singleIndex = {index};
        return query(singleIndex, queryParam, mapper);
    }

    /**
     * Queries the given indexes. A paging query is executed as one page search; a non-paging
     * query is executed as a scroll and every scrolled hit is converted individually.
     *
     * @param index      index names
     * @param queryParam query conditions
     * @param mapper     maps a converted hit to the result type
     * @param <T>        result element type
     * @return the mapped hits; empty when no index metadata is resolved
     */
    @Override
    public <T> Flux<T> query(String[] index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
        if (!queryParam.isPaging()) {
            return doScrollQuery(index, queryParam)
                .flatMap(pair -> convertQueryHit(pair.getT1(), pair.getT2(), mapper));
        }
        return doQuery(index, queryParam)
            .flatMapMany(pair -> convertQueryResult(pair.getT1(), pair.getT2(), mapper));
    }

    /**
     * Queries the given indexes and wraps the result in a {@link PagerResult}.
     * <p>
     * For a paging query the total comes from the search response and an empty page yields
     * {@code PagerResult.empty()}. For a non-paging query the total comes from a separate
     * count and the data from the (scrolling) {@code query}.
     *
     * @param index      index names
     * @param queryParam query conditions
     * @param mapper     maps a converted hit to the result type
     * @param <T>        result element type
     * @return the pager result, or an empty pager result when nothing is produced
     */
    @Override
    public <T> Mono<PagerResult<T>> queryPager(String[] index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
        if (queryParam.isPaging()) {
            return doQuery(index, queryParam)
                .flatMap(found -> {
                    SearchResponse response = found.getT2();
                    return convertQueryResult(found.getT1(), response, mapper)
                        .collectList()
                        .filter(CollectionUtils::isNotEmpty)
                        .map(rows -> PagerResult.of((int) response.getHits().getTotalHits().value, rows, queryParam));
                })
                .switchIfEmpty(Mono.fromSupplier(PagerResult::empty));
        }
        Mono<Long> total = count(index, queryParam);
        Mono<List<T>> rows = query(index, queryParam, mapper).collectList();
        return Mono
            .zip(total, rows, (totalCount, list) -> PagerResult.of(totalCount.intValue(), list, queryParam))
            .switchIfEmpty(Mono.fromSupplier(PagerResult::empty));
    }

    /**
     * Converts every hit of a search response into the caller's result type.
     * <p>
     * Each hit's source map gets an {@code "id"} entry (the document id) when it has none, is
     * converted with the metadata whose index name equals the hit's index - falling back to the
     * first metadata entry - and is then passed to {@code mapper}. Hits mapped to {@code null}
     * are dropped.
     *
     * @param indexList metadata of the searched indexes; must not be empty when there are hits
     * @param response  the search response
     * @param mapper    maps a converted hit to the result type
     * @param <T>       result element type
     * @return the mapped hits
     */
    private <T> Flux<T> convertQueryResult(List<ElasticSearchIndexMetadata> indexList, SearchResponse response, Function<Map<String, Object>, T> mapper) {
        // The merge function keeps the first entry: the two-argument toMap would throw
        // IllegalStateException ("Duplicate key") if the same index name appears twice.
        Map<String, ElasticSearchIndexMetadata> metadata = indexList
            .stream()
            .collect(Collectors.toMap(ElasticSearchIndexMetadata::getIndex, Function.identity(), (first, duplicate) -> first));
        return Flux
            .fromIterable(response.getHits())
            .mapNotNull(hit -> {
                Map<String, Object> hitMap = hit.getSourceAsMap();
                hitMap.putIfAbsent("id", hit.getId());
                ElasticSearchIndexMetadata matched = Optional
                    .ofNullable(metadata.get(hit.getIndex()))
                    .orElse(indexList.get(0));
                return mapper.apply(matched.convertFromElastic(hitMap));
            });
    }

    /**
     * Converts a single (scrolled) hit into the caller's result type.
     * <p>
     * The hit's source map gets an {@code "id"} entry (the document id) when it has none, is
     * converted with the metadata whose index name equals the hit's index - falling back to the
     * first metadata entry - and is then passed to {@code mapper}. A {@code null} mapping result
     * yields an empty flux.
     *
     * @param indexList metadata of the searched indexes; must not be empty
     * @param searchHit the hit to convert
     * @param mapper    maps a converted hit to the result type
     * @param <T>       result element type
     * @return a flux with at most one element
     */
    private <T> Flux<T> convertQueryHit(List<ElasticSearchIndexMetadata> indexList, SearchHit searchHit, Function<Map<String, Object>, T> mapper) {
        // The merge function keeps the first entry: the two-argument toMap would throw
        // IllegalStateException ("Duplicate key") if the same index name appears twice.
        Map<String, ElasticSearchIndexMetadata> metadata = indexList
            .stream()
            .collect(Collectors.toMap(ElasticSearchIndexMetadata::getIndex, Function.identity(), (first, duplicate) -> first));
        return Flux
            .just(searchHit)
            .mapNotNull(hit -> {
                Map<String, Object> hitMap = hit.getSourceAsMap();
                hitMap.putIfAbsent("id", hit.getId());
                ElasticSearchIndexMetadata matched = Optional
                    .ofNullable(metadata.get(hit.getIndex()))
                    .orElse(indexList.get(0));
                return mapper.apply(matched.convertFromElastic(hitMap));
            });
    }

    /**
     * Resolves the metadata of the given indexes and executes one page search.
     *
     * @param index      index names
     * @param queryParam query conditions
     * @return the resolved metadata list paired with the search response; empty when no
     *         metadata is resolved
     */
    private Mono<Tuple2<List<ElasticSearchIndexMetadata>, SearchResponse>> doQuery(String[] index, QueryParam queryParam) {
        return indexManager
            .getIndexesMetadata(index)
            .collectList()
            .filter(CollectionUtils::isNotEmpty)
            .flatMap(known -> createSearchRequest(queryParam, known)
                .flatMap(restClient::searchForPage)
                .map(found -> Tuples.of(known, found)));
    }

    /**
     * Resolves the metadata of the given indexes and scrolls through all matching hits.
     * The search request is built from a non-paging copy of {@code queryParam}; its size is
     * then set to {@code getNoPagingPageSize(queryParam)}.
     *
     * @param index      index names
     * @param queryParam query conditions
     * @return every scrolled hit paired with the resolved metadata list; empty when no
     *         metadata is resolved
     */
    private Flux<Tuple2<List<ElasticSearchIndexMetadata>, SearchHit>> doScrollQuery(String[] index, QueryParam queryParam) {
        return indexManager
            .getIndexesMetadata(index)
            .collectList()
            .filter(CollectionUtils::isNotEmpty)
            .flatMapMany(known -> createSearchRequest(queryParam.clone().noPaging(), known)
                .doOnNext(request -> request.source().size(getNoPagingPageSize(queryParam)))
                .flatMapMany(request -> restClient.scroll(request))
                .map(hit -> Tuples.of(known, hit)));
    }

    /**
     * Returns the batch size used for scroll (non-paging) queries: the query's page size,
     * but never less than 10000.
     * NOTE(review): a page size above 10000 is passed through unchanged; if the target index
     * limits the result window to 10000 this may be rejected - confirm whether an upper bound
     * was intended.
     *
     * @param param the query whose page size is consulted
     * @return the larger of 10000 and {@code param.getPageSize()}
     */
    private int getNoPagingPageSize(QueryParam param) {
        int requested = param.getPageSize();
        return requested > 10000 ? requested : 10000;
    }

    /**
     * Counts the documents matching the query; paging is disabled on a copy of the query first.
     *
     * @param index      index names
     * @param queryParam query conditions (not modified)
     * @return the match count, or {@code 0} when no search request could be built
     */
    @Override
    public Mono<Long> count(String[] index, QueryParam queryParam) {
        QueryParam unpaged = queryParam.clone();
        unpaged.setPaging(false);
        return createSearchRequest(unpaged, index)
            .flatMap(request -> doCount(request))
            .defaultIfEmpty(0L);
    }

    /**
     * Deletes the documents matching the query from the search index resolved for {@code index}.
     *
     * @param index      index name
     * @param queryParam query conditions selecting the documents to delete
     * @return the number of deleted documents, or {@code 0} when nothing was produced
     */
    @Override
    public Mono<Long> delete(String index, QueryParam queryParam) {
        return getIndexForSearch(index)
            .flatMap(target -> createQueryBuilder(queryParam, index)
                .flatMap(query -> restClient.deleteBy(delete -> delete.setQuery(query).indices(new String[]{target})))
                .map(BulkByScrollResponse::getDeleted))
            .defaultIfEmpty(0L);
    }

    /**
     * Tells whether data may be written to the given index.
     * Currently every index is considered writable; the argument is not inspected.
     *
     * @param index index name (unused)
     * @return always {@code true}
     */
    private boolean checkWritable(String index) {
        return true;
    }

    /**
     * Hands one record to the write buffer. The returned Mono completes immediately and does
     * not signal whether the record has been sent to Elasticsearch.
     *
     * @param index   index name
     * @param payload the record; a {@code Map} or a bean
     * @param <T>     record type
     * @return an empty Mono
     */
    @Override
    public <T> Mono<Void> commit(String index, T payload) {
        if (!checkWritable(index)) {
            return Mono.empty();
        }
        Buffer entry = Buffer.of(index, payload);
        writer.write(entry);
        return Mono.empty();
    }

    /**
     * Hands every record of the collection to the write buffer, in iteration order.
     *
     * @param index   index name
     * @param payload the records
     * @param <T>     record type
     * @return an empty Mono
     */
    @Override
    public <T> Mono<Void> commit(String index, Collection<T> payload) {
        if (!checkWritable(index)) {
            return Mono.empty();
        }
        payload.forEach(record -> writer.write(Buffer.of(index, record)));
        return Mono.empty();
    }

    /**
     * Hands every record emitted by the publisher to the write buffer.
     *
     * @param index index name
     * @param data  publisher of records
     * @param <T>   record type
     * @return a Mono completing when the publisher has been consumed; empty right away when
     *         the index is not writable
     */
    @Override
    public <T> Mono<Void> commit(String index, Publisher<T> data) {
        if (checkWritable(index)) {
            return Flux
                .from(data)
                .flatMap(record -> commit(index, record))
                .then();
        }
        return Mono.empty();
    }

    /**
     * Saves one record directly (bypassing the write buffer) via the publisher overload.
     *
     * @param index   index name
     * @param payload the record
     * @param <T>     record type
     * @return a Mono completing when the save pipeline completes
     */
    @Override
    public <T> Mono<Void> save(String index, T payload) {
        Publisher<T> single = Mono.just(payload);
        return save(index, single);
    }

    /**
     * Saves the emitted records directly (bypassing the write buffer), in batches of
     * {@code buffer.getSize()} records per bulk request.
     *
     * @param index index name
     * @param data  publisher of records
     * @param <T>   record type
     * @return a Mono completing when all batches have been processed
     */
    @Override
    public <T> Mono<Void> save(String index, Publisher<T> data) {
        return Flux
            .from(data)
            .map(record -> Buffer.of(index, record))
            .buffer(buffer.getSize())
            .flatMap(batch -> doSave(batch))
            .then();
    }

    /**
     * Saves a collection of records directly (bypassing the write buffer) via the publisher
     * overload.
     *
     * @param index   index name
     * @param payload the records
     * @param <T>     record type
     * @return a Mono completing when all batches have been processed
     */
    @Override
    public <T> Mono<Void> save(String index, Collection<T> payload) {
        Publisher<T> records = Flux.fromIterable(payload);
        return save(index, records);
    }

    /**
     * Lifecycle hook invoked before the bean is destroyed: disposes the write buffer.
     */
    @PreDestroy
    public void shutdown() {
        writer.dispose();
    }

    /**
     * Lifecycle hook invoked after construction: re-applies the current buffer properties to
     * the already created writer, so a {@code buffer} replaced through {@code setBuffer}
     * after construction takes effect.
     */
    @PostConstruct
    public void reset() {
        writer.settings(current -> current.properties((BufferProperties) buffer));
    }

    /**
     * Builds and starts the write buffer.
     * <p>
     * The buffer uses the {@code "writer.queue"} settings created from {@code buffer},
     * delivers batches to {@link #doSaveBuffer(Flux)} and treats an error as retryable when it
     * is an {@link ElasticsearchException} with status {@code BAD_GATEWAY}, or when
     * {@code ErrorUtils.hasException} reports a {@link WebClientException} or an
     * {@link IOException}.
     */
    private void init() {
        this.writer = new PersistenceBuffer<Buffer>(
            BufferSettings.create("writer.queue", (BufferProperties) this.buffer),
            Buffer::new,
            this::doSaveBuffer)
            .name("elasticsearch")
            .retryWhenError(error -> {
                boolean badGateway = error instanceof ElasticsearchException
                    && ((ElasticsearchException) error).status() == RestStatus.BAD_GATEWAY;
                return badGateway
                    || ErrorUtils.hasException(error, WebClientException.class)
                    || ErrorUtils.hasException(error, IOException.class);
            });
        this.writer.start();
    }

    /**
     * Batch handler of the write buffer: collects the batch, saves it with {@code doSave} on
     * the parallel scheduler and reports whether nothing was saved.
     * NOTE(review): {@code true} presumably tells the buffer to retry the batch - confirm
     * against PersistenceBuffer's handler contract.
     *
     * @param bufferFlux the batch to save
     * @return {@code true} when {@code doSave} reported {@code 0} saved records, {@code false}
     *         otherwise; empty when {@code doSave} produced no value
     */
    public Mono<Boolean> doSaveBuffer(Flux<Buffer> bufferFlux) {
        return bufferFlux
            .collectList()
            .flatMap(batch -> doSave(batch))
            .subscribeOn(Schedulers.parallel())
            .map(saved -> saved == 0);
    }

    /**
     * Resolves the index name to write to, as decided by the index's strategy.
     *
     * @param index index name
     * @return the name returned by the strategy's {@code getIndexForSave}
     */
    private Mono<String> getIndexForSave(String index) {
        return indexManager
            .getIndexStrategy(index)
            .map(indexStrategy -> indexStrategy.getIndexForSave(index));
    }

    /**
     * Resolves the index name (or expression) to search in, as decided by the index's strategy.
     *
     * @param index index name
     * @return the name returned by the strategy's {@code getIndexForSearch}
     */
    private Mono<String> getIndexForSearch(String index) {
        return indexManager
            .getIndexStrategy(index)
            .map(indexStrategy -> indexStrategy.getIndexForSearch(index));
    }

    /**
     * Writes the given records with a single bulk request.
     * <p>
     * Records are grouped by index name, each group is mapped to its save index and turned
     * into index requests, and all requests are sent in one bulk request (9 second timeout,
     * immediate refresh when {@code buffer.isRefreshWhenWrite()} is set).
     * <p>
     * Errors of the pipeline are printed through {@code SystemUtils.printError} and propagated.
     * When the bulk response reports item failures, the failure message is logged and
     * {@code 0} is returned.
     *
     * @param buffers the records to write
     * @return the number of records passed in on success, {@code 0} when the bulk response
     *         has failures, or empty when no index request was produced
     */
    protected Mono<Integer> doSave(Collection<Buffer> buffers) {
        int size = buffers.size();
        return Flux
            .fromIterable(buffers)
            // Unbounded prefetch so that every group can be opened regardless of batch size.
            .groupBy(Buffer::getIndex, Integer.MAX_VALUE)
            .flatMap(group -> this
                .getIndexForSave(group.key())
                .flatMapMany(realIndex -> group.map(record -> this.createIndexRequest(realIndex, record))))
            .collectList()
            .filter(CollectionUtils::isNotEmpty)
            .flatMap(requests -> {
                BulkRequest bulk = new BulkRequest();
                bulk.timeout(TimeValue.timeValueSeconds(9L));
                if (this.buffer.isRefreshWhenWrite()) {
                    bulk.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
                }
                requests.forEach(item -> bulk.add(item));
                return this.restClient.bulk(bulk);
            })
            .doOnError(err -> SystemUtils.printError("\u4fdd\u5b58ElasticSearch\u6570\u636e\u5931\u8d25:\n%s", () -> new Object[]{StringUtils.throwable2String(err)}))
            .map(response -> {
                if (response.hasFailures()) {
                    // Item-level failures do not raise an error signal; log them so that the
                    // reason for reporting 0 saved records is not lost.
                    log.warn("Elasticsearch bulk save completed with failures: {}", response.buildFailureMessage());
                    return 0;
                }
                return size;
            });
    }

    /** Builds the index request for one buffered record and releases the record afterwards. */
    private IndexRequest createIndexRequest(String realIndex, Buffer record) {
        try {
            IndexRequest request = new IndexRequest(realIndex);
            if (record.id != null) {
                request.id(record.id);
            }
            // Servers older than 7.0.0 get an explicit document type.
            if (this.getRestClient().serverVersion().before(Version.V_7_0_0)) {
                request.type("_doc");
            }
            return request.source(record.payload, XContentType.JSON);
        } finally {
            record.release();
        }
    }

    /**
     * Maps every hit of a search response with {@code mapper}. A hit whose source has no
     * {@code "id"} value (missing, {@code null} or the empty string) gets the document id
     * under that key first.
     *
     * @param mapper   maps a hit's source map to the result type
     * @param response the search response
     * @param <T>      result element type
     * @return the mapped hits, in response order
     */
    private <T> List<T> translate(Function<Map<String, Object>, T> mapper, SearchResponse response) {
        SearchHit[] hits = response.getHits().getHits();
        return Arrays
            .stream(hits)
            .map(hit -> {
                Map<String, Object> source = hit.getSourceAsMap();
                Object currentId = source.get("id");
                // Same check as the Spring helper: null or the empty string counts as absent.
                if (currentId == null || "".equals(currentId)) {
                    source.put("id", hit.getId());
                }
                return mapper.apply(source);
            })
            .collect(Collectors.toList());
    }

    /**
     * Executes a count for the given search request through the reactive client.
     *
     * @param request the search request describing what to count
     * @return the count reported by the client
     */
    private Mono<Long> doCount(SearchRequest request) {
        return restClient.count(request);
    }

    /**
     * Builds a search request for the given index names by resolving their metadata first.
     *
     * @param queryParam query conditions
     * @param indexes    index names
     * @return the search request; empty when no metadata is resolved
     */
    protected Mono<SearchRequest> createSearchRequest(QueryParam queryParam, String ... indexes) {
        return indexManager
            .getIndexesMetadata(indexes)
            .collectList()
            .filter(CollectionUtils::isNotEmpty)
            .flatMap(known -> createSearchRequest(queryParam, known));
    }

    /**
     * Builds a search request over the search names of all given indexes. The query source is
     * converted using the metadata of the first index only.
     *
     * @param queryParam query conditions
     * @param indexes    metadata of the indexes to search; must not be empty
     * @return the search request with source and indices options applied
     */
    protected Mono<SearchRequest> createSearchRequest(QueryParam queryParam, List<ElasticSearchIndexMetadata> indexes) {
        SearchSourceBuilder source = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0));
        return Flux
            .fromIterable(indexes)
            .flatMap(metadata -> getIndexForSearch(metadata.getIndex()))
            .collectList()
            .map(names -> {
                SearchRequest request = new SearchRequest(names.toArray(new String[0]));
                request.source(source);
                request.indicesOptions(indexOptions);
                return request;
            });
    }

    /**
     * Translates the query into an Elasticsearch query builder, using the index metadata when
     * it is available and {@code null} metadata otherwise.
     *
     * @param queryParam query conditions
     * @param index      index name
     * @return the query builder
     */
    protected Mono<QueryBuilder> createQueryBuilder(QueryParam queryParam, String index) {
        Mono<QueryBuilder> withoutMetadata =
            Mono.fromSupplier(() -> QueryParamTranslator.createQueryBuilder(queryParam, null));
        return indexManager
            .getIndexMetadata(index)
            .map(metadata -> QueryParamTranslator.createQueryBuilder(queryParam, metadata))
            .switchIfEmpty(withoutMetadata);
    }

    /** @return the reactive Elasticsearch client used by this service */
    public ReactiveElasticsearchClient getRestClient() {
        return restClient;
    }

    /** @return the index manager used by this service */
    public ElasticSearchIndexManager getIndexManager() {
        return indexManager;
    }

    /** @return the current buffer properties */
    public ElasticSearchBufferProperties getBuffer() {
        return buffer;
    }

    /**
     * Replaces the buffer properties. The writer only picks them up when {@code reset()} runs.
     *
     * @param buffer the new buffer properties
     */
    public void setBuffer(ElasticSearchBufferProperties buffer) {
        this.buffer = buffer;
    }

    static {
        DateFormatter.supportFormatter.add(new DefaultDateFormatter(Pattern.compile("[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.+"), "yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
        DateFormatter.supportFormatter.add(new DefaultDateFormatter(Pattern.compile("[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}.+"), "yyyy-MM-dd HH:mm:ss.SSS"));
    }

    /**
     * One record waiting to be written: the target index, the optional document id and the
     * record serialized as JSON bytes. Instances are externalized by the write buffer.
     */
    public static class Buffer
    implements Externalizable,
    MemoryUsage {
        private static final long serialVersionUID = 1L;
        /** Index name the record was committed to. */
        String index;
        /** Document id taken from the record's {@code "id"} entry; {@code null} when absent. */
        String id;
        /** The record serialized as JSON. */
        byte[] payload;

        /**
         * Creates a buffer entry for a record.
         *
         * @param index   index name
         * @param payload the record; a {@code Map} is used as is, anything else is copied into
         *                a new {@code HashMap} with {@code FastBeanCopier}
         * @return the buffer entry holding the JSON form of the record
         */
        @SuppressWarnings("unchecked")
        public static Buffer of(String index, Object payload) {
            Buffer buffer = new Buffer();
            buffer.index = index;
            Map<String, Object> data = payload instanceof Map
                ? (Map<String, Object>) payload
                : (Map<String, Object>) FastBeanCopier.copy((Object) payload, HashMap::new, new String[0]);
            Object id = data.get("id");
            buffer.id = id == null ? null : String.valueOf(id);
            try {
                buffer.payload = ObjectMappers.JSON_MAPPER.writeValueAsBytes(data);
            } catch (Throwable error) {
                // NOTE(review): writeValueAsBytes presumably declares a checked exception (as
                // Jackson's ObjectMapper does), which this method does not declare. Rethrow it
                // unchanged so callers still observe the original exception.
                throw Buffer.<RuntimeException>sneakyThrow(error);
            }
            return buffer;
        }

        /** Throws {@code error} as is, bypassing checked-exception analysis. Never returns. */
        @SuppressWarnings("unchecked")
        private static <E extends Throwable> RuntimeException sneakyThrow(Throwable error) throws E {
            throw (E) error;
        }

        /** Called once the entry has been turned into an index request; currently does nothing. */
        void release() {
        }

        /** Writes index, nullable id and the length-prefixed payload. */
        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeUTF(this.index);
            SerializeUtils.writeNullableUTF(this.id, out);
            out.writeInt(this.payload.length);
            out.write(this.payload);
        }

        /** Reads the fields in the order written by {@link #writeExternal(ObjectOutput)}. */
        @Override
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            this.index = in.readUTF();
            this.id = SerializeUtils.readNullableUTF(in);
            int length = in.readInt();
            this.payload = new byte[length];
            in.readFully(this.payload);
        }

        /**
         * @return 64 plus the payload length ({@code 64} alone when there is no payload)
         */
        public int usage() {
            return this.payload == null ? 64 : 64 + this.payload.length;
        }

        public String getIndex() {
            return this.index;
        }

        public String getId() {
            return this.id;
        }

        public byte[] getPayload() {
            return this.payload;
        }
    }

    /**
     * Buffer properties preset with the file path {@code ./data/elasticsearch-buffer} and a
     * size of 3000, plus a flag controlling refresh on write (off by default).
     */
    public static class BufferConfig
    extends BufferProperties {
        /** Whether writes should refresh immediately; {@code false} unless set. */
        private boolean refreshWhenWrite;

        public BufferConfig() {
            setFilePath("./data/elasticsearch-buffer");
            setSize(3000);
        }

        /** @return whether refresh-on-write is enabled */
        public boolean isRefreshWhenWrite() {
            return refreshWhenWrite;
        }

        /** @param refreshWhenWrite whether refresh-on-write is enabled */
        public void setRefreshWhenWrite(boolean refreshWhenWrite) {
            this.refreshWhenWrite = refreshWhenWrite;
        }
    }
}

