package org.jetlinks.community.gateway.external.socket;

import com.alibaba.fastjson.JSON;
import java.util.HashMap;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nonnull;
import org.hswebframework.web.authorization.Authentication;
import org.hswebframework.web.authorization.ReactiveAuthenticationManager;
import org.hswebframework.web.authorization.token.UserTokenManager;
import org.hswebframework.web.logger.ReactiveLogger;
import org.jetlinks.community.gateway.external.Message;
import org.jetlinks.community.gateway.external.MessagingManager;
import org.jetlinks.community.gateway.external.SubscribeRequest;
import org.jetlinks.community.gateway.external.socket.MessagingRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

/* loaded from: input_file:org/jetlinks/community/gateway/external/socket/WebSocketMessagingHandler.class */
public class WebSocketMessagingHandler implements WebSocketHandler {
    private static final Logger log = LoggerFactory.getLogger(WebSocketMessagingHandler.class);
    private final MessagingManager messagingManager;
    private final UserTokenManager userTokenManager;
    private final ReactiveAuthenticationManager authenticationManager;

    @Nonnull
    public Mono<Void> handle(@Nonnull WebSocketSession webSocketSession) {
        String[] split = webSocketSession.getHandshakeInfo().getUri().getPath().split("[/]");
        if (split.length == 0) {
            return webSocketSession.send(Mono.just(webSocketSession.textMessage(JSON.toJSONString(Message.error("auth", (String) null, "错误的请求"))))).then(webSocketSession.close(CloseStatus.BAD_DATA));
        }
        String str = split[split.length - 1];
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        Mono map = this.userTokenManager.getByToken(str).map((v0) -> {
            return v0.getUserId();
        });
        ReactiveAuthenticationManager reactiveAuthenticationManager = this.authenticationManager;
        reactiveAuthenticationManager.getClass();
        return map.flatMap(reactiveAuthenticationManager::getByUserId).switchIfEmpty(webSocketSession.send(Mono.just(webSocketSession.textMessage(JSON.toJSONString(Message.authError())))).then(webSocketSession.close(CloseStatus.BAD_DATA)).then(Mono.empty())).flatMap(authentication -> {
            return webSocketSession.receive().doOnNext(webSocketMessage -> {
                try {
                    if (webSocketMessage.getType() == WebSocketMessage.Type.PONG) {
                        return;
                    }
                    if (webSocketMessage.getType() == WebSocketMessage.Type.PING) {
                        webSocketSession.send(Mono.just(webSocketSession.pongMessage((v0) -> {
                            return v0.allocateBuffer();
                        }))).subscribe();
                        return;
                    }
                    MessagingRequest messagingRequest = (MessagingRequest) JSON.parseObject(webSocketMessage.getPayloadAsText(), MessagingRequest.class);
                    if (messagingRequest == null) {
                        return;
                    }
                    if (messagingRequest.getType() == MessagingRequest.Type.ping) {
                        webSocketSession.send(Mono.just(webSocketSession.textMessage(JSON.toJSONString(Message.pong(messagingRequest.getId()))))).subscribe();
                        return;
                    }
                    if (StringUtils.isEmpty(messagingRequest.getId())) {
                        webSocketSession.send(Mono.just(webSocketSession.textMessage(JSON.toJSONString(Message.error(messagingRequest.getType().name(), (String) null, "id不能为空"))))).subscribe();
                        return;
                    }
                    if (messagingRequest.getType() == MessagingRequest.Type.sub) {
                        Disposable disposable = (Disposable) concurrentHashMap.get(messagingRequest.getId());
                        if (disposable != null && !disposable.isDisposed()) {
                            return;
                        }
                        HashMap hashMap = new HashMap();
                        hashMap.put("userId", authentication.getUser().getId());
                        hashMap.put("userName", authentication.getUser().getName());
                        Flux doOnCancel = this.messagingManager.subscribe(SubscribeRequest.of(messagingRequest, authentication)).doOnEach(ReactiveLogger.onError(th -> {
                            log.error("{}", th.getMessage(), th);
                        })).onErrorResume(th2 -> {
                            return Mono.just(Message.error(messagingRequest.getId(), messagingRequest.getTopic(), th2));
                        }).map(message -> {
                            return webSocketSession.textMessage(JSON.toJSONString(message));
                        }).doOnComplete(() -> {
                            log.debug("complete subscription:{}", messagingRequest.getTopic());
                            concurrentHashMap.remove(messagingRequest.getId());
                            Mono just = Mono.just(webSocketSession.textMessage(JSON.toJSONString(Message.complete(messagingRequest.getId()))));
                            webSocketSession.getClass();
                            ((Mono) just.as((v1) -> {
                                return r1.send(v1);
                            })).subscribe();
                        }).doOnCancel(() -> {
                            log.debug("cancel subscription:{}", messagingRequest.getTopic());
                            concurrentHashMap.remove(messagingRequest.getId());
                        });
                        webSocketSession.getClass();
                        Disposable subscribe = doOnCancel.transform((v1) -> {
                            return r1.send(v1);
                        }).subscriberContext(ReactiveLogger.start(hashMap)).subscriberContext(Context.of(Authentication.class, authentication)).subscribe();
                        if (!subscribe.isDisposed()) {
                            concurrentHashMap.put(messagingRequest.getId(), subscribe);
                        }
                    } else if (messagingRequest.getType() == MessagingRequest.Type.unsub) {
                        Optional.ofNullable(concurrentHashMap.remove(messagingRequest.getId())).ifPresent((v0) -> {
                            v0.dispose();
                        });
                    } else {
                        webSocketSession.send(Mono.just(webSocketSession.textMessage(JSON.toJSONString(Message.error(messagingRequest.getId(), messagingRequest.getTopic(), "不支持的类型:" + messagingRequest.getType()))))).subscribe();
                    }
                } catch (Exception e) {
                    log.warn(e.getMessage(), e);
                    webSocketSession.send(Mono.just(webSocketSession.textMessage(JSON.toJSONString(Message.error("illegal_argument", (String) null, "消息格式错误"))))).subscribe();
                }
            }).then();
        }).doFinally(signalType -> {
            concurrentHashMap.values().forEach((v0) -> {
                v0.dispose();
            });
            concurrentHashMap.clear();
        });
    }

    public WebSocketMessagingHandler(MessagingManager messagingManager, UserTokenManager userTokenManager, ReactiveAuthenticationManager reactiveAuthenticationManager) {
        this.messagingManager = messagingManager;
        this.userTokenManager = userTokenManager;
        this.authenticationManager = reactiveAuthenticationManager;
    }
}
