/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.core.message.codec;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.annotation.Nonnull;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.DeviceMessageCodec;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.MessageDecodeContext;
import org.jetlinks.core.message.codec.MessageEncodeContext;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.message.interceptor.DeviceMessageCodecInterceptor;
import org.jetlinks.core.message.interceptor.DeviceMessageDecodeInterceptor;
import org.jetlinks.core.message.interceptor.DeviceMessageEncodeInterceptor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class InterceptorDeviceMessageCodec
implements DeviceMessageCodec {
    private final DeviceMessageCodec messageCodec;
    private final List<DeviceMessageDecodeInterceptor> decodeDeviceMessageInterceptors = new CopyOnWriteArrayList<DeviceMessageDecodeInterceptor>();
    private final List<DeviceMessageEncodeInterceptor> encodeDeviceMessageInterceptors = new CopyOnWriteArrayList<DeviceMessageEncodeInterceptor>();

    public InterceptorDeviceMessageCodec(DeviceMessageCodec codec) {
        this.messageCodec = codec;
    }

    @Override
    public Transport getSupportTransport() {
        return this.messageCodec.getSupportTransport();
    }

    public void register(DeviceMessageCodecInterceptor interceptor) {
        if (interceptor instanceof DeviceMessageDecodeInterceptor) {
            this.decodeDeviceMessageInterceptors.add((DeviceMessageDecodeInterceptor)interceptor);
        }
        if (interceptor instanceof DeviceMessageEncodeInterceptor) {
            this.encodeDeviceMessageInterceptors.add((DeviceMessageEncodeInterceptor)interceptor);
        }
    }

    @Nonnull
    public Flux<? extends EncodedMessage> encode(@Nonnull MessageEncodeContext context) {
        return Flux.defer(() -> {
            Mono<Object> pre = Mono.empty();
            for (DeviceMessageEncodeInterceptor interceptor : this.encodeDeviceMessageInterceptors) {
                pre = pre.then(interceptor.preEncode(context));
            }
            Flux<EncodedMessage> message = Flux.from(this.messageCodec.encode(context));
            for (DeviceMessageEncodeInterceptor interceptor : this.encodeDeviceMessageInterceptors) {
                message = message.flatMap(msg -> interceptor.postEncode(context, (EncodedMessage)msg));
            }
            return pre.thenMany(message);
        });
    }

    @Nonnull
    public Flux<? extends Message> decode(@Nonnull MessageDecodeContext context) {
        return Flux.defer(() -> {
            Mono<Object> pre = Mono.empty();
            for (DeviceMessageDecodeInterceptor interceptor : this.decodeDeviceMessageInterceptors) {
                pre = pre.then(interceptor.preDecode(context));
            }
            Flux<Message> message = Flux.from(this.messageCodec.decode(context));
            for (DeviceMessageDecodeInterceptor interceptor : this.decodeDeviceMessageInterceptors) {
                message = message.flatMap(msg -> interceptor.postDecode(context, msg));
            }
            return pre.thenMany(message);
        });
    }
}

