package com.artfess.uc.kafka;

import com.alibaba.fastjson.JSON;
import com.artfess.base.util.BeanUtils;
import com.artfess.base.util.StringUtil;
import com.artfess.bpm.persistence.manager.BpmCallLogManager;
import com.artfess.bpm.persistence.model.BpmCallLog;
import com.artfess.uc.dto.*;
import com.artfess.uc.enums.MsgTypeEnum;
import com.artfess.uc.manager.*;
import com.artfess.uc.model.*;
import com.artfess.uc.params.org.OrgUserVo;
import com.artfess.uc.params.org.OrgVo;
import com.artfess.uc.params.user.UserVo;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;

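/**
 * Kafka consumer that applies organization, user and role synchronization messages
 * to the local data and records a call log for every consumed message.
 *
 * @Author: wsf
 * @DateTime: 2025/6/17 11:22
 **/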

@ConditionalOnProperty(name = "spring.kafka.enable", havingValue = "true", matchIfMissing = false)
@Component
@Slf4j
public class KafkaConsumer {

    // Persists one call-log entry per consumed Kafka message.
    @Resource
    private BpmCallLogManager bpmCallLogManager;
    // Supplies the default organization dimension used when building an OrgVo.
    @Resource
    private DemensionManager demensionManager;
    // Adds, updates and deletes organizations and organization-user relations.
    @Resource
    private OrgManager orgManager;
    // Looks up, adds, updates and deletes users by account.
    @Resource
    private UserManager userManager;
    // Looks up roles by the key carried in role synchronization messages.
    @Resource
    private RoleManager roleManager;
    // Saves and removes user-role relations.
    @Resource
    private UserRoleManager userRoleManager;
    // Provides the default password strategy whose initial password is given to new users.
    @Resource
    private PwdStrategyManager pwdStrategyManager;
    // Queries organization-user relation rows.
    @Resource
    private OrgUserManager orgUserService;

    /**
     * System code of this system (the safety production system).
     * Role relations are only processed when their module code equals this value.
     */
    private static final String SYS_CODE = "XCJY";

    /**
     * Consumes an organization synchronization message and applies it through {@code orgManager}.
     * <p>
     * The payload is parsed as JSON; {@code msgType} selects add / update / delete and {@code data}
     * is mapped to an {@link OrgKafkaDTO}. Every consumed message produces one call-log entry and
     * the record is always acknowledged, so a failed message is logged but not redelivered.
     *
     * @param record the consumed Kafka record
     * @param ack    manual acknowledgment handle for the record
     * @param topic  topic the record was received from
     */
    @KafkaListener(
            id = "orgListenerContainer",// must match the id started through the registry
            topics = KafkaProducer.ORG_TOPIC,
            groupId = KafkaProducer.TOPIC_GROUP,
            autoStartup = "false" // key point: do not start automatically
    )
    public void consumeOrg(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Object msg = preHandleKafkaMsg(record, ack, topic);
        if (msg == null) {
            return;
        }
        String payload = String.valueOf(msg);
        // Build the log entry up front so that a payload which cannot be parsed is still
        // recorded together with its topic and raw content.
        BpmCallLog callLog = buildCallLog(record.topic(), payload, null, null, "组织同步消息", "kafka消费组织同步消息");
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            JsonNode rootNode = objectMapper.readTree(payload);
            String msgType = rootNode.path("msgType").asText();
            callLog.setEventType(msgType);
            callLog.setTaskId(rootNode.path("puuid").asText());

            // Message body
            OrgKafkaDTO orgDto = objectMapper.treeToValue(rootNode.get("data"), OrgKafkaDTO.class);
            OrgVo orgVo = buildOrgVo(orgDto);

            // Dispatch on the message type
            MsgTypeEnum typeEnum = MsgTypeEnum.fromCode(msgType);
            if (typeEnum == null) {
                throw new RuntimeException("不支持的消息类型: " + msgType);
            }
            switch (typeEnum) {
                case ADD:
                    orgManager.addOrg(orgVo);
                    break;
                case UPDATE:
                    orgManager.updateOrg(orgVo);
                    break;
                case DELETE:
                    orgManager.deleteOrg(orgVo.getCode());
                    break;
                default:
                    throw new RuntimeException("不支持的消息类型: " + msgType);
            }
            callLog.setIsSuccess(1);
        } catch (Exception e) {
            // Record the failure reason in the call log
            callLog.setParams(e.getMessage());
            callLog.setIsSuccess(0);
            log.error("Kafka 消费异常：{}", e.getMessage(), e);
        } finally {
            try {
                bpmCallLogManager.save(callLog);
            } catch (Exception e) {
                // A failure to persist the log must not prevent the acknowledgment below.
                log.error("Kafka 消费日志保存失败，topic:{}", record.topic(), e);
            } finally {
                ack.acknowledge();
            }
        }
    }

    /**
     * Consumes a user synchronization message and applies it through {@code userManager}.
     * <p>
     * {@code msgType} selects add / update / delete and {@code data} is mapped to a
     * {@link UserKafkaDTO}. An ADD for an account that already exists locally is handled as an
     * update. Every consumed message produces one call-log entry and the record is always
     * acknowledged, so a failed message is logged but not redelivered.
     *
     * @param record the consumed Kafka record
     * @param ack    manual acknowledgment handle for the record
     * @param topic  topic the record was received from
     */
    @KafkaListener(
            id = "userListenerContainer",// must match the id started through the registry
            topics = KafkaProducer.USER_TOPIC,
            groupId = KafkaProducer.TOPIC_GROUP,
            autoStartup = "false" // key point: do not start automatically
    )
    public void consumeUser(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Object msg = preHandleKafkaMsg(record, ack, topic);
        if (msg == null) {
            return;
        }
        // Build the log entry up front so that a payload which cannot be parsed is still
        // recorded together with its topic and raw content.
        BpmCallLog callLog = buildCallLog(record.topic(), String.valueOf(msg), null, null, "用户同步消息", "kafka消费用户同步消息");
        try {
            String jsonStr = JSON.toJSON(msg).toString();
            callLog.setResponse(jsonStr);

            ObjectMapper objectMapper = new ObjectMapper();
            JsonNode rootNode = objectMapper.readTree(jsonStr);
            String msgType = rootNode.path("msgType").asText();
            callLog.setEventType(msgType);

            UserKafkaDTO userDto = objectMapper.treeToValue(rootNode.get("data"), UserKafkaDTO.class);
            // Build the UserVo (carries the local id when the account already exists)
            UserVo userVo = buildUser(userDto);

            // Dispatch on the message type
            MsgTypeEnum typeEnum = MsgTypeEnum.fromCode(msgType);
            if (typeEnum == null) {
                throw new IllegalArgumentException("不支持的消息类型: " + msgType);
            }
            switch (typeEnum) {
                case ADD:
                    // The account may already exist in this system; update it in that case
                    if (userVo.getId() != null) {
                        userManager.updateUser(userVo);
                        updateUserOrg(userVo, userDto);
                    } else {
                        userVo.setStatus(1);
                        userManager.addUser(userVo);
                        setAddUserOrg(userVo.getAccount(), userDto);
                    }
                    break;
                case UPDATE:
                    // Update the user, then the organizations the user is related to
                    userManager.updateUser(userVo);
                    updateUserOrg(userVo, userDto);
                    break;
                case DELETE:
                    // Delete only when the account exists locally
                    User user = userManager.getByAccount(userDto.getUserCode());
                    if (BeanUtils.isNotEmpty(user)) {
                        userManager.deleteUserByIds(user.getId());
                    }
                    break;
                default:
                    throw new IllegalArgumentException("不支持的消息类型: " + msgType);
            }
            callLog.setIsSuccess(1);
        } catch (Exception e) {
            callLog.setParams(e.getMessage());
            callLog.setIsSuccess(0);
            log.error("Kafka 消费用户异常：{}", e.getMessage(), e);
        } finally {
            try {
                bpmCallLogManager.save(callLog);
            } catch (Exception e) {
                // A failure to persist the log must not prevent the acknowledgment below.
                log.error("Kafka 消费日志保存失败，topic:{}", record.topic(), e);
            } finally {
                ack.acknowledge();
            }
        }
    }

    /**
     * Consumes a role synchronization message and replaces the affected user-role relations.
     * <p>
     * Only relations whose module code equals {@link #SYS_CODE} are considered; when none remain
     * the message is logged as successful and nothing is changed. Type {@code 1} replaces the
     * roles of one user, type {@code 2} replaces the users of one role. Every consumed message
     * produces one call-log entry and the record is always acknowledged, so a failed message is
     * logged but not redelivered.
     *
     * @param record the consumed Kafka record
     * @param ack    manual acknowledgment handle for the record
     * @param topic  topic the record was received from
     */
    @KafkaListener(
            id = "roleListenerContainer",
            topics = KafkaProducer.ROLE_TOPIC,
            groupId = KafkaProducer.TOPIC_GROUP,
            autoStartup = "false"
    )
    public void consumeRole(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Object msg = preHandleKafkaMsg(record, ack, topic);
        if (msg == null) {
            return;
        }
        // Build the log entry up front so that a payload which cannot be parsed is still
        // recorded together with its topic and raw content.
        BpmCallLog callLog = buildCallLog(record.topic(), String.valueOf(msg), null, null, "角色同步消息", null);
        try {
            String jsonStr = JSON.toJSON(msg).toString();
            callLog.setResponse(jsonStr);

            UserRoleItemDTO dto = new ObjectMapper().readValue(jsonStr, UserRoleItemDTO.class);
            callLog.setEventType(String.valueOf(dto.getType()));
            callLog.setDesc(dto.getDesc());

            // Keep only the relations that belong to this system
            List<UserRoleRelationDTO> ownRelations = new ArrayList<>();
            for (UserRoleRelationDTO relation : dto.getValue()) {
                if (SYS_CODE.equals(relation.getModuleCode())) {
                    ownRelations.add(relation);
                }
            }
            if (CollectionUtils.isEmpty(ownRelations)) {
                callLog.setIsSuccess(1);
                return;
            }

            if (1 == dto.getType()) {
                replaceRolesOfUser(dto, ownRelations);
            } else if (2 == dto.getType()) {
                replaceUsersOfRole(dto, ownRelations);
            } else {
                throw new IllegalArgumentException("不支持的消息类型: " + dto.getType());
            }
            callLog.setIsSuccess(1);
        } catch (Exception e) {
            callLog.setParams(e.getMessage());
            callLog.setIsSuccess(0);
            log.error("Kafka 消费角色异常：{}", e.getMessage(), e);
        } finally {
            try {
                bpmCallLogManager.save(callLog);
            } catch (Exception e) {
                // A failure to persist the log must not prevent the acknowledgment below.
                log.error("Kafka 消费日志保存失败，topic:{}", record.topic(), e);
            } finally {
                ack.acknowledge();
            }
        }
    }

    /**
     * Handles a "user relates roles" message: saves the listed roles plus the fixed
     * {@code ybry} role for the user identified by the message key.
     *
     * @param dto       the parsed role message; its key is the user account
     * @param relations relations of this system carried by the message
     * @throws Exception if the user cannot be found or saving fails
     */
    private void replaceRolesOfUser(UserRoleItemDTO dto, List<UserRoleRelationDTO> relations) throws Exception {
        User user = userManager.getByAccount(dto.getKey());
        if (Objects.isNull(user)) {
            throw new RuntimeException("根据手机号: " + dto.getKey() + "未找到对应的用户！");
        }
        // The fixed general-user role must never be removed, so it is always appended.
        String[] zsRoleCodes = relations.stream().map(UserRoleRelationDTO::getRoleCode).toArray(String[]::new);
        String[] roleCodes = Arrays.copyOf(zsRoleCodes, zsRoleCodes.length + 1);
        roleCodes[zsRoleCodes.length] = "ybry";
        // Per the original author's note this save is exclusive: roles of the user that are
        // not passed in are removed.
        userRoleManager.saveUserRole(user.getAccount(), roleCodes);
    }

    /**
     * Handles a "role relates users" message: removes all users of the role identified by the
     * message key, then saves the users listed in the message that exist locally.
     *
     * @param dto       the parsed role message; its key identifies the role
     * @param relations relations of this system carried by the message
     * @throws Exception if the role is unknown, no listed user exists locally, or saving fails
     */
    private void replaceUsersOfRole(UserRoleItemDTO dto, List<UserRoleRelationDTO> relations) throws Exception {
        Role role = roleManager.getRole(dto.getKey());
        if (Objects.isNull(role)) {
            throw new RuntimeException("根据角色Code: " + dto.getKey() + "未找到对应的角色信息！");
        }
        // Users that do not exist locally are skipped.
        List<UserRole> userRoleList = new ArrayList<>();
        for (UserRoleRelationDTO relation : relations) {
            User user = userManager.getByAccount(relation.getPhone());
            if (!Objects.isNull(user)) {
                UserRole userRole = new UserRole();
                userRole.setRoleId(role.getId());
                userRole.setUserId(user.getId());
                userRoleList.add(userRole);
            }
        }
        if (CollectionUtils.isEmpty(userRoleList)) {
            throw new RuntimeException("构建角色关联用户失败，用户无法匹配");
        }
        // Remove every user currently related to the role, then save the new relations.
        userRoleManager.removeByRoleId(role.getId(), LocalDateTime.now());
        userRoleManager.saveBatch(userRoleList);
    }

    /**
     * Shared pre-processing for every listener: logs the record and filters out empty messages.
     *
     * @param record the consumed Kafka record
     * @param ack    acknowledgment handle; acknowledged here when the message is empty
     * @param topic  topic name used in the warning for an empty message
     * @return {@code record.value()} when it is not null; otherwise {@code null}
     *         (the record has then already been acknowledged)
     */
    private Object preHandleKafkaMsg(ConsumerRecord<?, ?> record, Acknowledgment ack, String topic) {
        Object payload = record.value();
        log.info("Kafka 监听主题:{},消费内容:{}", record.topic(), payload);

        if (payload != null) {
            return payload;
        }
        // Nothing to process: acknowledge right away so the caller can simply return.
        log.warn("Kafka 消息为空，topic:{}", topic);
        ack.acknowledge();
        return null;
    }

    /**
     * Creates a {@link BpmCallLog} describing one consumed Kafka message.
     *
     * @param topic   topic the message was consumed from; stored as the log URL
     * @param msgStr  raw message content; stored as the log response
     * @param msgType message type; stored as the event type
     * @param puuid   UUID of the push; stored as the task id
     * @param subject log title
     * @param desc    log description
     * @return a new log entry whose call time is the current time
     */
    private BpmCallLog buildCallLog(String topic, String msgStr, String msgType, String puuid, String subject, String desc) {
        BpmCallLog entry = new BpmCallLog();
        // What was received
        entry.setUrl(topic);
        entry.setResponse(msgStr);
        entry.setEventType(msgType);
        entry.setTaskId(puuid);
        // How the entry is presented
        entry.setSubject(subject);
        entry.setDesc(desc);
        entry.setCallTime(LocalDateTime.now());
        return entry;
    }

    /**
     * Builds the {@link OrgVo} passed to {@code orgManager} from an organization message body.
     * <p>
     * The organization is placed in the default dimension. A type of {@code 1} maps to kind
     * {@code "ogn"}, anything else to {@code "dept"}; a missing sort value defaults to
     * {@code 999}; an organization without parent code becomes a root (parent id {@code "0"},
     * grade {@code "1"}).
     *
     * @param orgDto body of the organization synchronization message
     * @return the populated OrgVo
     * @throws IllegalArgumentException if {@code orgDto} is null (message carried no data)
     * @throws IllegalStateException    if no default dimension is available
     */
    private OrgVo buildOrgVo(OrgKafkaDTO orgDto) {
        // Fail with an explicit reason instead of a message-less NullPointerException, so the
        // call log written by the listener states what was wrong.
        if (orgDto == null) {
            throw new IllegalArgumentException("组织同步消息缺少data内容");
        }
        Demension defaultDemension = demensionManager.getDefaultDemension();
        if (defaultDemension == null) {
            throw new IllegalStateException("未找到默认组织维度");
        }

        OrgVo orgVo = new OrgVo();
        orgVo.setDemId(defaultDemension.getId());
        orgVo.setCode(orgDto.getCode());
        orgVo.setName(orgDto.getName());
        // Organization kind
        orgVo.setOrgKind(orgDto.getType() != null && orgDto.getType() == 1 ? "ogn" : "dept");
        // Sort order
        if (orgDto.getSort() != null) {
            orgVo.setOrderNo(Long.valueOf(orgDto.getSort()));
        } else {
            orgVo.setOrderNo(999L);
        }
        // Parent information
        if (StringUtil.isEmpty(orgDto.getParentCode())) {
            orgVo.setParentId("0");
            orgVo.setGrade("1");
        } else {
            orgVo.setParentCode(orgDto.getParentCode());
        }
        return orgVo;
    }

    /**
     * Builds the {@link UserVo} passed to {@code userManager} from a user message body.
     * <p>
     * The user code is used as the account and, when it matches the mobile-number pattern, also
     * as the mobile number. If the account already exists locally its id, status, user type and
     * password are carried over; otherwise the user type is {@code 2} and the password is the
     * initial password of the default password strategy.
     *
     * @param dto body of the user synchronization message
     * @return the populated UserVo; its id is only set when the account already exists
     * @throws IllegalArgumentException if {@code dto} is null or carries no user code
     * @throws Exception                if a lookup through the managers fails
     */
    private UserVo buildUser(UserKafkaDTO dto) throws Exception {
        // Fail with an explicit reason instead of a message-less NullPointerException, and do
        // not let an empty account reach the user manager.
        if (dto == null || StringUtil.isEmpty(dto.getUserCode())) {
            throw new IllegalArgumentException("用户同步消息缺少userCode");
        }

        UserVo user = new UserVo();
        User existing = userManager.getByAccount(dto.getUserCode());
        user.setAccount(dto.getUserCode());
        user.setFullname(dto.getName());
        if (dto.getUserCode().matches("^1[3-9]\\d{9}$")) {
            user.setMobile(dto.getUserCode());
        }
        if (BeanUtils.isNotEmpty(existing)) {
            // Keep the locally managed attributes of an existing user
            user.setId(existing.getId());
            user.setStatus(existing.getStatus());
            user.setUserType(existing.getUserType());
            user.setPassword(existing.getPassword());
        } else {
            user.setUserType(2);
            String pwd = pwdStrategyManager.getDefault().getInitPwd();
            user.setPassword(pwd);
        }
        return user;
    }

    /**
     * Binds a newly added user to the organizations named in the message.
     * <p>
     * When the message lists organizations, one relation is added per entry and the entry whose
     * org code equals the department code is marked as master. Otherwise a single master
     * relation to the department code is added.
     *
     * @param account      account of the new user
     * @param userKafkaDto body of the user synchronization message
     * @throws Exception if adding a relation fails
     */
    public void setAddUserOrg(String account, UserKafkaDTO userKafkaDto) throws Exception {
        if (CollectionUtils.isEmpty(userKafkaDto.getOrgInfoDtos())) {
            // Single organization: the department is the master organization
            OrgUserVo masterRelation = new OrgUserVo();
            masterRelation.setAccount(account);
            masterRelation.setOrgCode(userKafkaDto.getDeptCode());
            masterRelation.setIsMaster(1);
            orgManager.addOrgUser(masterRelation);
            return;
        }

        // Multiple organizations
        for (UserOrgInfoDTO orgInfoDto : userKafkaDto.getOrgInfoDtos()) {
            OrgUserVo relation = new OrgUserVo();
            relation.setAccount(account);
            relation.setOrgCode(orgInfoDto.getOrgCode());
            relation.setIsMaster(orgInfoDto.getOrgCode().equals(userKafkaDto.getDeptCode()) ? 1 : 0);
            orgManager.addOrgUser(relation);
        }
    }


    /**
     * Updates the organization relations of an existing user from a user message.
     * <p>
     * Nothing happens when the user currently has no organization. When the message lists
     * organizations, {@code deleteUserUnboundOrg} is called and one relation is added per listed
     * entry. Otherwise the department code is only bound (as master) when the user is not
     * already in that organization.
     *
     * @param userVo       the user being updated; its account and id are read
     * @param userKafkaDto body of the user synchronization message
     * @throws Exception if a lookup, removal or insertion fails
     */
    public void updateUserOrg(UserVo userVo, UserKafkaDTO userKafkaDto) throws Exception {
        List<Org> boundOrgs = orgManager.getOrgsByAccount(userVo.getAccount());
        // NOTE(review): a user without any organization is never bound here — confirm intended.
        if (CollectionUtils.isEmpty(boundOrgs)) {
            return;
        }

        Set<String> boundOrgCodes = new HashSet<>();
        for (Org org : boundOrgs) {
            boundOrgCodes.add(org.getCode());
        }

        List<UserOrgInfoDTO> incomingOrgs = userKafkaDto.getOrgInfoDtos();

        if (CollectionUtils.isEmpty(incomingOrgs)) {
            // Single organization: only act when the department is not bound yet
            if (boundOrgCodes.contains(userKafkaDto.getDeptCode())) {
                return;
            }
            deleteUserUnboundOrg(userVo.getId());

            OrgUserVo masterRelation = new OrgUserVo();
            masterRelation.setAccount(userVo.getAccount());
            masterRelation.setOrgCode(userKafkaDto.getDeptCode());
            masterRelation.setIsMaster(1);
            orgManager.addOrgUser(masterRelation);
            return;
        }

        // Multiple organizations
        // NOTE(review): every listed organization is added again, including ones already
        // bound — presumably addOrgUser tolerates that; verify.
        deleteUserUnboundOrg(userVo.getId());
        for (UserOrgInfoDTO orgInfoDto : incomingOrgs) {
            OrgUserVo relation = new OrgUserVo();
            relation.setAccount(userVo.getAccount());
            relation.setOrgCode(orgInfoDto.getOrgCode());
            if (orgInfoDto.getOrgCode().equals(userKafkaDto.getDeptCode())) {
                relation.setIsMaster(1);
            } else {
                relation.setIsMaster(0);
            }
            orgManager.addOrgUser(relation);
        }
    }

    /**
     * Removes one organization relation of the user: the first relation row whose
     * {@code relId} entry is empty.
     * <p>
     * NOTE(review): only the first matching row is handled and the loop then stops — confirm
     * whether all matching rows were meant to be removed. Also verify that {@code "relId"} is
     * the key actually returned by {@code listMaps}.
     *
     * @param userId id of the user whose relation rows are queried
     * @throws Exception if the removal fails
     */
    private void deleteUserUnboundOrg(String userId) throws Exception {
        QueryWrapper<OrgUser> byUser = new QueryWrapper<>();
        byUser.eq("USER_ID_", userId);
        List<Map<String, Object>> rows = orgUserService.listMaps(byUser);
        if (CollectionUtils.isEmpty(rows)) {
            return;
        }

        for (Map<String, Object> row : rows) {
            if (!ObjectUtils.isEmpty(row.get("relId"))) {
                continue;
            }
            // The id column may come back upper- or lower-cased
            Object relationId = row.get("ID_");
            if (ObjectUtils.isEmpty(relationId)) {
                relationId = row.get("id_");
            }
            if (relationId != null) {
                orgManager.delOrgUser(relationId.toString());
            }
            return;
        }
    }
}
