package org.jetlinks.community.tdengine.metadata;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.hswebframework.ezorm.rdb.executor.BatchSqlRequest;
import org.hswebframework.ezorm.rdb.executor.DefaultColumnWrapperContext;
import org.hswebframework.ezorm.rdb.executor.SqlRequest;
import org.hswebframework.ezorm.rdb.executor.reactive.ReactiveSqlExecutor;
import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrapper;
import org.jetlinks.community.tdengine.TDengineException;
import org.jetlinks.core.utils.Reactors;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/community/tdengine/metadata/TDengineRestfulSqlExecutor.class */
public class TDengineRestfulSqlExecutor implements ReactiveSqlExecutor {
    private static final Logger log = LoggerFactory.getLogger(TDengineRestfulSqlExecutor.class);
    private final WebClient client;

    public Mono<Integer> update(Publisher<SqlRequest> publisher) {
        return doExecute(publisher).then(Reactors.ALWAYS_ONE);
    }

    public Mono<Void> execute(Publisher<SqlRequest> publisher) {
        return doExecute(publisher).then();
    }

    public <E> Flux<E> select(Publisher<SqlRequest> publisher, ResultWrapper<E, ?> resultWrapper) {
        return doExecute(publisher).flatMap(jSONObject -> {
            return convertQueryResult(jSONObject, resultWrapper);
        });
    }

    private Flux<JSONObject> doExecute(Publisher<SqlRequest> publisher) {
        return Flux.from(publisher).expand(sqlRequest -> {
            return sqlRequest instanceof BatchSqlRequest ? Flux.fromIterable(((BatchSqlRequest) sqlRequest).getBatch()) : Flux.empty();
        }).filter((v0) -> {
            return v0.isNotEmpty();
        }).concatMap(sqlRequest2 -> {
            String nativeSql = sqlRequest2.toNativeSql();
            log.trace("Execute ==> {}", nativeSql);
            return this.client.post().uri("/rest/sql", new Object[0]).bodyValue(nativeSql).exchangeToMono(clientResponse -> {
                return clientResponse.bodyToMono(String.class).map(str -> {
                    JSONObject parseObject = JSON.parseObject(str);
                    checkExecuteResult(nativeSql, parseObject);
                    return parseObject;
                });
            });
        });
    }

    private void checkExecuteResult(String str, JSONObject jSONObject) {
        if (jSONObject.getInteger("code").intValue() != 0) {
            String string = jSONObject.getString("desc");
            if (str.startsWith("describe") && string.contains("does not exist")) {
                return;
            }
            log.warn("execute tdengine sql error [{}]: [{}]", string, str);
            throw new TDengineException(str, jSONObject.getString("desc"));
        }
    }

    protected <E> Flux<E> convertQueryResult(JSONObject jSONObject, ResultWrapper<E, ?> resultWrapper) {
        JSONArray jSONArray = jSONObject.getJSONArray("column_meta");
        JSONArray jSONArray2 = jSONObject.getJSONArray("data");
        if (CollectionUtils.isEmpty(jSONArray) || CollectionUtils.isEmpty(jSONArray2)) {
            return Flux.empty();
        }
        List list = (List) jSONArray.stream().map(obj -> {
            return ((JSONArray) obj).getString(0);
        }).collect(Collectors.toList());
        return Flux.create(fluxSink -> {
            resultWrapper.beforeWrap(() -> {
                return list;
            });
            Iterator it = jSONArray2.iterator();
            while (it.hasNext()) {
                Object next = it.next();
                Object newRowInstance = resultWrapper.newRowInstance();
                JSONArray jSONArray3 = (JSONArray) next;
                for (int i = 0; i < list.size(); i++) {
                    DefaultColumnWrapperContext defaultColumnWrapperContext = new DefaultColumnWrapperContext(i, (String) list.get(i), jSONArray3.get(i), newRowInstance);
                    resultWrapper.wrapColumn(defaultColumnWrapperContext);
                    newRowInstance = defaultColumnWrapperContext.getRowInstance();
                }
                if (!resultWrapper.completedWrapRow(newRowInstance)) {
                    break;
                } else if (newRowInstance != null) {
                    fluxSink.next(newRowInstance);
                }
            }
            resultWrapper.completedWrap();
            fluxSink.complete();
        });
    }

    public TDengineRestfulSqlExecutor(WebClient webClient) {
        this.client = webClient;
    }
}
