/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.community.tdengine.restful;

import io.netty.buffer.ByteBufAllocator;
import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import org.jetlinks.community.buffer.BufferProperties;
import org.jetlinks.community.buffer.BufferSettings;
import org.jetlinks.community.buffer.PersistenceBuffer;
import org.jetlinks.community.tdengine.Point;
import org.jetlinks.community.tdengine.TDEngineDataWriter;
import org.jetlinks.community.tdengine.TDEngineUtils;
import org.jetlinks.community.tdengine.TDengineProperties;
import org.jetlinks.community.utils.ErrorUtils;
import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientException;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class SchemalessTDEngineDataWriter
implements TDEngineDataWriter,
Disposable {
    private final WebClient client;
    private final String database;
    private final DataBufferFactory factory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
    private final PersistenceBuffer<String> buffer;
    private static final byte[] newLine = "\n".getBytes();

    public SchemalessTDEngineDataWriter(WebClient client, String database, TDengineProperties.Buffer buffer) {
        this.client = client;
        this.database = database;
        if (buffer.isEnabled()) {
            this.buffer = new PersistenceBuffer(BufferSettings.create((String)"tdengine-writer.queue", (BufferProperties)buffer), null, list -> this.writeNow((Flux<String>)list).thenReturn((Object)false)).name("tdengine").parallelism(buffer.getParallelism()).retryWhenError(e -> ErrorUtils.hasException((Throwable)e, (Class[])new Class[]{WebClientException.class}) || ErrorUtils.hasException((Throwable)e, (Class[])new Class[]{IOException.class}));
            this.buffer.start();
        } else {
            this.buffer = null;
        }
    }

    public void dispose() {
        if (null != this.buffer) {
            this.buffer.dispose();
        }
    }

    @Override
    public Mono<Void> write(Point point) {
        if (this.buffer == null) {
            return this.writeNow((Flux<String>)Flux.just((Object)this.convertToLine(point)));
        }
        this.buffer.write((Serializable)((Object)this.convertToLine(point)));
        return Mono.empty();
    }

    @Override
    public Mono<Void> write(Flux<Point> points) {
        return this.writeNow((Flux<String>)points.map(this::convertToLine));
    }

    private Mono<Void> writeNow(Flux<String> lines) {
        return ((WebClient.RequestBodySpec)this.client.post().uri(builder -> builder.path("/influxdb/v1/write").queryParam("db", new Object[]{this.database}).build(new Object[0]))).body((Publisher)lines.map(str -> {
            byte[] data = str.getBytes();
            return this.factory.allocateBuffer(data.length + newLine.length).write(data).write(newLine);
        }), DataBuffer.class).exchangeToMono(TDEngineUtils::checkExecuteResult).then();
    }

    private String convertToLine(Point point) {
        return org.influxdb.dto.Point.measurement((String)point.getMetric()).tag(point.getTags()).fields(point.getValues()).time(point.getTimestamp(), TimeUnit.MILLISECONDS).build().lineProtocol();
    }

    public SchemalessTDEngineDataWriter(WebClient client, String database, PersistenceBuffer<String> buffer) {
        this.client = client;
        this.database = database;
        this.buffer = buffer;
    }
}

