package org.jetlinks.community.network.tcp.parser.strateies;

import io.vertx.core.buffer.Buffer;
import io.vertx.core.parsetools.RecordParser;
import java.util.function.Supplier;
import org.jetlinks.community.ValueObject;
import org.jetlinks.community.network.tcp.parser.PayloadParser;
import org.jetlinks.community.network.tcp.parser.PayloadParserBuilderStrategy;
import org.jetlinks.community.network.tcp.parser.PayloadParserType;
import org.jetlinks.core.utils.Reactors;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:org/jetlinks/community/network/tcp/parser/strateies/VertxPayloadParserBuilder.class */
/**
 * Base {@link PayloadParserBuilderStrategy} for payload parsers that delegate
 * framing to a Vert.x {@link RecordParser}.
 *
 * <p>Subclasses declare their {@link #getType() type} and provide, via
 * {@link #createParser(ValueObject)}, a factory of configured {@link RecordParser}
 * instances. {@link #buildLazy(ValueObject)} wraps that factory into a supplier of
 * {@link RecordPayloadParser} objects.
 */
public abstract class VertxPayloadParserBuilder implements PayloadParserBuilderStrategy {

    /**
     * @return the parser type handled by this strategy
     */
    @Override
    public abstract PayloadParserType getType();

    /**
     * Creates the factory used to obtain {@link RecordParser} instances for the
     * given configuration.
     *
     * @param valueObject configuration passed to {@link #buildLazy(ValueObject)}
     * @return a supplier whose {@code get()} is called every time a
     *         {@link RecordPayloadParser} is constructed or reset
     */
    protected abstract Supplier<RecordParser> createParser(ValueObject valueObject);

    /**
     * Builds a supplier of payload parsers for the given configuration.
     *
     * <p>{@link #createParser(ValueObject)} is invoked once, eagerly, by this method;
     * the factory it returns is shared by every {@link RecordPayloadParser} that the
     * returned supplier produces.
     *
     * @param valueObject parser configuration, forwarded to {@link #createParser(ValueObject)}
     * @return a supplier that creates a new {@link RecordPayloadParser} on each call
     */
    @Override
    public Supplier<PayloadParser> buildLazy(ValueObject valueObject) {
        Supplier<RecordParser> recordParserFactory = createParser(valueObject);
        return () -> new RecordPayloadParser(recordParserFactory);
    }

    /**
     * {@link PayloadParser} that feeds incoming buffers into a Vert.x
     * {@link RecordParser} and publishes every record the parser emits through a
     * Reactor sink.
     */
    public static class RecordPayloadParser implements PayloadParser {
        private final Supplier<RecordParser> recordParserSupplier;
        private final Sinks.Many<Buffer> sink = Reactors.createMany();
        private RecordParser recordParser;

        /**
         * Creates the parser and immediately wires up a first {@link RecordParser}
         * obtained from {@code supplier}.
         *
         * @param supplier factory of {@link RecordParser} instances; it is called
         *                 here and again on every {@link #reset()}
         */
        public RecordPayloadParser(Supplier<RecordParser> supplier) {
            this.recordParserSupplier = supplier;
            // Deliberately NOT calling the overridable reset() here: a subclass
            // override would otherwise run before the subclass is initialised.
            installFreshParser();
        }

        /**
         * Passes a chunk of received data to the current {@link RecordParser}.
         *
         * @param buffer the received data
         */
        @Override
        public void handle(Buffer buffer) {
            this.recordParser.handle(buffer);
        }

        /**
         * @return the stream of records emitted by the underlying {@link RecordParser}
         */
        @Override
        public Flux<Buffer> handlePayload() {
            return this.sink.asFlux();
        }

        /**
         * Completes the sink backing {@link #handlePayload()}.
         */
        @Override
        public void close() {
            this.sink.emitComplete(Reactors.emitFailureHandler());
        }

        /**
         * Replaces the current {@link RecordParser} with a new one obtained from the
         * supplier; the sink, and therefore {@link #handlePayload()}, is left untouched.
         */
        @Override
        public void reset() {
            installFreshParser();
        }

        /**
         * Obtains a new {@link RecordParser} from the supplier and registers a handler
         * on it that forwards each emitted record into the sink.
         */
        private void installFreshParser() {
            RecordParser freshParser = this.recordParserSupplier.get();
            freshParser.handler(record -> this.sink.emitNext(record, Reactors.emitFailureHandler()));
            this.recordParser = freshParser;
        }
    }
}
