package com.artfess.rescue.config;

import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.artfess.base.conf.NettyConfig;
import com.artfess.base.constants.WebsocketConst;
import com.artfess.base.util.*;
import com.artfess.base.webSocket.SubroutineManage;
import com.artfess.base.webSocket.WebScoketMsg;
import com.artfess.rescue.patrol.manager.BizInspectionVideoManager;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.netty.channel.*;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty.util.AttributeKey;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
 * WebSocket text-frame handler for the rescue module.
 *
 * <p>Each incoming {@link TextWebSocketFrame} is parsed into a {@link WebScoketMsg} and dispatched
 * on its {@code messageType}:
 * <ul>
 *   <li>{@code ping} - records the heartbeat and answers with a heartbeat response;</li>
 *   <li>{@code topic} - broadcasts the message to every channel in the channel group;</li>
 *   <li>{@code msg} - forwards the message to the channels of the listed receiver accounts;</li>
 *   <li>{@code video_task} - hands an inspection request to the executor service.</li>
 * </ul>
 * Any other message type is ignored.
 *
 * <p>The handler is {@link ChannelHandler.Sharable}: a single instance serves all channels.
 *
 * @author Chen Shi (陈实)
 * @date 2021/9/16 9:57
 */
@Component
@ChannelHandler.Sharable
public class RescueWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    // TextWebSocketFrame is the Netty type that carries text sent over a WebSocket.

    private static final Logger log = LoggerFactory.getLogger(RescueWebSocketHandler.class);

    /**
     * Number of online channels, refreshed from the channel group size whenever a handler is
     * added or removed. Volatile because this shared handler writes it for every channel.
     */
    public static volatile int online;

    /** messageType of a heartbeat message. */
    private static final String PING_TYPE = "ping";

    /** messageType of a broadcast message. */
    private static final String TOPIC_TYPE = "topic";

    /** messageType of a private (account-addressed) message. */
    private static final String PRIVATE_MSG_TYPE = "msg";

    /** messageType of the video-study WebSocket message. */
    private static final String VIDEO_TASK_TYPE = "video_task";

    /** Channel attribute from which {@link #removeUserId} reads the account bound to a channel. */
    private static final AttributeKey<String> ACCOUNT_KEY = AttributeKey.valueOf("account");

    @Resource
    private BizInspectionVideoManager inspectionVideoManager;

    @Resource(name = "bmpExecutorService")
    private ExecutorService executorService;

    /**
     * Called when this handler is added to a channel's pipeline: registers the channel in the
     * shared channel group and refreshes the online counter.
     *
     * @param ctx context of the channel the handler was added to
     * @throws Exception declared by the overridden method; not thrown explicitly here
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        log.trace("【webSocket】:handlerAdded 被调用{}", channel.id().asLongText());

        // Add the channel to the shared channel group.
        NettyConfig.getChannelGroup().add(channel);
        online = NettyConfig.getChannelGroup().size();

        log.info("【webSocket】:{}上线了!", channel.remoteAddress());
        log.trace("【webSocket】:========上线线:在线人数:{}!========", online);
        log.trace("【webSocket】:========{}!========", channel.id());
    }

    /**
     * Parses the incoming text frame, remembers which channel the sender account uses and
     * dispatches the message by its {@code messageType}.
     *
     * @param ctx   context of the channel the frame arrived on
     * @param frame the received text frame, expected to contain a JSON-encoded {@link WebScoketMsg}
     * @throws Exception if parsing or handling fails; propagated to {@link #exceptionCaught}
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception {
        String text = frame.text();
        log.trace("【webSocket】:服务器接收到:来自:【{}】 的数据:{}", ctx.channel().remoteAddress(), text);

        WebScoketMsg msgContent = JSONUtil.toBean(text, WebScoketMsg.class);
        if (msgContent == null) {
            log.warn("【webSocket】:ignoring unparsable message from {}", ctx.channel().remoteAddress());
            return;
        }

        registerSenderChannel(ctx, msgContent.getSender());

        String messageType = msgContent.getMessageType();
        if (messageType == null) {
            // Previously this caused a NullPointerException that closed the connection.
            log.warn("【webSocket】:ignoring message without messageType from {}", ctx.channel().remoteAddress());
            return;
        }

        switch (messageType) {
            case PING_TYPE:
                handlePing(ctx, msgContent);
                break;
            case TOPIC_TYPE:
                // Broadcast the message text to every connected channel.
                NettyConfig.getChannelGroup().writeAndFlush(new TextWebSocketFrame(msgContent.getMessage()));
                break;
            case PRIVATE_MSG_TYPE:
                handlePrivateMessage(msgContent);
                break;
            case VIDEO_TASK_TYPE:
                handleVideoTask(msgContent);
                break;
            default:
                // Unknown message types are ignored.
                break;
        }
    }

    /**
     * Records that the given account is reachable through the current channel.
     *
     * <p>NOTE(review): the account is the {@code sender} field of the client payload; this class
     * does not verify it - confirm that an upstream handler authenticates the connection.
     *
     * @param ctx     context of the channel the message arrived on
     * @param account sender account taken from the message; ignored when empty
     */
    private void registerSenderChannel(ChannelHandlerContext ctx, String account) {
        if (StringUtil.isEmpty(account)) {
            return;
        }
        Set<ChannelId> channelIds = NettyConfig.getUserChanIdMap().get(account);
        if (BeanUtils.isNotEmpty(channelIds)) {
            channelIds.add(ctx.channel().id());
            return;
        }
        // No (or an empty) set is registered for this account yet: start a fresh one.
        Set<ChannelId> created = new LinkedHashSet<>();
        created.add(ctx.channel().id());
        NettyConfig.getUserChanIdMap().put(account, created);
    }

    /**
     * Handles a heartbeat: stores the heartbeat payload (or the current time when the payload is
     * empty) under the channel's short id and replies with a heartbeat response frame.
     *
     * @param ctx        context of the channel that sent the heartbeat
     * @param msgContent the parsed heartbeat message
     */
    private void handlePing(ChannelHandlerContext ctx, WebScoketMsg msgContent) {
        String message = msgContent.getMessage();
        if (StringUtil.isEmpty(message)) {
            message = LocalDateTime.now().toString();
        }
        // Persisting the subroutine status through SubroutineManage is currently disabled;
        // only the in-memory map is refreshed.
        NettyConfig.getSubroutineChanIdMap().put(ctx.channel().id().asShortText(), message);

        // Heartbeat response.
        ObjectNode response = JsonUtil.getMapper().createObjectNode();
        response.put(WebsocketConst.MSG_TXT, "心跳响应时间:" + DateUtils.now());
        ctx.channel().writeAndFlush(new TextWebSocketFrame(response.toString()));
    }

    /**
     * Forwards a private message to every online channel of each receiver account.
     *
     * @param msgContent the parsed message; its receiver may hold several accounts separated by
     *                   commas
     */
    private void handlePrivateMessage(WebScoketMsg msgContent) {
        String receivers = msgContent.getReceiver();
        if (StringUtil.isEmpty(receivers)) {
            return;
        }
        for (String receiver : receivers.split(",")) {
            Set<ChannelId> channelIds = NettyConfig.getUserChanIdMap().get(receiver);
            if (!BeanUtils.isNotEmpty(channelIds)) {
                continue;
            }
            for (ChannelId channelId : channelIds) {
                Channel userChannel = NettyConfig.getChannelGroup().find(channelId);
                // The receiver only gets the message while the channel is still in the group.
                if (userChannel != null) {
                    userChannel.writeAndFlush(new TextWebSocketFrame(msgContent.getMessage()));
                }
            }
        }
    }

    /**
     * Handles a video task message: reads {@code time} and {@code ids} from the JSON payload and
     * submits the inspection to the executor service so the I/O thread is not blocked.
     *
     * @param msgContent the parsed message whose payload is a JSON object with an integer
     *                   {@code time} and an array {@code ids}
     */
    private void handleVideoTask(WebScoketMsg msgContent) {
        String userId = msgContent.getSender();
        // parseObject yields null for a null/empty payload; treat that like missing parameters.
        JSONObject payload = JSONObject.parseObject(msgContent.getMessage());
        Integer time = payload == null ? null : payload.getInteger("time");
        JSONArray jsonIds = payload == null ? null : payload.getJSONArray("ids");
        if (CollectionUtils.isEmpty(jsonIds) || time == null) {
            log.info("webSocket接收参数异常!userId:{},time:{},ids:{}", userId, time, jsonIds);
            return;
        }

        List<String> ids = IntStream.range(0, jsonIds.size())
                .mapToObj(jsonIds::getString)
                .collect(Collectors.toList());

        executorService.execute(() -> {
            try {
                inspectionVideoManager.inspect(userId, time, ids);
            } catch (Exception e) {
                // Log the cause as well; the failure would otherwise be untraceable.
                log.error("webSocket异步批量保存巡检结果失败,参数 userId:{},time:{},ids:{}", userId, time, ids, e);
            }
        });
    }

    /**
     * Called when this handler is removed from a channel's pipeline: removes the channel from the
     * channel group, refreshes the online counter and drops the channel's bookkeeping entries.
     *
     * @param ctx context of the channel the handler was removed from
     * @throws Exception declared by the overridden method; not thrown explicitly here
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.trace("【webSocket】:handlerRemoved 被调用{}", ctx.channel().id().asLongText());
        // Remove the channel.
        NettyConfig.getChannelGroup().remove(ctx.channel());
        online = NettyConfig.getChannelGroup().size();
        removeUserId(ctx);
        log.trace("【webSocket】:========下线:在线人数:{}!========", online);
    }

    /**
     * Logs the failure, drops the channel's bookkeeping entries and closes the channel.
     *
     * @param ctx   context of the channel on which the exception occurred
     * @param cause the exception that was raised
     * @throws Exception declared by the overridden method; not thrown explicitly here
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("【webSocket】:异常:{}", cause.getMessage(), cause);
        // Remove the channel.
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
        ctx.close();
    }

    /**
     * Removes the mapping between the channel and its account, and the channel's heartbeat entry.
     *
     * <p>Safe to call more than once for the same channel: a missing account mapping is ignored.
     *
     * <p>NOTE(review): the "account" channel attribute is not set anywhere in this class -
     * presumably an upstream handler sets it; confirm, otherwise the account mapping is never
     * cleaned up here.
     *
     * @param ctx context of the channel being cleaned up
     */
    private void removeUserId(ChannelHandlerContext ctx) {
        Channel channel = ctx.channel();
        String account = channel.attr(ACCOUNT_KEY).get();
        if (StringUtil.isNotEmpty(account)) {
            Set<ChannelId> channelIds = NettyConfig.getUserChanIdMap().get(account);
            if (channelIds != null) {
                channelIds.remove(channel.id());
                // Drop the account entirely once its last channel is gone.
                if (channelIds.isEmpty()) {
                    NettyConfig.getUserChanIdMap().remove(account);
                }
            }
        }
        // Remove the subroutine's online state; the persisted status update is currently disabled.
        NettyConfig.getSubroutineChanIdMap().remove(channel.id().asShortText());
        log.info("【webSocket】:{}断开连接!", channel.remoteAddress());
    }

    /**
     * Sends a "user not logged in" close frame (policy violation) and closes the channel once the
     * frame has been written.
     *
     * @param ctx context of the channel to notify and close
     * @author yangxiaodong
     */
    public void sendUserNotLoginMsg(ChannelHandlerContext ctx) {
        ChannelFuture future = ctx.writeAndFlush(
                new CloseWebSocketFrame(WebSocketCloseStatus.POLICY_VIOLATION, "未登录!"));
        future.addListener(ChannelFutureListener.CLOSE);
    }
}