package org.jetlinks.community.tdengine.restful;

import io.netty.buffer.ByteBufAllocator;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.jetlinks.community.buffer.BufferSettings;
import org.jetlinks.community.buffer.PersistenceBuffer;
import org.jetlinks.community.tdengine.Point;
import org.jetlinks.community.tdengine.TDEngineDataWriter;
import org.jetlinks.community.tdengine.TDEngineUtils;
import org.jetlinks.community.tdengine.TDengineProperties;
import org.jetlinks.community.utils.ErrorUtils;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientException;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/community/tdengine/restful/SchemalessTDEngineDataWriter.class */
public class SchemalessTDEngineDataWriter implements TDEngineDataWriter, Disposable {
    private final WebClient client;
    private final String database;
    private final DataBufferFactory factory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
    private final PersistenceBuffer<String> buffer;
    private static final byte[] newLine = "\n".getBytes();

    public SchemalessTDEngineDataWriter(WebClient webClient, String str, TDengineProperties.Buffer buffer) {
        this.client = webClient;
        this.database = str;
        if (!buffer.isEnabled()) {
            this.buffer = null;
        } else {
            this.buffer = new PersistenceBuffer(BufferSettings.create("tdengine-writer.queue", buffer), (Supplier) null, flux -> {
                return writeNow(flux).thenReturn(false);
            }).name("tdengine").parallelism(buffer.getParallelism()).retryWhenError(th -> {
                return ErrorUtils.hasException(th, new Class[]{WebClientException.class}) || ErrorUtils.hasException(th, new Class[]{IOException.class});
            });
            this.buffer.start();
        }
    }

    public void dispose() {
        if (null != this.buffer) {
            this.buffer.dispose();
        }
    }

    @Override // org.jetlinks.community.tdengine.TDEngineDataWriter
    public Mono<Void> write(Point point) {
        if (this.buffer == null) {
            return writeNow(Flux.just(convertToLine(point)));
        }
        this.buffer.write(convertToLine(point));
        return Mono.empty();
    }

    @Override // org.jetlinks.community.tdengine.TDEngineDataWriter
    public Mono<Void> write(Flux<Point> flux) {
        return writeNow(flux.map(this::convertToLine));
    }

    private Mono<Void> writeNow(Flux<String> flux) {
        return this.client.post().uri(uriBuilder -> {
            return uriBuilder.path("/influxdb/v1/write").queryParam("db", new Object[]{this.database}).build(new Object[0]);
        }).body(flux.map(str -> {
            byte[] bytes = str.getBytes();
            return this.factory.allocateBuffer(bytes.length + newLine.length).write(bytes).write(newLine);
        }), DataBuffer.class).exchangeToMono(TDEngineUtils::checkExecuteResult).then();
    }

    private String convertToLine(Point point) {
        return org.influxdb.dto.Point.measurement(point.getMetric()).tag(point.getTags()).fields(point.getValues()).time(point.getTimestamp(), TimeUnit.MILLISECONDS).build().lineProtocol();
    }

    public SchemalessTDEngineDataWriter(WebClient webClient, String str, PersistenceBuffer<String> persistenceBuffer) {
        this.client = webClient;
        this.database = str;
        this.buffer = persistenceBuffer;
    }
}
