package org.jetlinks.community.device.service;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.commons.collections4.MapUtils;
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
import org.jetlinks.community.PropertyConstants;
import org.jetlinks.community.buffer.PersistenceBuffer;
import org.jetlinks.community.device.entity.DeviceInstanceEntity;
import org.jetlinks.community.device.entity.DeviceProductEntity;
import org.jetlinks.community.device.entity.DeviceTagEntity;
import org.jetlinks.community.device.enums.DeviceState;
import org.jetlinks.community.gateway.annotation.Subscribe;
import org.jetlinks.community.utils.ErrorUtils;
import org.jetlinks.core.device.DeviceConfigKey;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.message.ChildDeviceMessage;
import org.jetlinks.core.message.DerivedMetadataMessage;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceRegisterMessage;
import org.jetlinks.core.message.DeviceUnRegisterMessage;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.UpdateTagMessage;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.reactor.ql.utils.CastUtils;
import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.QueryTimeoutException;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Component
/* loaded from: input_file:org/jetlinks/community/device/service/DeviceMessageBusinessHandler.class */
public class DeviceMessageBusinessHandler {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(DeviceMessageBusinessHandler.class);
    /** Device instance persistence: used here to save, update, merge configuration, unregister and batch-sync state. */
    private final LocalDeviceInstanceService deviceService;
    /** Product lookup, used during auto-registration to resolve the product of a new device. */
    private final LocalDeviceProductService productService;
    /** Device registry used to look up device operators and to register new devices. */
    private final DeviceRegistry registry;
    /** Repository that stores the tags reported by devices. */
    private final ReactiveRepository<DeviceTagEntity, String> tagRepository;
    /** Event bus from which online/offline messages are subscribed in {@code init()}. */
    private final EventBus eventBus;
    /** Holds everything started in {@code init()} so that {@code shutdown()} can dispose it in one call. */
    private final Disposable.Composite disposable = Disposables.composite();

    /* loaded from: input_file:org/jetlinks/community/device/service/DeviceMessageBusinessHandler$StateBuf.class */
    /**
     * Buffered record of one device online/offline notification: the device id and the
     * timestamp carried by the message that reported it.
     *
     * <p>Instances are written and read through {@link Externalizable}, so the class keeps a
     * public no-argument constructor and serializes exactly two values, in this order: the id
     * (UTF) and the time (long).
     */
    public static class StateBuf implements Externalizable {
        /** Maximum age in milliseconds (one hour) for a record to still count as effective. */
        static long expires = Duration.ofHours(1).toMillis();

        private String id;
        private long time;

        /** Creates an empty record; the fields are filled by {@link #readExternal} or the setters. */
        public StateBuf() {
        }

        /**
         * @param deviceId  id of the device whose state changed
         * @param timestamp epoch milliseconds of the state change
         */
        public StateBuf(String deviceId, long timestamp) {
            this.id = deviceId;
            this.time = timestamp;
        }

        public String getId() {
            return id;
        }

        public void setId(String deviceId) {
            this.id = deviceId;
        }

        public long getTime() {
            return time;
        }

        public void setTime(long timestamp) {
            this.time = timestamp;
        }

        /**
         * @return {@code true} while the record is younger than {@link #expires}, measured
         *         against the current system clock
         */
        public boolean isEffective() {
            long age = System.currentTimeMillis() - time;
            return age < expires;
        }

        /** Writes the id followed by the time. */
        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeUTF(id);
            out.writeLong(time);
        }

        /** Reads the id followed by the time, mirroring {@link #writeExternal}. */
        @Override
        public void readExternal(ObjectInput in) throws IOException {
            id = in.readUTF();
            time = in.readLong();
        }
    }

    /**
     * Creates a device instance from a register message, saves it and registers it in the
     * device registry.
     *
     * <p>The device id, the {@code deviceName} header, the {@code productId} header and the
     * product found for that id are combined with {@link Mono#zip}; if any of them is absent
     * the returned Mono completes empty and nothing is saved. The {@code configuration} header
     * is optional and defaults to an empty map.
     *
     * @param deviceRegisterMessage register message describing the device
     * @return the operator of the newly registered device, or empty when required data is missing
     * @throws ClassCastException if the {@code configuration} header is present but is not a {@link Map}
     */
    private Mono<DeviceOperator> doAutoRegister(DeviceRegisterMessage deviceRegisterMessage) {
        Mono<String> deviceId = Mono.justOrEmpty(deviceRegisterMessage.getDeviceId());
        Mono<String> deviceName = Mono.justOrEmpty(deviceRegisterMessage.getHeader("deviceName")).map(String::valueOf);
        Mono<String> productId = Mono.justOrEmpty(deviceRegisterMessage.getHeader("productId").map(String::valueOf));
        // The product is resolved from the same header value as the product id.
        Mono<DeviceProductEntity> product = productId.flatMap(this.productService::findById);

        @SuppressWarnings("unchecked") // headers are untyped; the configuration header is expected to hold a String-keyed map
        Map<String, Object> configuration = (Map<String, Object>) deviceRegisterMessage
            .getHeader("configuration")
            .map(Map.class::cast)
            .orElse(new HashMap<String, Object>());

        return Mono
            .zip(deviceId, deviceName, productId, product, Mono.justOrEmpty(configuration))
            .flatMap(parts -> {
                DeviceProductEntity productEntity = parts.getT4();
                Map<String, Object> config = parts.getT5();

                DeviceInstanceEntity instance = new DeviceInstanceEntity();
                instance.setId(parts.getT1());
                instance.setName(parts.getT2());
                instance.setProductId(parts.getT3());
                instance.setProductName(productEntity.getName());
                instance.setConfiguration(config);
                instance.setRegistryTime(Long.valueOf(deviceRegisterMessage.getTimestamp()));
                instance.setCreateTimeNow();
                // The new device inherits creator and organisation from its product.
                instance.setCreatorId(productEntity.getCreatorId());
                instance.setOrgId(productEntity.getOrgId());

                // A device that manages its own state starts offline; any other device starts online.
                boolean selfManaged = CastUtils.castBoolean(
                    config.getOrDefault(DeviceConfigKey.selfManageState.getKey(), false));
                instance.setState(selfManaged ? DeviceState.offline : DeviceState.online);
                instance.mergeConfiguration(config);

                // NOTE(review): -1 / 1 are presumably the registry's byte codes for offline / online — confirm.
                byte registryState = selfManaged ? (byte) -1 : (byte) 1;
                return this.deviceService
                    .save(instance)
                    .then(Mono.defer(() -> this.registry.register(
                        instance.toDeviceInfo().addConfig("state", Byte.valueOf(registryState)))));
            });
    }

    /**
     * Handles a device register message.
     *
     * <p>When the {@code force} header is true the device is always (re)created through
     * {@code doAutoRegister}. Otherwise the registry is consulted first: for a known device a
     * non-empty {@code configuration} header is merged into the stored device and its name is
     * set from the device-name header; an unknown device is auto-registered.
     *
     * @param deviceRegisterMessage the register message received from the subscription
     * @return a Mono that completes when handling has finished
     */
    @Subscribe({"/device/*/*/register"})
    @Transactional(propagation = Propagation.NEVER)
    public Mono<Void> autoRegisterDevice(DeviceRegisterMessage deviceRegisterMessage) {
        boolean force = deviceRegisterMessage.getHeader(Headers.force).orElse(false);
        if (force) {
            return doAutoRegister(deviceRegisterMessage).then();
        }
        return this.registry
            .getDevice(deviceRegisterMessage.getDeviceId())
            .flatMap(deviceOperator -> {
                @SuppressWarnings("unchecked") // headers are untyped; the configuration header is expected to hold a String-keyed map
                Map<String, Object> configuration = (Map<String, Object>) deviceRegisterMessage
                    .getHeader("configuration")
                    .map(Map.class::cast)
                    .orElse(null);
                if (MapUtils.isNotEmpty(configuration)) {
                    return this.deviceService
                        .mergeConfiguration(
                            deviceOperator.getDeviceId(),
                            configuration,
                            // Must be a method reference: the column name is resolved from the referenced getter.
                            update -> update.set(
                                DeviceInstanceEntity::getName,
                                deviceRegisterMessage.getHeader(PropertyConstants.deviceName).orElse(null)))
                        .thenReturn(deviceOperator);
                }
                return Mono.just(deviceOperator);
            })
            // Deferred so that registration only runs when the device is not in the registry.
            .switchIfEmpty(Mono.defer(() -> doAutoRegister(deviceRegisterMessage)))
            .then();
    }

    /**
     * Binds a child device to its gateway when the gateway forwards a child register message.
     *
     * <p>Messages whose nested message is not a {@link DeviceRegisterMessage}, or whose child id
     * equals the gateway id, are ignored. For a child already in the registry the stored parent
     * id and state are updated and the parent gateway id is written to the device
     * configuration; an unknown child is auto-registered with the gateway id added as a header.
     *
     * @param childDeviceMessage message sent by the gateway, wrapping the child's own message
     * @return a Mono that completes when the binding has finished
     */
    @Subscribe({"/device/*/*/message/children/*/register"})
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public Mono<Void> autoBindChildrenDevice(ChildDeviceMessage childDeviceMessage) {
        // Held as a general reference so the instanceof check below is a real type test.
        Object nested = childDeviceMessage.getChildDeviceMessage();
        if (!(nested instanceof DeviceRegisterMessage)) {
            return Mono.empty();
        }
        DeviceRegisterMessage registerMessage = (DeviceRegisterMessage) nested;
        String gatewayId = childDeviceMessage.getDeviceId();
        String childId = registerMessage.getDeviceId();
        if (gatewayId.equals(childId)) {
            // A device cannot be registered as its own child.
            log.warn("子设备注册消息循环依赖:{}", childDeviceMessage);
            return Mono.empty();
        }
        registerMessage.addHeaderIfAbsent(DeviceConfigKey.parentGatewayId.getKey(), gatewayId);
        return this.registry
            .getDevice(childId)
            .map(deviceOperator -> deviceOperator
                .getState()
                // Column arguments must be method references: the column name is resolved from the getter.
                .flatMap(state -> this.deviceService
                    .createUpdate()
                    .set(DeviceInstanceEntity::getParentId, gatewayId)
                    .set(DeviceInstanceEntity::getState, DeviceState.of(state.byteValue()))
                    .where(DeviceInstanceEntity::getId, childId)
                    .execute())
                .then(deviceOperator.setConfig(DeviceConfigKey.parentGatewayId, gatewayId))
                .thenReturn(deviceOperator))
            // Unknown child: fall back to auto-registration, deferred until actually needed.
            .defaultIfEmpty(Mono.defer(() -> doAutoRegister(registerMessage)))
            .flatMap(Function.identity())
            .then();
    }

    /**
     * Unbinds a child device from its gateway when the gateway forwards a child unregister message.
     *
     * <p>Messages whose nested message is not a {@link DeviceUnRegisterMessage} are ignored.
     * Otherwise the parent gateway id is removed from the child's configuration, the child's
     * state is re-checked, and the stored device gets a null parent id and the checked state.
     * Nothing happens when the child is not in the registry.
     *
     * @param childDeviceMessage message sent by the gateway, wrapping the child's own message
     * @return a Mono that completes when the unbinding has finished
     */
    @Subscribe({"/device/*/*/message/children/*/unregister"})
    public Mono<Void> autoUnbindChildrenDevice(ChildDeviceMessage childDeviceMessage) {
        // Held as a general reference so the instanceof check below is a real type test.
        Object nested = childDeviceMessage.getChildDeviceMessage();
        if (!(nested instanceof DeviceUnRegisterMessage)) {
            return Mono.empty();
        }
        String childId = ((DeviceUnRegisterMessage) nested).getDeviceId();
        return this.registry
            .getDevice(childId)
            .flatMap(deviceOperator -> deviceOperator
                .removeConfig(DeviceConfigKey.parentGatewayId.getKey())
                .then(deviceOperator.checkState()))
            // Column arguments must be method references: the column name is resolved from the getter.
            .flatMap(state -> this.deviceService
                .createUpdate()
                .setNull(DeviceInstanceEntity::getParentId)
                .set(DeviceInstanceEntity::getState, DeviceState.of(state.byteValue()))
                .where(DeviceInstanceEntity::getId, childId)
                .execute()
                .then());
    }

    /**
     * Handles a device unregister message by unregistering that device through the device service.
     *
     * @param deviceUnRegisterMessage the unregister message received from the subscription
     * @return a Mono that completes when the device service has finished
     */
    @Subscribe({"/device/*/*/unregister"})
    @Transactional(propagation = Propagation.NEVER)
    public Mono<Void> unRegisterDevice(DeviceUnRegisterMessage deviceUnRegisterMessage) {
        String deviceId = deviceUnRegisterMessage.getDeviceId();
        return deviceService
            .unregisterDevice(deviceId)
            .then();
    }

    /**
     * Persists the tags reported by a device.
     *
     * <p>Each reported entry becomes a {@link DeviceTagEntity}. When the device metadata defines
     * a tag with the same key the entity is built from that definition; otherwise a
     * {@code string}-typed tag is created whose name equals its key. All entities are saved
     * through the tag repository. Nothing is saved when the device is not in the registry.
     *
     * @param updateTagMessage message carrying the device id and the reported tags
     * @return a Mono that completes when the tags have been saved
     */
    @Subscribe({"/device/*/*/message/tags/update"})
    public Mono<Void> updateDeviceTag(UpdateTagMessage updateTagMessage) {
        Map<String, Object> tags = updateTagMessage.getTags();
        String deviceId = updateTagMessage.getDeviceId();
        return this.registry
            .getDevice(deviceId)
            .flatMap(DeviceOperator::getMetadata)
            .flatMapMany(metadata -> Flux
                .fromIterable(tags.entrySet())
                .map(entry -> {
                    String key = entry.getKey();
                    DeviceTagEntity tag = metadata
                        .getTag(key)
                        .map(tagMetadata -> DeviceTagEntity.of(tagMetadata, entry.getValue()))
                        .orElseGet(() -> {
                            // Tag is not declared in the metadata: store it as a plain string tag.
                            DeviceTagEntity undeclared = new DeviceTagEntity();
                            undeclared.setKey(key);
                            undeclared.setType("string");
                            undeclared.setName(key);
                            undeclared.setCreateTime(new Date());
                            undeclared.setDescription("设备上报");
                            undeclared.setValue(String.valueOf(entry.getValue()));
                            return undeclared;
                        });
                    tag.setDeviceId(deviceId);
                    // The id is derived from device id and tag key.
                    tag.setId(DeviceTagEntity.createTagId(deviceId, tag.getKey()));
                    return tag;
                }))
            .as(this.tagRepository::save)
            .then();
    }

    /**
     * Applies derived metadata reported for a device.
     *
     * <p>When the message is flagged as {@code all}, its metadata is stored as-is. Otherwise
     * the reported metadata is decoded, merged into the device's current metadata, re-encoded,
     * and the merged result is stored.
     *
     * @param derivedMetadataMessage message carrying the device id and the derived metadata
     * @return a Mono that completes when the metadata has been stored
     */
    @Subscribe({"/device/*/*/metadata/derived"})
    public Mono<Void> updateMetadata(DerivedMetadataMessage derivedMetadataMessage) {
        String deviceId = derivedMetadataMessage.getDeviceId();
        if (derivedMetadataMessage.isAll()) {
            return updateMedata(deviceId, derivedMetadataMessage.getMetadata());
        }
        JetLinksDeviceMetadataCodec codec = JetLinksDeviceMetadataCodec.getInstance();
        return Mono
            .zip(
                this.registry.getDevice(deviceId).flatMap(DeviceOperator::getMetadata),
                codec.decode(derivedMetadataMessage.getMetadata()),
                (current, derived) -> current.merge(derived))
            .flatMap(codec::encode)
            .flatMap(encoded -> updateMedata(deviceId, encoded));
    }

    /**
     * Stores derived metadata for a device: first in the device row, then in the registry.
     *
     * @param deviceId id of the device to update
     * @param metadata encoded metadata to store
     * @return a Mono that completes when both updates have finished; the registry update is
     *         skipped when the device is not in the registry
     */
    private Mono<Void> updateMedata(String deviceId, String metadata) {
        return this.deviceService
            .createUpdate()
            // Column arguments must be method references: the column name is resolved from the getter.
            .set(DeviceInstanceEntity::getDeriveMetadata, metadata)
            .where(DeviceInstanceEntity::getId, deviceId)
            .execute()
            .then(this.registry.getDevice(deviceId))
            .flatMap(deviceOperator -> deviceOperator.updateMetadata(metadata))
            .then();
    }

    /** Disposes everything that was added to the composite disposable. */
    @PreDestroy
    public void shutdown() {
        disposable.dispose();
    }

    /**
     * Starts synchronizing device online/offline events into stored device state.
     *
     * <p>Local online and offline messages are written to a persistent buffer as
     * {@link StateBuf} records. The buffer hands records to a handler that drops records that
     * are no longer effective, de-duplicates device ids, and passes the resulting id list to
     * {@code deviceService.syncStateBatch}. Both the event subscription and the buffer are
     * added to the composite disposable so that {@link #shutdown()} releases them.
     */
    @PostConstruct
    public void init() {
        Subscription subscription = Subscription
            .builder()
            .subscriberId("device-state-synchronizer")
            .topics(new String[]{"/device/*/*/online", "/device/*/*/offline"})
            .justLocal()
            .build();

        // NOTE(review): the two string arguments are presumably the storage directory and queue file name — confirm.
        PersistenceBuffer<StateBuf> buffer = new PersistenceBuffer<StateBuf>(
            "./data/device-state-buffer",
            "device-state.queue",
            StateBuf::new,
            flux -> this.deviceService
                .syncStateBatch(
                    flux.filter(StateBuf::isEffective)
                        .map(StateBuf::getId)
                        .distinct()
                        .collectList()
                        .flux(),
                    false)
                // NOTE(review): the handler always yields ALWAYS_FALSE; presumably "do not re-queue" — confirm.
                .then(Reactors.ALWAYS_FALSE))
            .name("device-state-synchronizer")
            .parallelism(1)
            .bufferTimeout(Duration.ofSeconds(1L))
            // Retry only for I/O failures and query timeouts.
            .retryWhenError(error -> ErrorUtils.hasException(
                error, new Class[]{IOException.class, QueryTimeoutException.class}))
            .bufferSize(1000);
        buffer.start();

        this.disposable.add(this.eventBus
            .subscribe(subscription, DeviceMessage.class)
            .subscribe(deviceMessage -> {
                buffer.write(new StateBuf(deviceMessage.getDeviceId(), deviceMessage.getTimestamp()));
            }));
        this.disposable.add(buffer);
    }

    /**
     * Creates the handler with all of its collaborators.
     *
     * @param localDeviceInstanceService service used to persist and update device instances
     * @param localDeviceProductService  service used to look up products
     * @param deviceRegistry             registry used to look up and register devices
     * @param reactiveRepository         repository that stores device tags
     * @param eventBus                   event bus that delivers device messages
     */
    public DeviceMessageBusinessHandler(
        LocalDeviceInstanceService localDeviceInstanceService,
        LocalDeviceProductService localDeviceProductService,
        DeviceRegistry deviceRegistry,
        ReactiveRepository<DeviceTagEntity, String> reactiveRepository,
        EventBus eventBus
    ) {
        this.eventBus = eventBus;
        this.registry = deviceRegistry;
        this.tagRepository = reactiveRepository;
        this.deviceService = localDeviceInstanceService;
        this.productService = localDeviceProductService;
    }

    /*
     * Lambda-deserialization hook, marked synthetic in the decompiled signature below.
     * It maps a SerializedLambda back to a column-getter lambda on DeviceInstanceEntity:
     * the implementation method name selects a group (getName, getState, getId,
     * getDeriveMetadata, getParentId), then the kind, functional interface, implementing
     * class and signature are checked before a lambda is returned. Anything that does not
     * match ends in IllegalArgumentException.
     *
     * NOTE(review): as written this is not valid Java. The selector `z` is declared boolean
     * but is assigned -1, 2, 3 and 4, and the second switch repeats the label `case true`.
     * These look like decompiler artifacts. Methods with this name are normally generated by
     * the compiler for classes that contain serializable lambdas, so this copy presumably
     * should not be kept in hand-maintained source — confirm before removing.
     * NOTE(review): m20getId looks like a decompiler-renamed alias of getId (the name matched
     * in the first switch) — confirm.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // Stage 1: pick a group from the implementation method name (hash first, then equals).
        switch (implMethodName.hashCode()) {
            case -1041904421:
                if (implMethodName.equals("getParentId")) {
                    z = 4;
                    break;
                }
                break;
            case -1011424148:
                if (implMethodName.equals("getDeriveMetadata")) {
                    z = 3;
                    break;
                }
                break;
            case -75308287:
                if (implMethodName.equals("getName")) {
                    z = false;
                    break;
                }
                break;
            case 98245393:
                if (implMethodName.equals("getId")) {
                    z = 2;
                    break;
                }
                break;
            case 1965583067:
                if (implMethodName.equals("getState")) {
                    z = true;
                    break;
                }
                break;
        }
        // Stage 2: verify the full lambda shape and rebuild the matching getter lambda.
        // The repeated identical checks inside a group each return the same getter.
        switch (z) {
            // getName
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/hswebframework/ezorm/core/StaticMethodReferenceColumn") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/jetlinks/community/device/entity/DeviceInstanceEntity") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/String;")) {
                    return (v0) -> {
                        return v0.getName();
                    };
                }
                break;
            // getState
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/hswebframework/ezorm/core/StaticMethodReferenceColumn") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/jetlinks/community/device/entity/DeviceInstanceEntity") && serializedLambda.getImplMethodSignature().equals("()Lorg/jetlinks/community/device/enums/DeviceState;")) {
                    return (v0) -> {
                        return v0.getState();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/hswebframework/ezorm/core/StaticMethodReferenceColumn") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/jetlinks/community/device/entity/DeviceInstanceEntity") && serializedLambda.getImplMethodSignature().equals("()Lorg/jetlinks/community/device/enums/DeviceState;")) {
                    return (v0) -> {
                        return v0.getState();
                    };
                }
                break;
            // getId
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/hswebframework/ezorm/core/StaticMethodReferenceColumn") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/jetlinks/community/device/entity/DeviceInstanceEntity") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/String;")) {
                    return (v0) -> {
                        return v0.m20getId();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/hswebframework/ezorm/core/StaticMethodReferenceColumn") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/jetlinks/community/device/entity/DeviceInstanceEntity") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/String;")) {
                    return (v0) -> {
                        return v0.m20getId();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/hswebframework/ezorm/core/StaticMethodReferenceColumn") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/jetlinks/community/device/entity/DeviceInstanceEntity") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/String;")) {
                    return (v0) -> {
                        return v0.m20getId();
                    };
                }
                break;
            // getDeriveMetadata
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/hswebframework/ezorm/core/StaticMethodReferenceColumn") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/jetlinks/community/device/entity/DeviceInstanceEntity") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/String;")) {
                    return (v0) -> {
                        return v0.getDeriveMetadata();
                    };
                }
                break;
            // getParentId
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/hswebframework/ezorm/core/StaticMethodReferenceColumn") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/jetlinks/community/device/entity/DeviceInstanceEntity") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/String;")) {
                    return (v0) -> {
                        return v0.getParentId();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/hswebframework/ezorm/core/StaticMethodReferenceColumn") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/jetlinks/community/device/entity/DeviceInstanceEntity") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/String;")) {
                    return (v0) -> {
                        return v0.getParentId();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
