package com.artfess.uc.kafka;

import com.alibaba.fastjson.JSON;
import com.artfess.base.util.BeanUtils;
import com.artfess.base.util.StringUtil;
import com.artfess.bpm.persistence.manager.BpmCallLogManager;
import com.artfess.bpm.persistence.model.BpmCallLog;
import com.artfess.uc.dto.*;
import com.artfess.uc.enums.MsgTypeEnum;
import com.artfess.uc.manager.*;
import com.artfess.uc.model.*;
import com.artfess.uc.params.org.OrgUserVo;
import com.artfess.uc.params.org.OrgVo;
import com.artfess.uc.params.user.UserVo;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;

/**
 * Kafka consumer that applies organization, user and role synchronization
 * messages to the local user-center data.
 *
 * <p>Every listener follows the same flow: validate the record, try to apply
 * the change up to {@link #MAX_RETRY} times with a fixed back-off, write a
 * {@link BpmCallLog} entry describing the outcome, and acknowledge the record.
 * A record that still fails on the last attempt is acknowledged and dropped.
 *
 * <p>All listeners are declared with {@code autoStartup = "false"}; they are
 * expected to be started elsewhere by their container id.
 *
 * @author wsf
 * @since 2025/6/17 11:22
 */

@ConditionalOnProperty(name = "spring.kafka.enable", havingValue = "true", matchIfMissing = false)
@Component
@Slf4j
public class KafkaConsumer {

    @Resource
    private BpmCallLogManager bpmCallLogManager;
    @Resource
    private DemensionManager demensionManager;
    @Resource
    private OrgManager orgManager;
    @Resource
    private UserManager userManager;
    @Resource
    private RoleManager roleManager;
    @Resource
    private UserRoleManager userRoleManager;
    @Resource
    private PwdStrategyManager pwdStrategyManager;
    @Resource
    private OrgUserManager orgUserService;

    /**
     * System code of this system (the safety production system); role relations
     * are filtered by comparing their module code against it.
     */
    private static final String SYS_CODE = "XCJYN";

    /**
     * Maximum number of processing attempts per record.
     */
    private static final int MAX_RETRY = 3;

    /**
     * Fixed pause between two processing attempts, in milliseconds.
     */
    private static final long RETRY_BACKOFF_MILLIS = 1000L;

    /**
     * Shared JSON mapper; created once instead of once per attempt.
     */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * Consumes an organization synchronization message and adds, updates or
     * deletes the organization according to the message type.
     *
     * @param record the received Kafka record
     * @param ack    handle used to acknowledge the record
     * @param topic  topic the record was received from
     */
    @KafkaListener(
            id = "orgListenerContainer",// must match the id used when the container is started from the registry
            topics = KafkaProducer.ORG_TOPIC,
            groupId = KafkaProducer.TOPIC_GROUP,
            autoStartup = "false" // key point: do not start automatically
    )
    public void consumeOrg(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Object msg = preHandleKafkaMsg(record, ack, topic);
        if (msg == null) {
            return;
        }

        // Starts as an empty entry so a failure before parsing can still be logged.
        BpmCallLog callLog = new BpmCallLog();

        for (int attempt = 1; attempt <= MAX_RETRY; attempt++) {
            try {
                // Parse the raw message into a JSON tree
                JsonNode rootNode = OBJECT_MAPPER.readTree(msg.toString());
                String msgType = rootNode.path("msgType").asText();
                String puuid = rootNode.path("puuid").asText();
                // Consumption log entry
                callLog = buildCallLog(record.topic(), msg.toString(), msgType, puuid, "组织同步消息", "kafka消费组织同步消息");

                // Message payload
                OrgKafkaDTO orgDto = OBJECT_MAPPER.treeToValue(requireDataNode(rootNode), OrgKafkaDTO.class);

                // Build the OrgVo handed to the organization manager
                OrgVo orgVo = buildOrgVo(orgDto);

                // Dispatch on the message type
                MsgTypeEnum typeEnum = MsgTypeEnum.fromCode(msgType);
                if (typeEnum == null) {
                    throw new IllegalArgumentException("不支持的消息类型: " + msgType);
                }
                switch (typeEnum) {
                    case ADD:
                        orgManager.addOrg(orgVo);
                        break;
                    case UPDATE:
                        orgManager.updateOrg(orgVo);
                        break;
                    case DELETE:
                        orgManager.deleteOrg(orgVo.getCode());
                        break;
                    default:
                        throw new IllegalArgumentException("不支持的消息类型: " + msgType);
                }

                recordSuccess(callLog, ack);
                return; // consumed successfully
            } catch (Exception e) {
                if (!handleFailure(attempt, e, callLog, ack)) {
                    return;
                }
            }
        }
    }


    /**
     * Consumes a user synchronization message and adds, updates or deletes the
     * user (and its organization bindings) according to the message type.
     *
     * @param record the received Kafka record
     * @param ack    handle used to acknowledge the record
     * @param topic  topic the record was received from
     */
    @KafkaListener(
            id = "userListenerContainer",// must match the id used when the container is started from the registry
            topics = KafkaProducer.USER_TOPIC,
            groupId = KafkaProducer.TOPIC_GROUP,
            autoStartup = "false" // key point: do not start automatically
    )
    public void consumeUser(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Object msg = preHandleKafkaMsg(record, ack, topic);
        if (msg == null) {
            return;
        }
        // Starts as an empty entry so a failure before parsing can still be logged.
        BpmCallLog callLog = new BpmCallLog();
        for (int attempt = 1; attempt <= MAX_RETRY; attempt++) {
            try {
                String jsonStr = JSON.toJSON(msg).toString();
                JsonNode rootNode = OBJECT_MAPPER.readTree(jsonStr);
                // Consumption log entry
                String msgType = rootNode.path("msgType").asText();
                callLog = buildCallLog(record.topic(), jsonStr, msgType, null, "用户同步消息", "kafka消费用户同步消息");
                UserKafkaDTO userDto = OBJECT_MAPPER.treeToValue(requireDataNode(rootNode), UserKafkaDTO.class);
                // Build the UserVo handed to the user manager
                UserVo userVo = buildUser(userDto);
                // Dispatch on the message type
                MsgTypeEnum typeEnum = MsgTypeEnum.fromCode(msgType);
                if (typeEnum == null) {
                    throw new IllegalArgumentException("不支持的消息类型: " + msgType);
                }
                switch (typeEnum) {
                    case ADD:
                        // An id is only set when the account already exists locally
                        if (userVo.getId() != null) {
                            userManager.updateUser(userVo);
                            updateUserOrg(userVo, userDto);
                        } else {
                            userVo.setStatus(1);
                            userManager.addUser(userVo);
                            setAddUserOrg(userVo.getAccount(), userDto);
                        }
                        break;
                    case UPDATE:
                        // Update the user itself
                        userManager.updateUser(userVo);
                        // Update the organizations bound to the user
                        updateUserOrg(userVo, userDto);
                        break;
                    case DELETE:
                        // Look the user up by its user code
                        User user = userManager.getByAccount(userDto.getUserCode());
                        // Delete only when it exists
                        if (BeanUtils.isNotEmpty(user)) {
                            userManager.deleteUserByIds(user.getId());
                        }
                        break;
                    default:
                        throw new IllegalArgumentException("不支持的消息类型: " + msgType);
                }
                // Acknowledge exactly once, and only after the log has been written.
                recordSuccess(callLog, ack);
                return; // consumed successfully
            } catch (Exception e) {
                if (!handleFailure(attempt, e, callLog, ack)) {
                    return;
                }
            }
        }
    }

    /**
     * Consumes a role synchronization message. Type {@code 1} replaces the
     * roles of one user; type {@code 2} replaces the users of one role. Only
     * relations whose module code equals {@link #SYS_CODE} are considered.
     *
     * @param record the received Kafka record
     * @param ack    handle used to acknowledge the record
     * @param topic  topic the record was received from
     */
    @KafkaListener(
            id = "roleListenerContainer",
            topics = KafkaProducer.ROLE_TOPIC,
            groupId = KafkaProducer.TOPIC_GROUP,
            autoStartup = "false"
    )
    public void consumeRole(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Object msg = preHandleKafkaMsg(record, ack, topic);
        if (msg == null) {
            return;
        }
        // Starts as an empty entry so a failure before parsing can still be logged.
        BpmCallLog callLog = new BpmCallLog();
        for (int attempt = 1; attempt <= MAX_RETRY; attempt++) {
            try {
                String jsonStr = JSON.toJSON(msg).toString();
                UserRoleItemDTO dto = OBJECT_MAPPER.readValue(jsonStr, UserRoleItemDTO.class);
                // Consumption log entry
                callLog = buildCallLog(record.topic(), jsonStr, String.valueOf(dto.getType()), null, "角色同步消息", dto.getDesc());
                // Collect the role relations that belong to this system
                List<UserRoleRelationDTO> userRoleRelationDtoList = new ArrayList<>();
                dto.getValue().forEach(userRoleRelationDto -> {
                            if (SYS_CODE.equals(userRoleRelationDto.getModuleCode())) {
                                userRoleRelationDtoList.add(userRoleRelationDto);
                            }
                        }
                );
                if (CollectionUtils.isEmpty(userRoleRelationDtoList)) {
                    // Nothing concerns this system: still log and acknowledge the
                    // record, otherwise it would stay unacknowledged.
                    recordSuccess(callLog, ack);
                    return;
                }
                if (1 == dto.getType()) {
                    // User -> roles: the user's current role relations are replaced
                    // by the ones carried in the message.
                    User user = userManager.getByAccount(dto.getKey());
                    if (Objects.isNull(user)) {
                        throw new RuntimeException("根据手机号: " + dto.getKey() + "未找到对应的用户！");
                    }
                    String[] zsRoleCodes = userRoleRelationDtoList.stream().map(UserRoleRelationDTO::getRoleCode).toArray(String[]::new);
                    // The fixed general-user role ("ybry") must never be removed,
                    // so append it to the role codes from the message.
                    String[] roleCodes = Arrays.copyOf(zsRoleCodes, zsRoleCodes.length + 1);
                    roleCodes[zsRoleCodes.length] = "ybry";
                    // Exclusive save: roles of the user that are not passed in are removed
                    userRoleManager.saveUserRole(user.getAccount(), roleCodes);
                } else if (2 == dto.getType()) {
                    // Role -> users: the role's current user relations are replaced
                    // by the ones carried in the message.
                    Role role = roleManager.getRole(dto.getKey());
                    if (Objects.isNull(role)) {
                        // Fail with a readable cause instead of an NPE on role.getId()
                        throw new IllegalStateException("根据角色编码: " + dto.getKey() + "未找到对应的角色！");
                    }
                    // Build the role-user relations, skipping users unknown locally
                    List<UserRole> userRoleList = new ArrayList<>();
                    for (UserRoleRelationDTO userRoleRelationDto : userRoleRelationDtoList) {
                        User user = userManager.getByAccount(userRoleRelationDto.getPhone());
                        if (!Objects.isNull(user)) {
                            UserRole userRole = new UserRole();
                            userRole.setRoleId(role.getId());
                            userRole.setUserId(user.getId());
                            userRoleList.add(userRole);
                        }
                    }
                    if (CollectionUtils.isEmpty(userRoleList)) {
                        throw new RuntimeException("构建角色关联用户失败，用户无法匹配");
                    }
                    // Remove every user currently bound to the role
                    userRoleManager.removeByRoleId(role.getId(), LocalDateTime.now());
                    // Save the new role-user relations
                    userRoleManager.saveBatch(userRoleList);
                } else {
                    throw new IllegalArgumentException("不支持的消息类型: " + dto.getType());
                }
                recordSuccess(callLog, ack);
                return; // consumed successfully
            } catch (Exception e) {
                if (!handleFailure(attempt, e, callLog, ack)) {
                    return;
                }
            }
        }
    }

    /**
     * Marks the call log as successful, stores it and acknowledges the record.
     *
     * <p>Storing the log is best-effort: the business change has already been
     * applied at this point, so a failing log write must not trigger a retry
     * that would apply the change a second time.
     *
     * @param callLog log entry describing the consumed message
     * @param ack     handle used to acknowledge the record
     */
    private void recordSuccess(BpmCallLog callLog, Acknowledgment ack) {
        callLog.setIsSuccess(1);
        try {
            bpmCallLogManager.save(callLog);
        } catch (Exception saveEx) {
            log.error("保存消费成功日志异常：{}", saveEx.getMessage(), saveEx);
        }
        ack.acknowledge();
    }

    /**
     * Handles one failed processing attempt.
     *
     * <p>On the last attempt the failure is written to the call log and the
     * record is acknowledged, i.e. the message is given up. On earlier attempts
     * the thread pauses for {@link #RETRY_BACKOFF_MILLIS}. If the pause is
     * interrupted, the interrupt flag is restored and the record is left
     * unacknowledged.
     *
     * @param attempt 1-based number of the attempt that failed
     * @param e       the failure
     * @param callLog log entry to mark as failed on the last attempt
     * @param ack     handle used to acknowledge the record
     * @return {@code true} if the caller should try again, {@code false} if it must stop
     */
    private boolean handleFailure(int attempt, Exception e, BpmCallLog callLog, Acknowledgment ack) {
        log.error("第 {} 次消费失败：{}", attempt, e.getMessage(), e);

        if (attempt >= MAX_RETRY) {
            callLog.setParams(e.getMessage());
            callLog.setIsSuccess(0);
            try {
                bpmCallLogManager.save(callLog);
            } catch (Exception saveEx) {
                // The log write must not prevent the acknowledgment below.
                log.error("保存消费失败日志异常：{}", saveEx.getMessage(), saveEx);
            }
            ack.acknowledge(); // all attempts failed, give the message up
            return false;
        }

        try {
            Thread.sleep(RETRY_BACKOFF_MILLIS); // simple fixed back-off
            return true;
        } catch (InterruptedException interrupted) {
            // Keep the interrupt visible to the caller and stop retrying.
            Thread.currentThread().interrupt();
            log.warn("Kafka 消费重试等待被中断，停止重试");
            return false;
        }
    }

    /**
     * Returns the {@code data} node of a message, failing with a descriptive
     * error when it is absent or JSON {@code null}.
     *
     * @param rootNode parsed message
     * @return the non-null {@code data} node
     * @throws IllegalArgumentException if the message carries no {@code data}
     */
    private JsonNode requireDataNode(JsonNode rootNode) {
        JsonNode dataNode = rootNode.get("data");
        if (dataNode == null || dataNode.isNull()) {
            throw new IllegalArgumentException("消息体缺少 data 节点");
        }
        return dataNode;
    }

    /**
     * Common pre-processing of a Kafka record: logs it and checks whether it
     * carries a value.
     *
     * @param record Kafka record
     * @param ack    Kafka acknowledgment handle
     * @param topic  Kafka topic
     * @return {@code record.value()} when it is non-null; otherwise {@code null},
     * in which case the record has already been acknowledged
     */
    private Object preHandleKafkaMsg(ConsumerRecord<?, ?> record, Acknowledgment ack, String topic) {
        log.info("Kafka 监听主题:{},消费内容:{}", record.topic(), record.value());

        Object msg = record.value();
        if (msg == null) {
            log.warn("Kafka 消息为空，topic:{}", topic);
            ack.acknowledge();
            return null;
        }
        return msg;
    }

    /**
     * Builds a {@link BpmCallLog} entry for a consumed message.
     *
     * @param topic   topic the message was received from; stored as the URL
     * @param msgStr  raw message text; stored as the response
     * @param msgType message type; stored as the event type
     * @param puuid   UUID of the push; stored as the task id
     * @param subject log subject
     * @param desc    log description
     * @return the populated log entry, with the call time set to now
     */
    private BpmCallLog buildCallLog(String topic, String msgStr, String msgType, String puuid, String
            subject, String desc) {
        BpmCallLog callLog = new BpmCallLog();
        callLog.setUrl(topic);
        callLog.setCallTime(LocalDateTime.now());
        callLog.setSubject(subject);
        callLog.setDesc(desc);
        callLog.setResponse(msgStr);
        callLog.setEventType(msgType);
        callLog.setTaskId(puuid);
        return callLog;
    }

    /**
     * Builds the {@link OrgVo} for an organization synchronization payload.
     *
     * @param orgDto payload of the incremental organization message
     * @return the OrgVo, bound to the default organization dimension
     */
    private OrgVo buildOrgVo(OrgKafkaDTO orgDto) {
        // Id of the default organization dimension
        Demension defaultDemension = demensionManager.getDefaultDemension();
        String demId = defaultDemension.getId();

        OrgVo orgVo = new OrgVo();
        orgVo.setDemId(demId);
        orgVo.setCode(orgDto.getCode());
        orgVo.setName(orgDto.getName());
        // Organization kind: type 1 maps to "ogn", everything else to "dept"
        orgVo.setOrgKind(orgDto.getType() != null && orgDto.getType() == 1 ? "ogn" : "dept");
        // Sort order, defaulting to 999 when the payload has none
        if (orgDto.getSort() != null) {
            orgVo.setOrderNo(Long.valueOf(orgDto.getSort()));
        } else {
            orgVo.setOrderNo(999L);
        }
        // Parent: without a parent code the organization becomes a top-level one
        if (StringUtil.isEmpty(orgDto.getParentCode())) {
            orgVo.setParentId("0");
            orgVo.setGrade("1");
        } else {
            orgVo.setParentCode(orgDto.getParentCode());
        }
        return orgVo;
    }

    /**
     * Builds the {@link UserVo} for a user synchronization payload.
     *
     * <p>When an account with the payload's user code already exists, its id,
     * status, user type and password are carried over. Otherwise the user type
     * is set to {@code 2} and the initial password of the default password
     * strategy is used.
     *
     * @param dto payload of the user message
     * @return the UserVo; its id is only set for an existing account
     * @throws Exception if a manager call fails
     */
    private UserVo buildUser(UserKafkaDTO dto) throws Exception {
        UserVo user = new UserVo();
        User account = userManager.getByAccount(dto.getUserCode());
        user.setAccount(dto.getUserCode());
        user.setFullname(dto.getName());
        // A user code shaped like a mobile number is also used as the mobile number
        if (dto.getUserCode().matches("^1[3-9]\\d{9}$")) {
            user.setMobile(dto.getUserCode());
        }
        if (BeanUtils.isNotEmpty(account)) {
            user.setId(account.getId());
            user.setStatus(account.getStatus());
            user.setUserType(account.getUserType());
            user.setPassword(account.getPassword());
        } else {
            user.setUserType(2);
            String pwd = pwdStrategyManager.getDefault().getInitPwd();
            user.setPassword(pwd);
        }
        return user;
    }

    /**
     * Binds a newly added user to its organizations.
     *
     * <p>With an organization list in the payload, one binding is created per
     * entry and the one matching the department code is marked as master.
     * Without a list, a single master binding to the department code is created.
     *
     * @param account      account of the new user
     * @param userKafkaDto payload of the user message
     * @throws Exception if adding a binding fails
     */
    public void setAddUserOrg(String account, UserKafkaDTO userKafkaDto) throws Exception {
        if (CollectionUtils.isNotEmpty(userKafkaDto.getOrgInfoDtos())) {
            // Several organizations
            for (UserOrgInfoDTO orgInfoDto : userKafkaDto.getOrgInfoDtos()) {
                OrgUserVo orgUserVo = new OrgUserVo();
                orgUserVo.setAccount(account);
                orgUserVo.setOrgCode(orgInfoDto.getOrgCode());
                if (orgInfoDto.getOrgCode().equals(userKafkaDto.getDeptCode())) {
                    orgUserVo.setIsMaster(1);
                } else {
                    orgUserVo.setIsMaster(0);
                }
                orgManager.addOrgUser(orgUserVo);
            }
        } else {
            OrgUserVo orgUserVo = new OrgUserVo();
            orgUserVo.setAccount(account);
            orgUserVo.setOrgCode(userKafkaDto.getDeptCode());
            orgUserVo.setIsMaster(1);
            orgManager.addOrgUser(orgUserVo);
        }
    }


    /**
     * Updates the organization bindings of an existing user.
     *
     * @param userVo       the user being updated
     * @param userKafkaDto payload of the user message
     * @throws Exception if a manager call fails
     */
    public void updateUserOrg(UserVo userVo, UserKafkaDTO userKafkaDto) throws Exception {
        List<Org> existingOrgs = orgManager.getOrgsByAccount(userVo.getAccount());
        // NOTE(review): a user without any organization is left untouched here, so
        // the bindings from the message are never applied to it - confirm intent.
        if (CollectionUtils.isEmpty(existingOrgs)) {
            return;
        }

        Set<String> existingOrgCodes = existingOrgs.stream()
                .map(Org::getCode)
                .collect(Collectors.toSet());

        List<UserOrgInfoDTO> orgInfoList = userKafkaDto.getOrgInfoDtos();

        // Several organizations in the payload?
        if (CollectionUtils.isNotEmpty(orgInfoList)) {
            deleteUserUnboundOrg(userVo.getId());
            for (UserOrgInfoDTO orgInfoDto : orgInfoList) {
                OrgUserVo orgUserVo = new OrgUserVo();
                orgUserVo.setAccount(userVo.getAccount());
                orgUserVo.setOrgCode(orgInfoDto.getOrgCode());
                orgUserVo.setIsMaster(orgInfoDto.getOrgCode().equals(userKafkaDto.getDeptCode()) ? 1 : 0);
                orgManager.addOrgUser(orgUserVo);
            }
        } else {
            // Single organization: only act when the department actually changed
            if (!existingOrgCodes.contains(userKafkaDto.getDeptCode())) {
                deleteUserUnboundOrg(userVo.getId());

                OrgUserVo orgUserVo = new OrgUserVo();
                orgUserVo.setAccount(userVo.getAccount());
                orgUserVo.setOrgCode(userKafkaDto.getDeptCode());
                orgUserVo.setIsMaster(1);
                orgManager.addOrgUser(orgUserVo);
            }
        }
    }

    /**
     * Removes an existing organization binding of the user.
     *
     * <p>Looks up the user's organization-user rows and deletes the first one
     * that has no {@code relId} value, then stops.
     *
     * @param userId id of the user
     * @throws Exception if the lookup or the deletion fails
     */
    private void deleteUserUnboundOrg(String userId) throws Exception {
        QueryWrapper<OrgUser> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("USER_ID_", userId);
        List<Map<String, Object>> list = orgUserService.listMaps(queryWrapper);

        if (CollectionUtils.isNotEmpty(list)) {
            for (Map<String, Object> map : list) {
                // NOTE(review): "relId" looks like a property name while the other
                // keys used here are column names ("ID_"); verify which keys
                // listMaps actually returns.
                Object relId = map.get("relId");
                if (ObjectUtils.isEmpty(relId)) {
                    // The id column may come back in upper or lower case
                    Object id = map.get("ID_");
                    if (ObjectUtils.isEmpty(id)) {
                        id = map.get("id_");
                    }
                    if (id != null) {
                        orgManager.delOrgUser(id.toString());
                    }
                    // NOTE(review): only the first matching row is deleted; callers
                    // re-add every binding afterwards, so confirm that leaving the
                    // remaining rows in place is intended.
                    break;
                }
            }
        }
    }
}
