package org.jetlinks.community.configure.cluster;

import io.scalecube.cluster.ClusterConfig;
import io.scalecube.net.Address;
import io.scalecube.services.transport.rsocket.RSocketClientTransportFactory;
import io.scalecube.services.transport.rsocket.RSocketServerTransportFactory;
import io.scalecube.services.transport.rsocket.RSocketServiceTransport;
import io.scalecube.transport.netty.tcp.TcpTransportFactory;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.supports.cluster.redis.RedisClusterManager;
import org.jetlinks.supports.config.EventBusStorageManager;
import org.jetlinks.supports.event.BrokerEventBus;
import org.jetlinks.supports.event.EventBroker;
import org.jetlinks.supports.scalecube.ExtendedCluster;
import org.jetlinks.supports.scalecube.ExtendedClusterImpl;
import org.jetlinks.supports.scalecube.rpc.ScalecubeRpcManager;
import org.nustaq.serialization.FSTConfiguration;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ResourceLoader;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
 * Spring configuration that wires the cluster-related beans: the scalecube cluster node,
 * the broker-backed event bus, the event-bus storage manager, the Redis cluster manager
 * and the scalecube RPC manager.
 *
 * <p>The configuration is only considered when {@link ExtendedCluster} is on the classpath,
 * and all settings are read from {@link ClusterProperties}.
 */
@EnableConfigurationProperties(ClusterProperties.class)
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(ExtendedCluster.class)
public class ClusterConfiguration {

    /**
     * Builds the scalecube cluster node and starts it before handing it to the container.
     *
     * @param clusterProperties source of the transport port, member alias (id), external
     *                          host/port and seed member addresses
     * @param resourceLoader    supplies the class loader installed on each FST configuration
     * @return the cluster node, after {@code startAwait()} has been invoked on it
     */
    @Bean
    public ExtendedClusterImpl cluster(ClusterProperties clusterProperties, ResourceLoader resourceLoader) {
        // Typed local instead of a lambda cast: keeps the constructor's target type explicit.
        Supplier<FSTConfiguration> fstConfigurationFactory = () -> {
            FSTConfiguration fstConfiguration = FSTConfiguration
                .createDefaultConfiguration()
                .setForceSerializable(true);
            // Resolve classes through the application's class loader.
            fstConfiguration.setClassLoader(resourceLoader.getClassLoader());
            return fstConfiguration;
        };
        FSTMessageCodec messageCodec = new FSTMessageCodec(fstConfigurationFactory);

        ClusterConfig clusterConfig = new ClusterConfig()
            .transport(transportConfig -> transportConfig
                .port(clusterProperties.getPort())
                .messageCodec(messageCodec)
                .transportFactory(new TcpTransportFactory()))
            .memberAlias(clusterProperties.getId())
            .externalHost(clusterProperties.getExternalHost())
            .externalPort(clusterProperties.getExternalPort())
            .membership(membershipConfig -> {
                // Typed list (no raw cast) so the List<Address> overload is selected safely.
                List<Address> seedAddresses = clusterProperties
                    .getSeeds()
                    .stream()
                    .map(Address::from)
                    .collect(Collectors.toList());
                return membershipConfig.seedMembers(seedAddresses);
            });

        ExtendedClusterImpl cluster = new ExtendedClusterImpl(clusterConfig);
        // Started here rather than through an initMethod, so the returned bean is already started.
        cluster.startAwait();
        return cluster;
    }

    /**
     * Creates the event bus and registers every available {@link EventBroker} bean with it.
     *
     * @param objectProvider  provider of the {@link EventBroker} beans to attach (may be empty)
     * @param objectProvider2 provider of an optional {@link Scheduler} bean used for publishing;
     *                        {@link Schedulers#parallel()} is used when none is available
     * @return the configured event bus
     */
    @Bean
    public BrokerEventBus eventBus(ObjectProvider<EventBroker> objectProvider, ObjectProvider<Scheduler> objectProvider2) {
        BrokerEventBus brokerEventBus = new BrokerEventBus();
        brokerEventBus.setPublishScheduler(objectProvider2.getIfAvailable(Schedulers::parallel));
        for (EventBroker broker : objectProvider) {
            brokerEventBus.addBroker(broker);
        }
        return brokerEventBus;
    }

    /**
     * Creates the storage manager backed by the cluster manager and the event bus.
     *
     * @param clusterManager cluster manager handed to the storage manager
     * @param eventBus       event bus handed to the storage manager
     * @return the storage manager
     */
    @Bean
    public EventBusStorageManager eventBusStorageManager(ClusterManager clusterManager, EventBus eventBus) {
        // NOTE(review): the meaning of the -1 argument is defined by EventBusStorageManager — confirm.
        return new EventBusStorageManager(clusterManager, eventBus, -1L);
    }

    /**
     * Creates the Redis-based cluster manager; the container calls {@code startup()} on it
     * as the bean's init method.
     *
     * @param clusterProperties     source of the cluster name and this node's id
     * @param reactiveRedisTemplate reactive Redis template passed to the manager
     * @return the cluster manager
     */
    @Bean(initMethod = "startup")
    public RedisClusterManager clusterManager(ClusterProperties clusterProperties, ReactiveRedisTemplate<Object, Object> reactiveRedisTemplate) {
        return new RedisClusterManager(clusterProperties.getName(), clusterProperties.getId(), reactiveRedisTemplate);
    }

    /**
     * Creates the scalecube RPC manager using RSocket over TCP. The container calls
     * {@code startAwait()} on initialization and {@code stopAwait()} on destruction.
     *
     * @param extendedCluster   cluster the RPC manager is attached to
     * @param clusterProperties source of the RPC server port and the RPC external host/port
     * @return the RPC manager
     */
    @Bean(initMethod = "startAwait", destroyMethod = "stopAwait")
    public ScalecubeRpcManager rpcManager(ExtendedCluster extendedCluster, ClusterProperties clusterProperties) {
        return new ScalecubeRpcManager(
            extendedCluster,
            () -> new RSocketServiceTransport()
                .serverTransportFactory(RSocketServerTransportFactory.tcp(clusterProperties.getRpcPort()))
                .clientTransportFactory(RSocketClientTransportFactory.tcp()))
            .externalHost(clusterProperties.getRpcExternalHost())
            .externalPort(clusterProperties.getRpcExternalPort());
    }
}
