/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.community.device.service;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.time.Duration;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.commons.collections4.MapUtils;
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
import org.hswebframework.ezorm.rdb.mapping.ReactiveUpdate;
import org.jetlinks.community.PropertyConstants;
import org.jetlinks.community.buffer.PersistenceBuffer;
import org.jetlinks.community.device.entity.DeviceInstanceEntity;
import org.jetlinks.community.device.entity.DeviceProductEntity;
import org.jetlinks.community.device.entity.DeviceTagEntity;
import org.jetlinks.community.device.enums.DeviceState;
import org.jetlinks.community.device.service.LocalDeviceInstanceService;
import org.jetlinks.community.device.service.LocalDeviceProductService;
import org.jetlinks.community.gateway.annotation.Subscribe;
import org.jetlinks.community.utils.ErrorUtils;
import org.jetlinks.core.config.ConfigKey;
import org.jetlinks.core.device.DeviceConfigKey;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.message.ChildDeviceMessage;
import org.jetlinks.core.message.DerivedMetadataMessage;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceRegisterMessage;
import org.jetlinks.core.message.DeviceUnRegisterMessage;
import org.jetlinks.core.message.HeaderKey;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.UpdateTagMessage;
import org.jetlinks.core.metadata.DeviceMetadata;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.reactor.ql.utils.CastUtils;
import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.QueryTimeoutException;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Component
public class DeviceMessageBusinessHandler {
    private static final Logger log = LoggerFactory.getLogger(DeviceMessageBusinessHandler.class);
    private final LocalDeviceInstanceService deviceService;
    private final LocalDeviceProductService productService;
    private final DeviceRegistry registry;
    private final ReactiveRepository<DeviceTagEntity, String> tagRepository;
    private final EventBus eventBus;
    private final Disposable.Composite disposable = Disposables.composite();

    private Mono<DeviceOperator> doAutoRegister(DeviceRegisterMessage message) {
        return Mono.zip((Mono)Mono.justOrEmpty((Object)message.getDeviceId()), (Mono)Mono.justOrEmpty((Optional)message.getHeader("deviceName")).map(String::valueOf), (Mono)Mono.justOrEmpty(message.getHeader("productId").map(String::valueOf)), (Mono)Mono.justOrEmpty(message.getHeader("productId").map(String::valueOf)).flatMap(arg_0 -> ((LocalDeviceProductService)this.productService).findById(arg_0)), (Mono)Mono.justOrEmpty(message.getHeader("configuration").map(Map.class::cast).orElse(new HashMap()))).flatMap(tps -> {
            DeviceInstanceEntity instance = new DeviceInstanceEntity();
            instance.setId(tps.getT1());
            instance.setName((String)tps.getT2());
            instance.setProductId((String)tps.getT3());
            instance.setProductName(((DeviceProductEntity)((Object)((Object)tps.getT4()))).getName());
            instance.setConfiguration((Map)tps.getT5());
            instance.setRegistryTime(message.getTimestamp());
            instance.setCreateTimeNow();
            instance.setCreatorId(((DeviceProductEntity)((Object)((Object)tps.getT4()))).getCreatorId());
            instance.setOrgId(((DeviceProductEntity)((Object)((Object)tps.getT4()))).getOrgId());
            boolean selfManageState = CastUtils.castBoolean((Object)((Map)tps.getT5()).getOrDefault(DeviceConfigKey.selfManageState.getKey(), false));
            instance.setState(selfManageState ? DeviceState.offline : DeviceState.online);
            instance.mergeConfiguration((Map)tps.getT5());
            return this.deviceService.save((Object)instance).then(Mono.defer(() -> this.registry.register(instance.toDeviceInfo().addConfig("state", (Object)(selfManageState ? (byte)-1 : (byte)1)))));
        });
    }

    @Subscribe(value={"/device/*/*/register"})
    @Transactional(propagation=Propagation.NEVER)
    public Mono<Void> autoRegisterDevice(DeviceRegisterMessage message) {
        if (message.getHeader(Headers.force).orElse(false).booleanValue()) {
            return this.doAutoRegister(message).then();
        }
        return this.registry.getDevice(message.getDeviceId()).flatMap(device -> {
            Map config = message.getHeader("configuration").map(Map.class::cast).orElse(null);
            if (MapUtils.isNotEmpty((Map)config)) {
                return this.deviceService.mergeConfiguration(device.getDeviceId(), config, update -> (ReactiveUpdate)update.set(DeviceInstanceEntity::getName, message.getHeader((HeaderKey)PropertyConstants.deviceName).orElse(null))).thenReturn(device);
            }
            return Mono.just((Object)device);
        }).switchIfEmpty(Mono.defer(() -> this.doAutoRegister(message))).then();
    }

    @Subscribe(value={"/device/*/*/message/children/*/register"})
    @Transactional(propagation=Propagation.REQUIRES_NEW)
    public Mono<Void> autoBindChildrenDevice(ChildDeviceMessage message) {
        Message childMessage = message.getChildDeviceMessage();
        if (childMessage instanceof DeviceRegisterMessage) {
            String childId = ((DeviceRegisterMessage)childMessage).getDeviceId();
            if (message.getDeviceId().equals(childId)) {
                log.warn("\u5b50\u8bbe\u5907\u6ce8\u518c\u6d88\u606f\u5faa\u73af\u4f9d\u8d56:{}", (Object)message);
                return Mono.empty();
            }
            childMessage.addHeaderIfAbsent(DeviceConfigKey.parentGatewayId.getKey(), (Object)message.getDeviceId());
            return this.registry.getDevice(childId).map(device -> device.getState().flatMap(state -> ((ReactiveUpdate)((ReactiveUpdate)((ReactiveUpdate)this.deviceService.createUpdate().set(DeviceInstanceEntity::getParentId, (Object)message.getDeviceId())).set(DeviceInstanceEntity::getState, (Object)DeviceState.of(state))).where(DeviceInstanceEntity::getId, (Object)childId)).execute()).then(device.setConfig((ConfigKey)DeviceConfigKey.parentGatewayId, (Object)message.getDeviceId())).thenReturn(device)).defaultIfEmpty((Object)Mono.defer(() -> this.doAutoRegister((DeviceRegisterMessage)childMessage))).flatMap(Function.identity()).then();
        }
        return Mono.empty();
    }

    @Subscribe(value={"/device/*/*/message/children/*/unregister"})
    public Mono<Void> autoUnbindChildrenDevice(ChildDeviceMessage message) {
        Message childMessage = message.getChildDeviceMessage();
        if (childMessage instanceof DeviceUnRegisterMessage) {
            String childId = ((DeviceUnRegisterMessage)childMessage).getDeviceId();
            return this.registry.getDevice(childId).flatMap(dev -> dev.removeConfig(DeviceConfigKey.parentGatewayId.getKey()).then(dev.checkState())).flatMap(state -> ((ReactiveUpdate)((ReactiveUpdate)((ReactiveUpdate)this.deviceService.createUpdate().setNull(DeviceInstanceEntity::getParentId)).set(DeviceInstanceEntity::getState, (Object)DeviceState.of(state))).where(DeviceInstanceEntity::getId, (Object)childId)).execute().then());
        }
        return Mono.empty();
    }

    @Subscribe(value={"/device/*/*/unregister"})
    @Transactional(propagation=Propagation.NEVER)
    public Mono<Void> unRegisterDevice(DeviceUnRegisterMessage message) {
        return this.deviceService.unregisterDevice(message.getDeviceId()).then();
    }

    @Subscribe(value={"/device/*/*/message/tags/update"})
    public Mono<Void> updateDeviceTag(UpdateTagMessage message) {
        Map tags = message.getTags();
        String deviceId = message.getDeviceId();
        return ((Mono)this.registry.getDevice(deviceId).flatMap(DeviceOperator::getMetadata).flatMapMany(metadata -> Flux.fromIterable(tags.entrySet()).map(e -> {
            DeviceTagEntity tagEntity = metadata.getTag((String)e.getKey()).map(tagMeta -> DeviceTagEntity.of(tagMeta, e.getValue())).orElseGet(() -> {
                DeviceTagEntity entity = new DeviceTagEntity();
                entity.setKey((String)e.getKey());
                entity.setType("string");
                entity.setName((String)e.getKey());
                entity.setCreateTime(new Date());
                entity.setDescription("\u8bbe\u5907\u4e0a\u62a5");
                entity.setValue(String.valueOf(e.getValue()));
                return entity;
            });
            tagEntity.setDeviceId(deviceId);
            tagEntity.setId(DeviceTagEntity.createTagId(deviceId, tagEntity.getKey()));
            return tagEntity;
        })).as(arg_0 -> this.tagRepository.save(arg_0))).then();
    }

    @Subscribe(value={"/device/*/*/metadata/derived"})
    public Mono<Void> updateMetadata(DerivedMetadataMessage message) {
        if (message.isAll()) {
            return this.updateMedata(message.getDeviceId(), message.getMetadata());
        }
        return Mono.zip((Mono)this.registry.getDevice(message.getDeviceId()).flatMap(DeviceOperator::getMetadata), (Mono)JetLinksDeviceMetadataCodec.getInstance().decode(message.getMetadata()), DeviceMetadata::merge).flatMap(arg_0 -> ((JetLinksDeviceMetadataCodec)JetLinksDeviceMetadataCodec.getInstance()).encode(arg_0)).flatMap(metadata -> this.updateMedata(message.getDeviceId(), (String)metadata));
    }

    private Mono<Void> updateMedata(String deviceId, String metadata) {
        return ((ReactiveUpdate)((ReactiveUpdate)this.deviceService.createUpdate().set(DeviceInstanceEntity::getDeriveMetadata, (Object)metadata)).where(DeviceInstanceEntity::getId, (Object)deviceId)).execute().then(this.registry.getDevice(deviceId)).flatMap(device -> device.updateMetadata(metadata)).then();
    }

    @PreDestroy
    public void shutdown() {
        this.disposable.dispose();
    }

    @PostConstruct
    public void init() {
        Subscription subscription = Subscription.builder().subscriberId("device-state-synchronizer").topics(new String[]{"/device/*/*/online", "/device/*/*/offline"}).justLocal().build();
        PersistenceBuffer buffer = new PersistenceBuffer("./data/device-state-buffer", "device-state.queue", StateBuf::new, flux -> this.deviceService.syncStateBatch((Flux<List<String>>)flux.filter(StateBuf::isEffective).map(StateBuf::getId).distinct().collectList().flux(), false).then(Reactors.ALWAYS_FALSE)).name("device-state-synchronizer").parallelism(1).bufferTimeout(Duration.ofSeconds(1L)).retryWhenError(e -> ErrorUtils.hasException((Throwable)e, (Class[])new Class[]{IOException.class, QueryTimeoutException.class})).bufferSize(1000);
        buffer.start();
        this.disposable.add(this.eventBus.subscribe(subscription, DeviceMessage.class).subscribe(msg -> buffer.write((Serializable)new StateBuf(msg.getDeviceId(), msg.getTimestamp()))));
        this.disposable.add((Disposable)buffer);
    }

    public DeviceMessageBusinessHandler(LocalDeviceInstanceService deviceService, LocalDeviceProductService productService, DeviceRegistry registry, ReactiveRepository<DeviceTagEntity, String> tagRepository, EventBus eventBus) {
        this.deviceService = deviceService;
        this.productService = productService;
        this.registry = registry;
        this.tagRepository = tagRepository;
        this.eventBus = eventBus;
    }

    public static class StateBuf
    implements Externalizable {
        static long expires = Duration.ofHours(1L).toMillis();
        private String id;
        private long time;

        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeUTF(this.id);
            out.writeLong(this.time);
        }

        @Override
        public void readExternal(ObjectInput in) throws IOException {
            this.id = in.readUTF();
            this.time = in.readLong();
        }

        public boolean isEffective() {
            return System.currentTimeMillis() - this.time < expires;
        }

        public StateBuf(String id, long time) {
            this.id = id;
            this.time = time;
        }

        public StateBuf() {
        }

        public String getId() {
            return this.id;
        }

        public long getTime() {
            return this.time;
        }

        public void setId(String id) {
            this.id = id;
        }

        public void setTime(long time) {
            this.time = time;
        }
    }
}

