PipePayloadParser.java
package org.jetlinks.community.network.tcp.parser.strateies;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.parsetools.RecordParser;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.community.network.tcp.parser.PayloadParser;
import org.jetlinks.core.utils.Reactors;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* <pre>{@code
* PipePayloadParser payloadParser =
* //先读取4个字节
* new PipePayloadParser()
* .fixed(4)
* //第一次读取数据
* .handler((buffer,parser) -> {
* //4字节转为int,表示接下来要读取的包长度
* int len = buffer.getInt(0);
* parser
* .result(buffer) //将已读取的4字节设置到结果中
* .fixed(len);//设置接下来要读取的字节长度
* })
* //第二次读取数据
* .handler((buffer,parser) -> parser
* .result(buffer) //设置结果
* .complete() //完成本次读取,输出结果,开始下一次读取
* );
* }</pre>
*/
@Slf4j
public class PipePayloadParser implements PayloadParser {
private final static AtomicIntegerFieldUpdater<PipePayloadParser> CURRENT_PIPE =
AtomicIntegerFieldUpdater.newUpdater(PipePayloadParser.class, "currentPipe");
private final Sinks.Many<Buffer> sink = Reactors.createMany();
private final List<BiConsumer<Buffer, PipePayloadParser>> pipe = new CopyOnWriteArrayList<>();
private final List<Buffer> result = new CopyOnWriteArrayList<>();
private volatile RecordParser recordParser;
private Function<Buffer, Buffer> directMapper;
private Consumer<RecordParser> firstInit;
private volatile int currentPipe;
public Buffer newBuffer() {
return Buffer.buffer();
}
public PipePayloadParser result(String buffer) {
return result(Buffer.buffer(buffer));
}
public PipePayloadParser result(byte[] buffer) {
return result(Buffer.buffer(buffer));
}
// public PipePayloadParser handler(Consumer<Buffer> handler) {
//
// return handler((payloadParser, buffer) -> handler.accept(buffer));
// }
public PipePayloadParser handler(BiConsumer<Buffer, PipePayloadParser> handler) {
pipe.add(handler);
return this;
}
public PipePayloadParser delimited(String delimited) {
if (recordParser == null) {
setParser(RecordParser.newDelimited(delimited));
firstInit = (parser -> parser.delimitedMode(delimited));
return this;
}
recordParser.delimitedMode(delimited);
return this;
}
public PipePayloadParser fixed(int size) {
if (size == 0) {
complete();
return this;
}
if (recordParser == null) {
setParser(RecordParser.newFixed(size));
firstInit = (parser -> parser.fixedSizeMode(size));
return this;
}
recordParser.fixedSizeMode(size);
return this;
}
public PipePayloadParser direct(Function<Buffer, Buffer> mapper) {
this.directMapper = mapper;
return this;
}
private BiConsumer<Buffer, PipePayloadParser> getNextHandler() {
int i = CURRENT_PIPE.getAndIncrement(this);
if (i < pipe.size()) {
return pipe.get(i);
}
CURRENT_PIPE.set(this, 0);
return pipe.get(0);
}
private void setParser(RecordParser parser) {
this.recordParser = parser;
this.recordParser.handler(buffer -> getNextHandler().accept(buffer, this));
}
public PipePayloadParser complete() {
CURRENT_PIPE.set(this, 0);
if (recordParser != null) {
firstInit.accept(recordParser);
}
if (!this.result.isEmpty()) {
Buffer buffer = Buffer.buffer();
for (Buffer buf : this.result) {
buffer.appendBuffer(buf);
}
this.result.clear();
sink.emitNext(buffer, Reactors.emitFailureHandler());
}
return this;
}
public PipePayloadParser result(Buffer buffer) {
this.result.add(buffer);
return this;
}
@Override
public synchronized void handle(Buffer buffer) {
if (recordParser == null && directMapper == null) {
log.error("record parser not init");
return;
}
if (recordParser != null) {
recordParser.handle(buffer);
return;
}
Buffer buf = directMapper.apply(buffer);
if (null != buf) {
sink.emitNext(buf, Reactors.emitFailureHandler());
}
}
@Override
public Flux<Buffer> handlePayload() {
return sink.asFlux();
}
@Override
public void reset() {
this.result.clear();
complete();
}
@Override
public void close() {
sink.tryEmitComplete();
CURRENT_PIPE.set(this, 0);
this.result.clear();
}
}