package org.jetlinks.community.configure.device;

import java.io.File;
import java.time.Duration;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import org.h2.mvstore.MVMap;
import org.h2.mvstore.MVStore;
import org.h2.mvstore.MVStoreException;
import org.jetlinks.community.configure.cluster.Cluster;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.device.session.DeviceSessionEvent;
import org.jetlinks.core.rpc.RpcManager;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.server.session.PersistentSession;
import org.jetlinks.supports.device.session.AbstractDeviceSessionManager;
import org.jetlinks.supports.device.session.ClusterDeviceSessionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.data.util.Lazy;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/community/configure/device/PersistenceDeviceSessionManager.class */
public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager implements CommandLineRunner, ApplicationContextAware {
    private static final Logger log = LoggerFactory.getLogger(PersistenceDeviceSessionManager.class);
    private Supplier<DeviceRegistry> registry;
    private MVMap<String, PersistentSessionEntity> repository;
    private String filePath;
    private Duration flushInterval;

    public PersistenceDeviceSessionManager(RpcManager rpcManager) {
        super(rpcManager);
        this.flushInterval = Duration.ofMinutes(10L);
    }

    static MVMap<String, PersistentSessionEntity> initStore(String str) {
        File file = new File(str);
        if (!file.getParentFile().exists()) {
            file.getParentFile().mkdirs();
        }
        Supplier supplier = () -> {
            return new MVStore.Builder().fileName(str).cacheSize(1).open().openMap("device-session");
        };
        try {
            return (MVMap) supplier.get();
        } catch (MVStoreException e) {
            log.warn("load session from {} error,delete it and init.", str, e);
            file.delete();
            return (MVMap) supplier.get();
        }
    }

    public void init() {
        super.init();
        if (this.filePath == null) {
            this.filePath = "./data/sessions-" + Cluster.id().replace(":", "_").replace("/", "");
        }
        this.repository = initStore(this.filePath);
        if (!this.flushInterval.isZero() && !this.flushInterval.isNegative()) {
            this.disposable.add(Flux.interval(this.flushInterval).onBackpressureDrop().concatMap(l -> {
                return (Mono) Flux.fromIterable(this.localSessions.values()).mapNotNull(deviceSessionRef -> {
                    if (deviceSessionRef.loaded == null || !deviceSessionRef.loaded.isWrapFrom(PersistentSession.class)) {
                        return null;
                    }
                    return deviceSessionRef.loaded.unwrap(PersistentSession.class);
                }).as(this::tryPersistent);
            }, 1).subscribe());
        }
        this.disposable.add(listenEvent(deviceSessionEvent -> {
            return (deviceSessionEvent.getType() == DeviceSessionEvent.Type.unregister && deviceSessionEvent.getSession().isWrapFrom(PersistentSession.class)) ? removePersistentSession((PersistentSession) deviceSessionEvent.getSession().unwrap(PersistentSession.class)) : Mono.empty();
        }));
    }

    public void shutdown() {
        super.shutdown();
        ((Mono) Flux.fromIterable(this.localSessions.values()).filter(deviceSessionRef -> {
            return deviceSessionRef.loaded != null;
        }).filter(deviceSessionRef2 -> {
            return deviceSessionRef2.loaded.isWrapFrom(PersistentSession.class);
        }).map(deviceSessionRef3 -> {
            return deviceSessionRef3.loaded.unwrap(PersistentSession.class);
        }).as(this::tryPersistent)).block();
        this.repository.store.compactMoveChunks();
        this.repository.store.close();
    }

    protected Mono<DeviceSession> handleSessionCompute(DeviceSession deviceSession, DeviceSession deviceSession2) {
        return deviceSession == deviceSession2 ? Mono.just(deviceSession2) : ((deviceSession == null || !deviceSession.isWrapFrom(PersistentSession.class)) && deviceSession2.isWrapFrom(PersistentSession.class)) ? tryPersistent(Flux.just(deviceSession2.unwrap(PersistentSession.class))).thenReturn(deviceSession2) : super.handleSessionCompute(deviceSession, deviceSession2);
    }

    Mono<Void> tryPersistent(Flux<PersistentSession> flux) {
        return flux.flatMap(persistentSession -> {
            return PersistentSessionEntity.from(getCurrentServerId(), persistentSession, this.registry.get());
        }).distinct((v0) -> {
            return v0.getId();
        }).doOnNext(persistentSessionEntity -> {
            log.debug("persistent device[{}] session", persistentSessionEntity.getDeviceId());
            this.repository.put(persistentSessionEntity.getDeviceId(), persistentSessionEntity);
        }).onErrorResume(th -> {
            log.warn("persistent session error", th);
            return Mono.empty();
        }).then();
    }

    Mono<Void> resumeSession(PersistentSessionEntity persistentSessionEntity) {
        return persistentSessionEntity.toSession(this.registry.get()).doOnNext(persistentSession -> {
            log.debug("resume session[{}]", persistentSession.getDeviceId());
            this.localSessions.putIfAbsent(persistentSession.getDeviceId(), new AbstractDeviceSessionManager.DeviceSessionRef(persistentSession.getDeviceId(), this, persistentSession));
        }).onErrorResume(th -> {
            log.debug("resume session[{}] error", persistentSessionEntity.getDeviceId(), th);
            return Mono.empty();
        }).then();
    }

    Mono<Void> removePersistentSession(PersistentSession persistentSession) {
        this.repository.remove(persistentSession.getId());
        return Mono.empty();
    }

    public void run(String... strArr) throws Exception {
        Flux.fromIterable(this.repository.values()).flatMap(this::resumeSession).subscribe();
    }

    public void setApplicationContext(@Nonnull ApplicationContext applicationContext) throws BeansException {
        this.registry = Lazy.of(() -> {
            return (DeviceRegistry) applicationContext.getBean(DeviceRegistry.class);
        });
    }

    public String getFilePath() {
        return this.filePath;
    }

    public void setFilePath(String str) {
        this.filePath = str;
    }

    public Duration getFlushInterval() {
        return this.flushInterval;
    }

    public void setFlushInterval(Duration duration) {
        this.flushInterval = duration;
    }
}
