/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.community.tdengine.metadata;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.hswebframework.ezorm.rdb.executor.BatchSqlRequest;
import org.hswebframework.ezorm.rdb.executor.DefaultColumnWrapperContext;
import org.hswebframework.ezorm.rdb.executor.SqlRequest;
import org.hswebframework.ezorm.rdb.executor.reactive.ReactiveSqlExecutor;
import org.hswebframework.ezorm.rdb.executor.wrapper.ColumnWrapperContext;
import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrapper;
import org.jetlinks.community.tdengine.TDengineException;
import org.jetlinks.core.utils.Reactors;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class TDengineRestfulSqlExecutor
implements ReactiveSqlExecutor {
    private static final Logger log = LoggerFactory.getLogger(TDengineRestfulSqlExecutor.class);
    private final WebClient client;

    public Mono<Integer> update(Publisher<SqlRequest> request) {
        return this.doExecute(request).then(Reactors.ALWAYS_ONE);
    }

    public Mono<Void> execute(Publisher<SqlRequest> request) {
        return this.doExecute(request).then();
    }

    public <E> Flux<E> select(Publisher<SqlRequest> requests, ResultWrapper<E, ?> wrapper) {
        return this.doExecute(requests).flatMap(response -> this.convertQueryResult((JSONObject)response, wrapper));
    }

    private Flux<JSONObject> doExecute(Publisher<SqlRequest> requests) {
        return Flux.from(requests).expand(request -> {
            if (request instanceof BatchSqlRequest) {
                return Flux.fromIterable((Iterable)((BatchSqlRequest)request).getBatch());
            }
            return Flux.empty();
        }).filter(SqlRequest::isNotEmpty).concatMap(request -> {
            String sql = request.toNativeSql();
            log.trace("Execute ==> {}", (Object)sql);
            return ((WebClient.RequestBodySpec)this.client.post().uri("/rest/sql", new Object[0])).bodyValue((Object)sql).exchangeToMono(response -> response.bodyToMono(String.class).map(json -> {
                JSONObject result = JSON.parseObject((String)json);
                this.checkExecuteResult(sql, result);
                return result;
            }));
        });
    }

    private void checkExecuteResult(String sql, JSONObject result) {
        if (result.getInteger("code") != 0) {
            String error = result.getString("desc");
            if (sql.startsWith("describe") && error.contains("does not exist")) {
                return;
            }
            log.warn("execute tdengine sql error [{}]: [{}]", (Object)error, (Object)sql);
            throw new TDengineException(sql, result.getString("desc"));
        }
    }

    protected <E> Flux<E> convertQueryResult(JSONObject result, ResultWrapper<E, ?> wrapper) {
        JSONArray head = result.getJSONArray("column_meta");
        JSONArray data = result.getJSONArray("data");
        if (CollectionUtils.isEmpty((Collection)head) || CollectionUtils.isEmpty((Collection)data)) {
            return Flux.empty();
        }
        List columns = head.stream().map(v -> ((JSONArray)v).getString(0)).collect(Collectors.toList());
        return Flux.create(sink -> {
            wrapper.beforeWrap(() -> columns);
            for (Object rowo : data) {
                Object rowInstance = wrapper.newRowInstance();
                JSONArray row = (JSONArray)rowo;
                for (int i = 0; i < columns.size(); ++i) {
                    String property = (String)columns.get(i);
                    Object value = row.get(i);
                    DefaultColumnWrapperContext context = new DefaultColumnWrapperContext(i, property, value, rowInstance);
                    wrapper.wrapColumn((ColumnWrapperContext)context);
                    rowInstance = context.getRowInstance();
                }
                if (!wrapper.completedWrapRow(rowInstance)) break;
                if (rowInstance == null) continue;
                sink.next(rowInstance);
            }
            wrapper.completedWrap();
            sink.complete();
        });
    }

    public TDengineRestfulSqlExecutor(WebClient client) {
        this.client = client;
    }
}

