package com.artfess.base.webSocket;

import cn.hutool.json.JSONUtil;
import com.artfess.base.conf.NettyConfig;
import com.artfess.base.constants.WebsocketConst;
import com.artfess.base.feign.ApplicationFeignService;
import com.artfess.base.feign.BizFeignService;
import com.artfess.base.util.AppUtil;
import com.artfess.base.util.BeanUtils;
import com.artfess.base.util.DateUtils;
import com.artfess.base.util.JsonUtil;
import com.artfess.base.util.StringUtil;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.LinkedHashSet;
import java.util.Set;

/**
 * Shared Netty handler for text WebSocket frames.
 *
 * <p>Each incoming frame is parsed as a {@link WebScoketMsg}. The channel is registered under the
 * sender account, and the message is then dispatched by its message type:
 * <ul>
 *   <li>{@code ping}  - records the heartbeat payload for the channel and answers with a timestamp;</li>
 *   <li>{@code topic} - broadcasts the message text to every channel in the channel group;</li>
 *   <li>{@code msg}   - forwards the message text to every channel of each listed receiver.</li>
 * </ul>
 * Any other message type is ignored.
 *
 * @author 陈实
 * @Package com.artfess.base.webSocket
 * @date 2021/9/16 9:57
 * @Description: WebSocket handler class
 */
@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    private static final Logger log = LoggerFactory.getLogger(WebSocketHandler.class);

    /** Channel attribute under which the account that owns a channel is stored. */
    private static final AttributeKey<String> ACCOUNT_KEY = AttributeKey.valueOf("account");

    private static final String TYPE_PING = "ping";
    private static final String TYPE_TOPIC = "topic";
    private static final String TYPE_MSG = "msg";

    /** Number of channels currently in the channel group (refreshed on add/remove). */
    public static int online;

    /**
     * Called when this handler is added to a channel's pipeline: adds the channel to the
     * channel group and refreshes the online counter.
     *
     * @param ctx context of the channel this handler was added to
     * @throws Exception declared by the overridden method
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.trace("【webSocket】:handlerAdded 被调用{}", ctx.channel().id().asLongText());
        NettyConfig.getChannelGroup().add(ctx.channel());
        online = NettyConfig.getChannelGroup().size();
        log.info("【webSocket】:{}上线了!", ctx.channel().remoteAddress());
        log.trace("【webSocket】:========上线线:在线人数:{}!========", online);
        log.trace("【webSocket】:========{}!========", ctx.channel().id());
    }

    /**
     * Handles one text frame: registers the channel under the sender account and dispatches the
     * message by its type.
     *
     * @param ctx   context of the channel the frame arrived on
     * @param frame the received text frame; its text is expected to be a JSON {@link WebScoketMsg}
     * @throws Exception if the frame cannot be parsed or processed; propagates to
     *                   {@link #exceptionCaught(ChannelHandlerContext, Throwable)}
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception {
        String payload = frame.text();
        log.debug("【webSocket】:服务器接收到:来自:【{}】 的数据:{}", ctx.channel().remoteAddress(), payload);

        WebScoketMsg msgContent = JSONUtil.toBean(payload, WebScoketMsg.class);
        String messageType = msgContent.getMessageType().toString();

        registerChannel(ctx, msgContent.getSender());

        switch (messageType) {
            case TYPE_PING:
                replyToHeartbeat(ctx, msgContent.getMessage());
                break;
            case TYPE_TOPIC:
                NettyConfig.getChannelGroup().writeAndFlush(new TextWebSocketFrame(msgContent.getMessage()));
                break;
            case TYPE_MSG:
                sendToReceivers(msgContent.getReceiver(), msgContent.getMessage());
                break;
            default:
                // Unknown message types are ignored.
                break;
        }
    }

    /**
     * Associates the channel with the given account in the user-to-channel map.
     *
     * @param ctx     context of the channel to register
     * @param account sender account taken from the message; nothing happens when it is empty
     */
    private void registerChannel(ChannelHandlerContext ctx, String account) {
        if (!StringUtil.isNotEmpty(account)) {
            return;
        }
        Set<ChannelId> channelIds = NettyConfig.getUserChanIdMap().get(account);
        if (channelIds == null) {
            channelIds = new LinkedHashSet<>();
            NettyConfig.getUserChanIdMap().put(account, channelIds);
        }
        channelIds.add(ctx.channel().id());

        // removeUserId() resolves the account through this attribute. Nothing else in this class
        // sets it, so record it here unless it is already present.
        // NOTE(review): another handler in the pipeline may already set it - confirm.
        ctx.channel().attr(ACCOUNT_KEY).setIfAbsent(account);
    }

    /**
     * Stores the heartbeat payload for the channel and answers with the current time.
     * Updating the subroutine status from the heartbeat is currently disabled.
     *
     * @param ctx     context of the channel that sent the heartbeat
     * @param message heartbeat payload taken from the message
     */
    private void replyToHeartbeat(ChannelHandlerContext ctx, String message) {
        NettyConfig.getSubroutineChanIdMap().put(ctx.channel().id().asShortText(), message);

        ObjectNode reply = JsonUtil.getMapper().createObjectNode();
        reply.put(WebsocketConst.MSG_TXT, "心跳响应时间:" + DateUtils.now());
        ctx.channel().writeAndFlush(new TextWebSocketFrame(reply.toString()));
    }

    /**
     * Sends the message text to every channel of every receiver that is still in the channel group.
     *
     * @param receivers one or more receiver accounts separated by commas; may be empty
     * @param message   text to deliver
     */
    private void sendToReceivers(String receivers, String message) {
        if (!StringUtil.isNotEmpty(receivers)) {
            return;
        }
        for (String receiver : receivers.split(",")) {
            Set<ChannelId> channelIds = NettyConfig.getUserChanIdMap().get(receiver);
            if (channelIds == null) {
                continue;
            }
            for (ChannelId channelId : channelIds) {
                Channel target = NettyConfig.getChannelGroup().find(channelId);
                if (target != null) {
                    // A fresh frame per recipient, as each write consumes the frame it is given.
                    target.writeAndFlush(new TextWebSocketFrame(message));
                }
            }
        }
    }

    /**
     * Called when this handler is removed from a channel's pipeline: removes the channel from
     * the channel group, refreshes the online counter and drops the channel's mappings.
     *
     * @param ctx context of the channel this handler was removed from
     * @throws Exception declared by the overridden method
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.trace("【webSocket】:handlerRemoved 被调用{}", ctx.channel().id().asLongText());
        NettyConfig.getChannelGroup().remove(ctx.channel());
        online = NettyConfig.getChannelGroup().size();
        removeUserId(ctx);
        log.trace("【webSocket】:========下线:在线人数:{}!========", online);
    }

    /**
     * Logs the failure, removes the channel and its mappings, and closes the channel.
     *
     * @param ctx   context of the failing channel
     * @param cause the exception that was raised
     * @throws Exception declared by the overridden method
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("【webSocket】:异常:{}", cause.getMessage(), cause);
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
        ctx.close();
    }

    /**
     * Removes the association between the channel and its account, and the channel's heartbeat
     * entry. Safe to call more than once for the same channel.
     *
     * @param ctx context of the channel being dropped
     */
    private void removeUserId(ChannelHandlerContext ctx) {
        String account = ctx.channel().attr(ACCOUNT_KEY).get();
        if (StringUtil.isNotEmpty(account)) {
            Set<ChannelId> channelIds = NettyConfig.getUserChanIdMap().get(account);
            // The entry may already be gone (e.g. a previous call removed it), so guard the
            // size check as well instead of dereferencing a possibly-null set.
            if (channelIds != null) {
                channelIds.remove(ctx.channel().id());
                if (channelIds.isEmpty()) {
                    NettyConfig.getUserChanIdMap().remove(account);
                }
            }
        }
        // Drop the heartbeat payload recorded for this channel.
        NettyConfig.getSubroutineChanIdMap().remove(ctx.channel().id().asShortText());
        log.info("【webSocket】:{}断开连接!", ctx.channel().remoteAddress());
    }

    /**
     * Sends a "not logged in" close frame (policy violation) and closes the channel once the
     * frame has been written.
     *
     * @param ctx context of the channel to reject
     * @author yangxiaodong
     */
    public void sendUserNotLoginMsg(ChannelHandlerContext ctx) {
        ChannelFuture future = ctx.writeAndFlush(
                new CloseWebSocketFrame(WebSocketCloseStatus.POLICY_VIOLATION, "未登录!"));
        future.addListener(ChannelFutureListener.CLOSE);
    }
}