/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.core.defaults;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.core.defaults.DefaultFunctionInvokeMessageSender;
import org.jetlinks.core.defaults.DefaultReadPropertyMessageSender;
import org.jetlinks.core.defaults.DefaultWritePropertyMessageSender;
import org.jetlinks.core.device.DeviceConfigKey;
import org.jetlinks.core.device.DeviceMessageSender;
import org.jetlinks.core.device.DeviceOperationBroker;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.exception.DeviceOperationException;
import org.jetlinks.core.message.ChildDeviceMessage;
import org.jetlinks.core.message.ChildDeviceMessageReply;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.FunctionInvokeMessageSender;
import org.jetlinks.core.message.HeaderKey;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.MessageType;
import org.jetlinks.core.message.ReadPropertyMessageSender;
import org.jetlinks.core.message.RepayableDeviceMessage;
import org.jetlinks.core.message.WritePropertyMessageSender;
import org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class DefaultDeviceMessageSender
implements DeviceMessageSender {
    private static final Logger log = LoggerFactory.getLogger(DefaultDeviceMessageSender.class);
    private final DeviceOperationBroker handler;
    private final DeviceOperator operator;
    private final DeviceRegistry registry;
    private static final long DEFAULT_TIMEOUT = TimeUnit.SECONDS.toMillis(Integer.getInteger("jetlinks.device.message.default-timeout", 10).intValue());
    private long defaultTimeout = DEFAULT_TIMEOUT;
    private final DeviceMessageSenderInterceptor globalInterceptor;

    public DefaultDeviceMessageSender(DeviceOperationBroker handler, DeviceOperator operator, DeviceRegistry registry, DeviceMessageSenderInterceptor interceptor) {
        this.handler = handler;
        this.operator = operator;
        this.registry = registry;
        this.globalInterceptor = interceptor;
    }

    @Override
    public <R extends DeviceMessageReply> Flux<R> send(Publisher<RepayableDeviceMessage<R>> message) {
        return this.send(message, this::convertReply);
    }

    protected <T extends DeviceMessageReply> T convertReply(Message sent, Object reply) {
        if (reply instanceof ChildDeviceMessageReply && !(sent instanceof ChildDeviceMessage)) {
            ChildDeviceMessageReply messageReply = (ChildDeviceMessageReply)reply;
            if (!messageReply.isSuccess()) {
                ErrorCode.of(messageReply.getCode()).map(DeviceOperationException::new).ifPresent(err -> {
                    throw err;
                });
            }
            if (messageReply.getChildDeviceMessage() == null) {
                ErrorCode.of(messageReply.getCode()).map(DeviceOperationException::new).ifPresent(err -> {
                    throw err;
                });
                throw new DeviceOperationException(ErrorCode.NO_REPLY);
            }
            return (T)((DeviceMessageReply)this.convertReply(((ChildDeviceMessageReply)reply).getChildDeviceMessage()));
        }
        return (T)((DeviceMessageReply)this.convertReply(reply));
    }

    protected <T extends DeviceMessage> T convertReply(Object obj) {
        DeviceMessage result = null;
        if (obj instanceof DeviceMessageReply) {
            DeviceMessageReply reply = (DeviceMessageReply)obj;
            if (!reply.isSuccess()) {
                ErrorCode.of(reply.getCode()).map(code -> {
                    String msg = reply.getHeader("errorMessage").map(String::valueOf).orElse(code.getText());
                    return new DeviceOperationException((ErrorCode)((Object)code), msg);
                }).ifPresent(err -> {
                    throw err;
                });
            }
            result = reply;
        } else if (obj instanceof DeviceMessage) {
            result = (DeviceMessage)obj;
        } else if (obj instanceof Map) {
            result = MessageType.convertMessage((Map)obj).orElse(null);
        }
        if (result == null) {
            throw new DeviceOperationException(ErrorCode.SYSTEM_ERROR, (Throwable)new ClassCastException("can not cast " + obj + " to DeviceMessageReply"));
        }
        return (T)result;
    }

    private <R extends DeviceMessage> Flux<R> logReply(DeviceMessage msg, Flux<R> flux) {
        if (log.isDebugEnabled()) {
            return flux.doOnNext(r -> log.debug("receive device[{}] message[{}]: {}", this.operator.getDeviceId(), r.getMessageId(), r)).doOnComplete(() -> log.debug("complete receive device[{}] message[{}]", (Object)this.operator.getDeviceId(), (Object)msg.getMessageId())).doOnCancel(() -> log.debug("cancel receive device[{}] message[{}]", (Object)this.operator.getDeviceId(), (Object)msg.getMessageId()));
        }
        return flux;
    }

    @Override
    public <R extends DeviceMessage> Flux<R> send(DeviceMessage message) {
        return this.send(Mono.just(message), this::convertReply);
    }

    private Mono<String> refreshAndGetConnectionServerId() {
        return Mono.defer(() -> this.operator.refreshConfig(Collections.singleton(DeviceConfigKey.connectionServerId.getKey())).then(this.operator.getConnectionServerId()));
    }

    private ChildDeviceMessage createChildDeviceMessage(String parentId, DeviceMessage message) {
        ChildDeviceMessage children = new ChildDeviceMessage();
        children.setDeviceId(parentId);
        children.setMessageId(message.getMessageId());
        children.setTimestamp(message.getTimestamp());
        children.setChildDeviceId(this.operator.getDeviceId());
        children.setChildDeviceMessage(message);
        Headers.copyFunctionalHeader(message, children);
        message.addHeader((HeaderKey)Headers.dispatchToParent, (Object)true);
        children.validate();
        return children;
    }

    private Flux<DeviceMessage> sendToParentDevice(String parentId, DeviceMessage message) {
        if (parentId.equals(this.operator.getDeviceId())) {
            return Flux.error(new DeviceOperationException(ErrorCode.CYCLIC_DEPENDENCE, "validation.parent_id_and_id_can_not_be_same"));
        }
        ChildDeviceMessage children = this.createChildDeviceMessage(parentId, message);
        return this.registry.getDevice(parentId).switchIfEmpty(Mono.error(() -> new DeviceOperationException(ErrorCode.UNKNOWN_PARENT_DEVICE))).flatMapMany(parent -> parent.messageSender().send(Mono.just(children), resp -> this.convertReply(message, resp)));
    }

    @Override
    public <R extends DeviceMessage> Flux<R> send(Publisher<? extends DeviceMessage> message, Function<Object, R> replyMapping) {
        return Mono.zip(this.operator.getConnectionServerId().switchIfEmpty(this.refreshAndGetConnectionServerId()).defaultIfEmpty(""), this.operator.getProtocol().flatMap(ProtocolSupport::getSenderInterceptor).defaultIfEmpty(DeviceMessageSenderInterceptor.DO_NOTING), this.operator.getSelfConfig(DeviceConfigKey.parentGatewayId).defaultIfEmpty("")).flatMapMany(serverAndInterceptor -> {
            DeviceMessageSenderInterceptor interceptor = ((DeviceMessageSenderInterceptor)serverAndInterceptor.getT2()).andThen(this.globalInterceptor);
            String server = (String)serverAndInterceptor.getT1();
            String parentGatewayId = (String)serverAndInterceptor.getT3();
            if (StringUtils.isEmpty(server) && StringUtils.hasText(parentGatewayId)) {
                return Flux.from(message).flatMap(msg -> interceptor.preSend(this.operator, (DeviceMessage)msg)).flatMap(msg -> this.sendToParentDevice(parentGatewayId, (DeviceMessage)msg).as(flux -> interceptor.afterSent(this.operator, (DeviceMessage)msg, interceptor.doSend(this.operator, (DeviceMessage)msg, (Flux<DeviceMessage>)flux)))).map(r -> r);
            }
            return Flux.from(message).flatMap(msg -> interceptor.preSend(this.operator, (DeviceMessage)msg)).concatMap(msg -> Flux.defer(() -> {
                if (StringUtils.isEmpty(server)) {
                    return interceptor.afterSent(this.operator, (DeviceMessage)msg, Flux.error(new DeviceOperationException(ErrorCode.CLIENT_OFFLINE)));
                }
                boolean forget = msg.getHeader(Headers.sendAndForget).orElse(false);
                Flux replyStream = forget ? Flux.empty() : this.handler.handleReply(msg.getDeviceId(), msg.getMessageId(), Duration.ofMillis(msg.getHeader(Headers.timeout).orElse(this.defaultTimeout))).map(replyMapping).onErrorResume(DeviceOperationException.class, error -> {
                    if (error.getCode() == ErrorCode.CLIENT_OFFLINE) {
                        return this.operator.checkState().then(Mono.error(error));
                    }
                    return Mono.error(error);
                }).onErrorMap(TimeoutException.class, timeout -> new DeviceOperationException(ErrorCode.TIME_OUT, (Throwable)timeout)).as(flux -> this.logReply((DeviceMessage)msg, (Flux)flux));
                return this.handler.send(server, Mono.just(msg)).defaultIfEmpty(-1).flatMapMany(len -> {
                    if (len == 0) {
                        return this.operator.checkState().flatMapMany(state -> {
                            if (1 != state) {
                                return interceptor.afterSent(this.operator, (DeviceMessage)msg, Flux.error(new DeviceOperationException(ErrorCode.CLIENT_OFFLINE)));
                            }
                            if (StringUtils.hasText(parentGatewayId)) {
                                log.debug("Device [{}] Cached Server [{}] Not Available,Dispatch To Parent [{}]", this.operator.getDeviceId(), server, parentGatewayId);
                                return interceptor.afterSent(this.operator, (DeviceMessage)msg, this.sendToParentDevice(parentGatewayId, (DeviceMessage)msg)).map(r -> r);
                            }
                            log.warn("Device [{}] Cached Server [{}] Not Available", (Object)this.operator.getDeviceId(), (Object)server);
                            return interceptor.afterSent(this.operator, (DeviceMessage)msg, Flux.error(new DeviceOperationException(ErrorCode.SERVER_NOT_AVAILABLE)));
                        });
                    }
                    if (len == -1) {
                        return interceptor.afterSent(this.operator, (DeviceMessage)msg, Flux.error(new DeviceOperationException(ErrorCode.CLIENT_OFFLINE)));
                    }
                    log.debug("send device[{}] message complete", (Object)this.operator.getDeviceId());
                    return interceptor.afterSent(this.operator, (DeviceMessage)msg, replyStream);
                });
            }).as(flux -> interceptor.doSend(this.operator, (DeviceMessage)msg, flux.cast(DeviceMessage.class)).map(_resp -> _resp)));
        });
    }

    @Override
    public FunctionInvokeMessageSender invokeFunction(String function) {
        return new DefaultFunctionInvokeMessageSender(this.operator, function);
    }

    @Override
    public ReadPropertyMessageSender readProperty(String ... property) {
        return new DefaultReadPropertyMessageSender(this.operator).read(property);
    }

    @Override
    public WritePropertyMessageSender writeProperty() {
        return new DefaultWritePropertyMessageSender(this.operator);
    }

    public void setDefaultTimeout(long defaultTimeout) {
        this.defaultTimeout = defaultTimeout;
    }

    public long getDefaultTimeout() {
        return this.defaultTimeout;
    }
}

