package com.artfess.uc.kafka;

import com.alibaba.fastjson.JSON;
import com.artfess.base.util.BeanUtils;
import com.artfess.base.util.StringUtil;
import com.artfess.bpm.persistence.manager.BpmCallLogManager;
import com.artfess.bpm.persistence.model.BpmCallLog;
import com.artfess.uc.dto.*;
import com.artfess.uc.enums.MsgTypeEnum;
import com.artfess.uc.manager.*;
import com.artfess.uc.model.*;
import com.artfess.uc.params.org.OrgUserVo;
import com.artfess.uc.params.org.OrgVo;
import com.artfess.uc.params.user.UserVo;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.*;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
 * Kafka consumer that applies organisation, user and role synchronisation messages
 * to the local user-centre data and records every consumed message in the call log.
 *
 * <p>Each listener acknowledges the record manually once the call log has been saved,
 * whether or not the business operation succeeded.
 *
 * <p>Author: wsf<br>
 * Created: 2025/6/17 11:22
 */
@ConditionalOnProperty(name = "spring.kafka.enable", havingValue = "true", matchIfMissing = false)
@Component
@Slf4j
public class KafkaConsumer {

    /**
     * System code of this system (safety production system); only role relations whose
     * module code equals this value are processed.
     */
    private static final String SYS_CODE = "XCJY";

    /** Role code that is always appended when a user's roles are replaced, so it is never removed. */
    private static final String DEFAULT_ROLE_CODE = "ybry";

    /** Account values matching this pattern are also stored as the user's mobile number. */
    private static final Pattern MOBILE_PATTERN = Pattern.compile("^1[3-9]\\d{9}$");

    /** Shared JSON mapper with default configuration, reused instead of creating one per message. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Resource
    private BpmCallLogManager bpmCallLogManager;
    @Resource
    private DemensionManager demensionManager;
    @Resource
    private OrgManager orgManager;
    @Resource
    private UserManager userManager;
    @Resource
    private RoleManager roleManager;
    @Resource
    private UserRoleManager userRoleManager;
    @Resource
    private PwdStrategyManager pwdStrategyManager;
    @Resource
    private OrgUserManager orgUserService;

    /**
     * Consumes an organisation synchronisation message and adds, updates or deletes the organisation.
     *
     * @param record the consumed Kafka record
     * @param ack    manual acknowledgment handle; always acknowledged before returning
     * @param topic  topic the record was received from
     */
    @KafkaListener(
            id = "orgListenerContainer", // must match the id that is started through the registry
            topics = KafkaProducer.ORG_TOPIC,
            groupId = KafkaProducer.TOPIC_GROUP,
            autoStartup = "false" // key point: do not start automatically
    )
    public void consumeOrg(ConsumerRecord<?, ?> record, Acknowledgment ack,
                           @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Object msg = preHandleKafkaMsg(record, ack, topic);
        if (msg == null) {
            return;
        }
        String payload = msg.toString();
        // Build the log entry before parsing so that a parse failure is still logged with its context.
        BpmCallLog callLog = buildCallLog(record.topic(), payload, null, null,
                "组织同步消息", "kafka消费组织同步消息");
        try {
            JsonNode rootNode = OBJECT_MAPPER.readTree(payload);
            String msgType = rootNode.path("msgType").asText();
            callLog.setEventType(msgType);
            callLog.setTaskId(rootNode.path("puuid").asText());

            // Message body
            OrgKafkaDTO orgDto = OBJECT_MAPPER.treeToValue(requireData(rootNode), OrgKafkaDTO.class);
            OrgVo orgVo = buildOrgVo(orgDto);

            // Dispatch on the message type
            switch (requireMsgType(msgType)) {
                case ADD:
                    orgManager.addOrg(orgVo);
                    break;
                case UPDATE:
                    orgManager.updateOrg(orgVo);
                    break;
                case DELETE:
                    orgManager.deleteOrg(orgVo.getCode());
                    break;
                default:
                    throw new IllegalArgumentException("不支持的消息类型: " + msgType);
            }
            callLog.setIsSuccess(1);
        } catch (Exception e) {
            // Record the failure reason in the call log
            callLog.setParams(e.getMessage());
            callLog.setIsSuccess(0);
            log.error("Kafka 消费异常:{}", e.getMessage(), e);
        } finally {
            saveLogAndAck(callLog, ack);
        }
    }

    /**
     * Consumes a user synchronisation message and adds, updates or deletes the user
     * together with the user's organisation relations.
     *
     * @param record the consumed Kafka record
     * @param ack    manual acknowledgment handle; always acknowledged before returning
     * @param topic  topic the record was received from
     */
    @KafkaListener(
            id = "userListenerContainer", // must match the id that is started through the registry
            topics = KafkaProducer.USER_TOPIC,
            groupId = KafkaProducer.TOPIC_GROUP,
            autoStartup = "false" // key point: do not start automatically
    )
    public void consumeUser(ConsumerRecord<?, ?> record, Acknowledgment ack,
                            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Object msg = preHandleKafkaMsg(record, ack, topic);
        if (msg == null) {
            return;
        }
        // Build the log entry before parsing so that a parse failure is still logged with its context.
        BpmCallLog callLog = buildCallLog(record.topic(), msg.toString(), null, null,
                "用户同步消息", "kafka消费用户同步消息");
        try {
            String jsonStr = JSON.toJSON(msg).toString();
            callLog.setResponse(jsonStr);
            JsonNode rootNode = OBJECT_MAPPER.readTree(jsonStr);
            String msgType = rootNode.path("msgType").asText();
            callLog.setEventType(msgType);

            UserKafkaDTO userDto = OBJECT_MAPPER.treeToValue(requireData(rootNode), UserKafkaDTO.class);
            UserVo userVo = buildUser(userDto);

            // Dispatch on the message type
            switch (requireMsgType(msgType)) {
                case ADD:
                    // An "add" for an account that already exists locally is treated as an update.
                    if (userVo.getId() != null) {
                        userManager.updateUser(userVo);
                        updateUserOrg(userVo, userDto);
                    } else {
                        userVo.setStatus(1);
                        userManager.addUser(userVo);
                        setAddUserOrg(userVo.getAccount(), userDto);
                    }
                    break;
                case UPDATE:
                    // Update the user, then the organisations the user is related to.
                    userManager.updateUser(userVo);
                    updateUserOrg(userVo, userDto);
                    break;
                case DELETE:
                    // Delete only when the account exists locally.
                    User user = userManager.getByAccount(userDto.getUserCode());
                    if (BeanUtils.isNotEmpty(user)) {
                        userManager.deleteUserByIds(user.getId());
                    }
                    break;
                default:
                    throw new IllegalArgumentException("不支持的消息类型: " + msgType);
            }
            callLog.setIsSuccess(1);
        } catch (Exception e) {
            callLog.setParams(e.getMessage());
            callLog.setIsSuccess(0);
            log.error("Kafka 消费用户异常:{}", e.getMessage(), e);
        } finally {
            saveLogAndAck(callLog, ack);
        }
    }

    /**
     * Consumes a role synchronisation message. Type 1 replaces the roles of one user,
     * type 2 replaces the users of one role. Relations that belong to other systems
     * (module code different from {@link #SYS_CODE}) are ignored.
     *
     * @param record the consumed Kafka record
     * @param ack    manual acknowledgment handle; always acknowledged before returning
     * @param topic  topic the record was received from
     */
    @KafkaListener(
            id = "roleListenerContainer",
            topics = KafkaProducer.ROLE_TOPIC,
            groupId = KafkaProducer.TOPIC_GROUP,
            autoStartup = "false"
    )
    public void consumeRole(ConsumerRecord<?, ?> record, Acknowledgment ack,
                            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Object msg = preHandleKafkaMsg(record, ack, topic);
        if (msg == null) {
            return;
        }
        // Build the log entry before parsing so that a parse failure is still logged with its context.
        BpmCallLog callLog = buildCallLog(record.topic(), msg.toString(), null, null,
                "角色同步消息", null);
        try {
            String jsonStr = JSON.toJSON(msg).toString();
            callLog.setResponse(jsonStr);
            UserRoleItemDTO dto = OBJECT_MAPPER.readValue(jsonStr, UserRoleItemDTO.class);
            callLog.setEventType(String.valueOf(dto.getType()));
            callLog.setDesc(dto.getDesc());

            // Keep only the relations that belong to this system.
            List<UserRoleRelationDTO> ownRelations = new ArrayList<>();
            for (UserRoleRelationDTO relation : dto.getValue()) {
                if (SYS_CODE.equals(relation.getModuleCode())) {
                    ownRelations.add(relation);
                }
            }
            if (CollectionUtils.isEmpty(ownRelations)) {
                // Nothing concerns this system; the message counts as handled.
                callLog.setIsSuccess(1);
                return;
            }

            if (1 == dto.getType()) {
                replaceRolesOfUser(dto, ownRelations);
            } else if (2 == dto.getType()) {
                replaceUsersOfRole(dto, ownRelations);
            } else {
                throw new IllegalArgumentException("不支持的消息类型: " + dto.getType());
            }
            callLog.setIsSuccess(1);
        } catch (Exception e) {
            callLog.setParams(e.getMessage());
            callLog.setIsSuccess(0);
            log.error("Kafka 消费角色异常:{}", e.getMessage(), e);
        } finally {
            saveLogAndAck(callLog, ack);
        }
    }

    /**
     * Handles a "user related to roles" message: the user's roles are replaced by the
     * roles in the message plus {@link #DEFAULT_ROLE_CODE}.
     *
     * @param dto       the role message; its key is used as the user account
     * @param relations the relations of this system contained in the message
     * @throws RuntimeException if no user exists for the account
     */
    private void replaceRolesOfUser(UserRoleItemDTO dto, List<UserRoleRelationDTO> relations) {
        User user = userManager.getByAccount(dto.getKey());
        if (Objects.isNull(user)) {
            throw new RuntimeException("根据手机号: " + dto.getKey() + "未找到对应的用户!");
        }
        String[] incomingCodes = relations.stream()
                .map(UserRoleRelationDTO::getRoleCode)
                .toArray(String[]::new);
        // Append the fixed default role so that it is not removed by the save below.
        String[] roleCodes = Arrays.copyOf(incomingCodes, incomingCodes.length + 1);
        roleCodes[incomingCodes.length] = DEFAULT_ROLE_CODE;
        // Per the original author's note this save is exclusive: roles of the user that
        // are not passed in are removed.
        userRoleManager.saveUserRole(user.getAccount(), roleCodes);
    }

    /**
     * Handles a "role related to users" message: all user relations of the role are
     * removed and the users from the message that exist locally are related to it.
     *
     * @param dto       the role message; its key is used as the role code
     * @param relations the relations of this system contained in the message
     * @throws RuntimeException if the role does not exist or none of the users can be matched
     */
    private void replaceUsersOfRole(UserRoleItemDTO dto, List<UserRoleRelationDTO> relations) {
        Role role = roleManager.getRole(dto.getKey());
        if (Objects.isNull(role)) {
            throw new RuntimeException("根据角色Code: " + dto.getKey() + "未找到对应的角色信息!");
        }
        List<UserRole> userRoleList = new ArrayList<>();
        for (UserRoleRelationDTO relation : relations) {
            User user = userManager.getByAccount(relation.getPhone());
            if (Objects.nonNull(user)) {
                UserRole userRole = new UserRole();
                userRole.setRoleId(role.getId());
                userRole.setUserId(user.getId());
                userRoleList.add(userRole);
            }
        }
        // Fail before deleting anything when no user of the message exists locally.
        if (CollectionUtils.isEmpty(userRoleList)) {
            throw new RuntimeException("构建角色关联用户失败,用户无法匹配");
        }
        // Remove every user currently related to the role, then save the new relations.
        userRoleManager.removeByRoleId(role.getId(), LocalDateTime.now());
        userRoleManager.saveBatch(userRoleList);
    }

    /**
     * Common pre-processing of a Kafka record: logs it and checks that it has a value.
     *
     * @param record Kafka record
     * @param ack    Kafka acknowledgment handle
     * @param topic  Kafka topic
     * @return {@code record.value()} when it is not null; otherwise {@code null}, in which
     *         case the record has already been acknowledged
     */
    private Object preHandleKafkaMsg(ConsumerRecord<?, ?> record, Acknowledgment ack, String topic) {
        log.info("Kafka 监听主题:{},消费内容:{}", record.topic(), record.value());
        Object msg = record.value();
        if (msg == null) {
            log.warn("Kafka 消息为空,topic:{}", topic);
            ack.acknowledge();
            return null;
        }
        return msg;
    }

    /**
     * Saves the call log and acknowledges the record. The acknowledgment is issued even
     * when saving the log throws; the exception still propagates to the caller.
     *
     * @param callLog log entry describing the consumed message
     * @param ack     Kafka acknowledgment handle
     */
    private void saveLogAndAck(BpmCallLog callLog, Acknowledgment ack) {
        try {
            bpmCallLogManager.save(callLog);
        } finally {
            ack.acknowledge();
        }
    }

    /**
     * Returns the {@code data} node of a message.
     *
     * @param rootNode parsed message
     * @return the non-null {@code data} node
     * @throws IllegalArgumentException if the node is missing or JSON null
     */
    private static JsonNode requireData(JsonNode rootNode) {
        JsonNode dataNode = rootNode.get("data");
        if (dataNode == null || dataNode.isNull()) {
            throw new IllegalArgumentException("消息缺少data节点");
        }
        return dataNode;
    }

    /**
     * Resolves the message type code.
     *
     * @param msgType type code read from the message
     * @return the matching enum constant
     * @throws IllegalArgumentException if {@link MsgTypeEnum#fromCode} returns null for the code
     */
    private static MsgTypeEnum requireMsgType(String msgType) {
        MsgTypeEnum typeEnum = MsgTypeEnum.fromCode(msgType);
        if (typeEnum == null) {
            throw new IllegalArgumentException("不支持的消息类型: " + msgType);
        }
        return typeEnum;
    }

    /**
     * Builds a {@link BpmCallLog} entry stamped with the current time.
     *
     * @param topic   topic the message was consumed from (stored as the URL)
     * @param msgStr  raw message content (stored as the response)
     * @param msgType message type (stored as the event type)
     * @param puuid   UUID of the push (stored as the task id)
     * @param subject log subject
     * @param desc    log description
     * @return the populated log entry
     */
    private BpmCallLog buildCallLog(String topic, String msgStr, String msgType, String puuid,
                                    String subject, String desc) {
        BpmCallLog callLog = new BpmCallLog();
        callLog.setUrl(topic);
        callLog.setCallTime(LocalDateTime.now());
        callLog.setSubject(subject);
        callLog.setDesc(desc);
        callLog.setResponse(msgStr);
        callLog.setEventType(msgType);
        callLog.setTaskId(puuid);
        return callLog;
    }

    /**
     * Builds the {@link OrgVo} for an organisation message, placing the organisation in
     * the default dimension.
     *
     * @param orgDto organisation payload of the Kafka message
     * @return the populated value object
     * @throws IllegalStateException if no default dimension is available
     */
    private OrgVo buildOrgVo(OrgKafkaDTO orgDto) {
        Demension defaultDemension = demensionManager.getDefaultDemension();
        if (defaultDemension == null) {
            throw new IllegalStateException("未找到默认组织维度");
        }

        OrgVo orgVo = new OrgVo();
        orgVo.setDemId(defaultDemension.getId());
        orgVo.setCode(orgDto.getCode());
        orgVo.setName(orgDto.getName());
        // Organisation kind: type 1 maps to "ogn", everything else to "dept".
        orgVo.setOrgKind(orgDto.getType() != null && orgDto.getType() == 1 ? "ogn" : "dept");
        // Sort order, defaulting to 999 when the message carries none.
        if (orgDto.getSort() != null) {
            orgVo.setOrderNo(Long.valueOf(orgDto.getSort()));
        } else {
            orgVo.setOrderNo(999L);
        }
        // Parent: without a parent code the organisation becomes a first-grade root entry.
        if (StringUtil.isEmpty(orgDto.getParentCode())) {
            orgVo.setParentId("0");
            orgVo.setGrade("1");
        } else {
            orgVo.setParentCode(orgDto.getParentCode());
        }
        return orgVo;
    }

    /**
     * Builds the {@link UserVo} for a user message. When the account already exists its
     * id, status, user type and password are carried over; otherwise the user type is
     * set to 2 and the initial password of the default password strategy is used.
     *
     * @param dto user payload of the Kafka message
     * @return the populated value object; its id is null when the account does not exist yet
     * @throws IllegalArgumentException if the payload has no user code
     * @throws Exception                if a manager call fails
     */
    private UserVo buildUser(UserKafkaDTO dto) throws Exception {
        String userCode = dto.getUserCode();
        if (StringUtil.isEmpty(userCode)) {
            throw new IllegalArgumentException("用户编码(userCode)不能为空");
        }
        User existing = userManager.getByAccount(userCode);

        UserVo user = new UserVo();
        user.setAccount(userCode);
        user.setFullname(dto.getName());
        // An account that looks like a mobile number is also stored as the mobile number.
        if (MOBILE_PATTERN.matcher(userCode).matches()) {
            user.setMobile(userCode);
        }
        if (BeanUtils.isNotEmpty(existing)) {
            user.setId(existing.getId());
            user.setStatus(existing.getStatus());
            user.setUserType(existing.getUserType());
            user.setPassword(existing.getPassword());
        } else {
            user.setUserType(2);
            user.setPassword(pwdStrategyManager.getDefault().getInitPwd());
        }
        return user;
    }

    /**
     * Relates a newly added user to the organisations named in the message. With a list
     * of organisations, the one equal to the department code becomes the master
     * organisation; without a list, the department code alone is used as master.
     *
     * @param account      account of the new user
     * @param userKafkaDto user payload of the Kafka message
     * @throws Exception if adding a relation fails
     */
    public void setAddUserOrg(String account, UserKafkaDTO userKafkaDto) throws Exception {
        if (CollectionUtils.isNotEmpty(userKafkaDto.getOrgInfoDtos())) {
            // Multiple organisations
            for (UserOrgInfoDTO orgInfoDto : userKafkaDto.getOrgInfoDtos()) {
                boolean master = orgInfoDto.getOrgCode().equals(userKafkaDto.getDeptCode());
                bindUserToOrg(account, orgInfoDto.getOrgCode(), master ? 1 : 0);
            }
        } else {
            bindUserToOrg(account, userKafkaDto.getDeptCode(), 1);
        }
    }

    /**
     * Updates the organisation relations of an existing user according to the message.
     *
     * @param userVo       the user being updated
     * @param userKafkaDto user payload of the Kafka message
     * @throws Exception if removing or adding a relation fails
     */
    public void updateUserOrg(UserVo userVo, UserKafkaDTO userKafkaDto) throws Exception {
        List<Org> existingOrgs = orgManager.getOrgsByAccount(userVo.getAccount());
        // NOTE(review): a user without any organisation is left untouched, so an update
        // never gives such a user an organisation — confirm this is intended.
        if (CollectionUtils.isEmpty(existingOrgs)) {
            return;
        }

        List<UserOrgInfoDTO> orgInfoList = userKafkaDto.getOrgInfoDtos();
        if (CollectionUtils.isNotEmpty(orgInfoList)) {
            // Multiple organisations: remove via deleteUserUnboundOrg, then add every listed one.
            deleteUserUnboundOrg(userVo.getId());
            for (UserOrgInfoDTO orgInfoDto : orgInfoList) {
                boolean master = orgInfoDto.getOrgCode().equals(userKafkaDto.getDeptCode());
                bindUserToOrg(userVo.getAccount(), orgInfoDto.getOrgCode(), master ? 1 : 0);
            }
            return;
        }

        // Single organisation: only act when the user is not yet in the department.
        Set<String> existingOrgCodes = existingOrgs.stream()
                .map(Org::getCode)
                .collect(Collectors.toSet());
        if (!existingOrgCodes.contains(userKafkaDto.getDeptCode())) {
            deleteUserUnboundOrg(userVo.getId());
            bindUserToOrg(userVo.getAccount(), userKafkaDto.getDeptCode(), 1);
        }
    }

    /**
     * Adds one user-organisation relation.
     *
     * @param account  user account
     * @param orgCode  organisation code
     * @param isMaster 1 for the master organisation, 0 otherwise
     * @throws Exception if the manager call fails
     */
    private void bindUserToOrg(String account, String orgCode, int isMaster) throws Exception {
        OrgUserVo orgUserVo = new OrgUserVo();
        orgUserVo.setAccount(account);
        orgUserVo.setOrgCode(orgCode);
        orgUserVo.setIsMaster(isMaster);
        orgManager.addOrgUser(orgUserVo);
    }

    /**
     * Removes an organisation relation of the user: the rows of the user are scanned
     * and the first row without a {@code relId} value is deleted by its id.
     *
     * @param userId id of the user
     * @throws Exception if the manager call fails
     */
    private void deleteUserUnboundOrg(String userId) throws Exception {
        QueryWrapper<OrgUser> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("USER_ID_", userId);
        List<Map<String, Object>> rows = orgUserService.listMaps(queryWrapper);
        if (CollectionUtils.isEmpty(rows)) {
            return;
        }
        for (Map<String, Object> row : rows) {
            if (!ObjectUtils.isEmpty(row.get("relId"))) {
                continue;
            }
            // The id column may be reported in upper or lower case.
            Object id = row.get("ID_");
            if (ObjectUtils.isEmpty(id)) {
                id = row.get("id_");
            }
            if (id != null) {
                orgManager.delOrgUser(id.toString());
            }
            // NOTE(review): the loop stops after the first row without relId, so at most
            // one relation is deleted per call — confirm this is intended.
            break;
        }
    }
}