package org.jetlinks.community.network.tcp.parser.strateies;

import io.vertx.core.buffer.Buffer;
import io.vertx.core.parsetools.RecordParser;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import org.jetlinks.community.network.tcp.parser.PayloadParser;
import org.jetlinks.core.utils.Reactors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:org/jetlinks/community/network/tcp/parser/strateies/PipePayloadParser.class */
/**
 * A {@link PayloadParser} that splits incoming data with a Vert.x {@link RecordParser} and hands
 * every parsed record to the next handler of an ordered handler list (the "pipe").
 *
 * <p>Typical flow, as implemented below:
 * <ol>
 *   <li>Handlers are registered in order through {@link #handler(BiConsumer)}.</li>
 *   <li>The initial framing is chosen with {@link #fixed(int)} or {@link #delimited(String)};
 *       the first such call creates the record parser and is remembered so that
 *       {@link #complete()} can restore it.</li>
 *   <li>Each record produced by the record parser is passed to the next handler. A handler may
 *       change the framing for the following record, collect data with {@code result(...)} and
 *       finally call {@link #complete()} to emit the collected data as one buffer.</li>
 * </ol>
 *
 * <p>Alternatively, when no record parser was configured, {@link #direct(Function)} maps every
 * incoming buffer straight to an output buffer.
 */
public class PipePayloadParser implements PayloadParser {
    private static final Logger log = LoggerFactory.getLogger(PipePayloadParser.class);

    /** Atomic accessor for {@link #currentPipe}. */
    private static final AtomicIntegerFieldUpdater<PipePayloadParser> CURRENT_PIPE =
        AtomicIntegerFieldUpdater.newUpdater(PipePayloadParser.class, "currentPipe");

    /** Sink through which completed payloads are published to {@link #handlePayload()}. */
    private final Sinks.Many<Buffer> sink = Reactors.createMany();

    /** Ordered handlers; each parsed record goes to the handler at the current cursor. */
    private final List<BiConsumer<Buffer, PipePayloadParser>> pipe = new CopyOnWriteArrayList<>();

    /** Buffers collected via {@code result(...)} since the last {@link #complete()}. */
    private final List<Buffer> result = new CopyOnWriteArrayList<>();

    /** Record parser doing the framing; {@code null} until {@code fixed}/{@code delimited} is called. */
    private volatile RecordParser recordParser;

    /** Mapper used by {@link #handle(Buffer)} when no record parser is configured. */
    private Function<Buffer, Buffer> directMapper;

    /** Re-applies the framing mode that was configured first; invoked by {@link #complete()}. */
    private Consumer<RecordParser> firstInit;

    /** Index of the handler that receives the next record; accessed through {@link #CURRENT_PIPE}. */
    private volatile int currentPipe;

    /**
     * Creates a new, empty buffer.
     *
     * @return a fresh {@link Buffer}
     */
    public Buffer newBuffer() {
        return Buffer.buffer();
    }

    /**
     * Appends the given string, wrapped in a buffer, to the pending result.
     *
     * @param str text to append
     * @return this parser
     */
    public PipePayloadParser result(String str) {
        return result(Buffer.buffer(str));
    }

    /**
     * Appends the given bytes, wrapped in a buffer, to the pending result.
     *
     * @param bArr bytes to append
     * @return this parser
     */
    public PipePayloadParser result(byte[] bArr) {
        return result(Buffer.buffer(bArr));
    }

    /**
     * Registers a handler at the end of the pipe.
     *
     * @param biConsumer callback receiving the parsed record and this parser
     * @return this parser
     */
    public PipePayloadParser handler(BiConsumer<Buffer, PipePayloadParser> biConsumer) {
        this.pipe.add(biConsumer);
        return this;
    }

    /**
     * Switches framing to delimiter mode. The first framing call creates the record parser and
     * is remembered as the initial mode restored by {@link #complete()}; later calls only switch
     * the mode of the existing parser.
     *
     * @param str delimiter passed to the record parser
     * @return this parser
     */
    public PipePayloadParser delimited(String str) {
        RecordParser existing = this.recordParser;
        if (existing != null) {
            existing.delimitedMode(str);
            return this;
        }
        // Store the initial mode before the parser is published so complete() never observes
        // a parser without its initializer.
        this.firstInit = parser -> parser.delimitedMode(str);
        setParser(RecordParser.newDelimited(str));
        return this;
    }

    /**
     * Switches framing to fixed-size mode. A size of {@code 0} means there is nothing more to
     * read and is treated as {@link #complete()}. The first framing call creates the record
     * parser and is remembered as the initial mode restored by {@link #complete()}; later calls
     * only switch the mode of the existing parser.
     *
     * @param i record size in bytes, or {@code 0} to complete the current payload
     * @return this parser
     */
    public PipePayloadParser fixed(int i) {
        if (i == 0) {
            complete();
            return this;
        }
        RecordParser existing = this.recordParser;
        if (existing != null) {
            existing.fixedSizeMode(i);
            return this;
        }
        // Store the initial mode before the parser is published so complete() never observes
        // a parser without its initializer.
        this.firstInit = parser -> parser.fixedSizeMode(i);
        setParser(RecordParser.newFixed(i));
        return this;
    }

    /**
     * Sets the mapper applied by {@link #handle(Buffer)} when no record parser is configured.
     *
     * @param function maps an incoming buffer to the buffer to emit; a {@code null} result is
     *                 not emitted
     * @return this parser
     */
    public PipePayloadParser direct(Function<Buffer, Buffer> function) {
        this.directMapper = function;
        return this;
    }

    /**
     * Returns the handler for the next record and advances the cursor, cycling back to the
     * first handler once the end of the pipe is passed.
     *
     * @return the next handler, or {@code null} when no handler has been registered
     */
    private BiConsumer<Buffer, PipePayloadParser> getNextHandler() {
        int size = this.pipe.size();
        if (size == 0) {
            return null;
        }
        int index = CURRENT_PIPE.getAndIncrement(this);
        if (index < size) {
            return this.pipe.get(index);
        }
        // Ran past the end: this record goes to the first handler, so the following record must
        // go to index 1. Resetting the cursor to 0 here would invoke the first handler twice.
        CURRENT_PIPE.set(this, 1);
        return this.pipe.get(0);
    }

    /**
     * Installs the record parser and wires its output into the handler pipe.
     *
     * @param recordParser parser to install
     */
    private void setParser(RecordParser recordParser) {
        this.recordParser = recordParser;
        recordParser.handler(buffer -> {
            BiConsumer<Buffer, PipePayloadParser> next = getNextHandler();
            if (next == null) {
                // Same policy as handle() for a missing parser: report and drop.
                log.error("no pipe handler registered, record dropped");
                return;
            }
            next.accept(buffer, this);
        });
    }

    /**
     * Finishes the current payload: rewinds the handler cursor, restores the initial framing
     * mode and, if any result buffers were collected, emits them concatenated as one buffer.
     *
     * @return this parser
     */
    public PipePayloadParser complete() {
        CURRENT_PIPE.set(this, 0);
        RecordParser parser = this.recordParser;
        Consumer<RecordParser> init = this.firstInit;
        if (parser != null && init != null) {
            init.accept(parser);
        }
        if (!this.result.isEmpty()) {
            Buffer merged = Buffer.buffer();
            for (Buffer part : this.result) {
                merged.appendBuffer(part);
            }
            this.result.clear();
            this.sink.emitNext(merged, Reactors.emitFailureHandler());
        }
        return this;
    }

    /**
     * Appends a buffer to the pending result emitted by the next {@link #complete()}.
     *
     * @param buffer buffer to append
     * @return this parser
     */
    public PipePayloadParser result(Buffer buffer) {
        this.result.add(buffer);
        return this;
    }

    /**
     * Feeds raw data into the parser. With a record parser configured the data is framed and
     * dispatched to the pipe; otherwise the direct mapper is applied and its non-null result is
     * emitted. If neither is configured an error is logged and the data is dropped.
     *
     * @param buffer incoming data
     */
    @Override
    public synchronized void handle(Buffer buffer) {
        RecordParser parser = this.recordParser;
        Function<Buffer, Buffer> mapper = this.directMapper;
        if (parser == null && mapper == null) {
            log.error("record parser not init");
            return;
        }
        if (parser != null) {
            parser.handle(buffer);
            return;
        }
        Buffer mapped = mapper.apply(buffer);
        if (mapped != null) {
            this.sink.emitNext(mapped, Reactors.emitFailureHandler());
        }
    }

    /**
     * Returns the stream of completed payloads.
     *
     * @return flux view of the internal sink
     */
    @Override
    public Flux<Buffer> handlePayload() {
        return this.sink.asFlux();
    }

    /**
     * Discards any collected result buffers and returns the parser to its initial state
     * (cursor rewound, initial framing mode restored). Nothing is emitted.
     */
    @Override
    public void reset() {
        this.result.clear();
        complete();
    }

    /**
     * Completes the payload stream, rewinds the handler cursor and discards collected buffers.
     */
    @Override
    public void close() {
        this.sink.tryEmitComplete();
        CURRENT_PIPE.set(this, 0);
        this.result.clear();
    }
}
