package com.artfess.base.webSocket;

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.artfess.base.conf.NettyConfig;
import com.artfess.base.constants.WebsocketConst;
import com.artfess.base.jwt.JwtTokenHandler;
import com.artfess.base.util.AppUtil;
import com.artfess.base.util.BeanUtils;
import com.artfess.base.util.DateUtils;
import com.artfess.base.util.StringUtil;
import com.artfess.base.util.TokenUtils;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Netty inbound handler that upgrades HTTP requests to WebSocket connections and
 * dispatches incoming text frames (heartbeat, broadcast and user-to-user messages).
 *
 * <p>The handler is annotated {@code @ChannelHandler.Sharable}, so a single instance
 * may serve many channels. For that reason no per-connection state is kept in instance
 * fields; the handshaker of each connection is stored as a channel attribute.
 *
 * @author 陈实
 * @Package com.artfess.base.webSocket
 * @date 2021/9/16 9:57
 * @Description: WebSocket handler
 */
@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends ChannelInboundHandlerAdapter {

    private static final Logger log = LoggerFactory.getLogger(WebSocketHandler.class);

    /** Per-channel slot for the handshaker that performed the upgrade of that channel. */
    private static final AttributeKey<WebSocketServerHandshaker> HANDSHAKER_KEY =
            AttributeKey.valueOf("wsHandshaker");

    /** Number of channels currently in the channel group; written by several channels, hence volatile. */
    public static volatile int online;

    /** WebSocket location passed to the handshaker factory. */
    String wsFactoryUri = "";

    /** Request path that triggers the WebSocket upgrade. */
    @Value("${netty.ws.endPoint:/websocket}")
    private String wsUri;

    /**
     * Called when this handler is added to a channel's pipeline: adds the channel to the
     * channel group and refreshes the online counter.
     *
     * @param ctx context of the channel the handler was added to
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.trace("【webSocket】:handlerAdded 被调用{}", ctx.channel().id().asLongText());
        NettyConfig.getChannelGroup().add(ctx.channel());
        online = NettyConfig.getChannelGroup().size();
        log.info("【webSocket】:{}上线了!", ctx.channel().remoteAddress());
        log.trace("【webSocket】:========上线线:在线人数:{}!========", online);
        log.trace("【webSocket】:========{}!========", ctx.channel().id());
    }

    /**
     * Routes an inbound message to the HTTP (handshake) or WebSocket frame handling.
     *
     * <p>This class extends {@link ChannelInboundHandlerAdapter}, which does not release
     * messages automatically, so consumed messages are released here. Messages of any
     * other type are passed on to the next handler instead of being dropped.
     *
     * @param ctx channel context
     * @param msg inbound message
     * @throws Exception if handling the message fails
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            try {
                handleHttpRequest(ctx, (FullHttpRequest) msg);
            } finally {
                ReferenceCountUtil.release(msg);
            }
        } else if (msg instanceof WebSocketFrame) {
            try {
                handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
            } finally {
                ReferenceCountUtil.release(msg);
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    /**
     * Handles one WebSocket frame. Close and ping frames are answered; text frames are
     * dispatched by their {@code messageType}; every other frame type is ignored.
     *
     * <p>The close and ping paths {@code retain()} the frame because its content is
     * handed to an outbound write while the caller still releases the inbound frame.
     *
     * @param ctx   channel context
     * @param frame received frame
     * @throws Exception if processing fails (for example the text is not valid JSON)
     */
    public void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
        if (frame instanceof CloseWebSocketFrame) {
            WebSocketServerHandshaker handshaker = ctx.channel().attr(HANDSHAKER_KEY).get();
            if (handshaker != null) {
                handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            } else {
                // No handshake was recorded for this channel; just close it.
                ctx.close();
            }
            return;
        }
        if (frame instanceof PingWebSocketFrame) {
            // Flush immediately: nothing else in this handler flushes pending writes.
            ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // Only text frames are supported; binary frames are ignored.
        if (frame instanceof TextWebSocketFrame) {
            handleTextFrame(ctx, (TextWebSocketFrame) frame);
        }
    }

    /**
     * Handles the initial HTTP request: verifies the token carried in the query string,
     * registers the channel for the token's user and performs the WebSocket handshake
     * when the request path matches the configured endpoint.
     *
     * @param ctx     channel context
     * @param request the HTTP upgrade request
     * @throws Exception if token handling or the handshake fails
     */
    public void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        String uri = request.uri();
        String token = getUrlParams(uri);
        // NOTE(review): this writes the raw token to the log at TRACE level — confirm that is acceptable.
        log.trace("【webSocket】:登录的用户Token是:{}", token);
        if (!StringUtil.isNotEmpty(token) || !TokenUtils.verifyToken(token)) {
            NettyConfig.getChannelGroup().remove(ctx.channel());
            sendUserNotLoginMsg(ctx);
            removeUserId(ctx);
            log.error("【webSocket】:Token不合法!");
            return;
        }

        JwtTokenHandler jwtTokenHandler = AppUtil.getBean(JwtTokenHandler.class);
        String username = jwtTokenHandler.getUsernameFromToken(token);
        if (StringUtil.isNotEmpty(username)) {
            // Add to the user's existing connections instead of replacing them.
            registerChannel(username, ctx.channel().id());
        }

        // Strip the query string so the path can be compared with the endpoint.
        int queryStart = uri.indexOf('?');
        if (queryStart >= 0) {
            request.setUri(uri.substring(0, queryStart));
        }

        if (wsUri.equalsIgnoreCase(request.uri())) {
            log.debug("【webSocket】:req instanceof HttpRequest");
            WebSocketServerHandshakerFactory wsFactory =
                    new WebSocketServerHandshakerFactory(wsFactoryUri, null, false);
            WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(request);
            if (handshaker == null) {
                WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
            } else {
                ctx.channel().attr(HANDSHAKER_KEY).set(handshaker);
                handshaker.handshake(ctx.channel(), request);
            }
        }
    }

    /**
     * Called when this handler is removed from a channel's pipeline: removes the channel
     * from the channel group and from the user registry and refreshes the online counter.
     *
     * @param ctx context of the channel the handler was removed from
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.trace("【webSocket】:handlerRemoved 被调用{}", ctx.channel().id().asLongText());
        NettyConfig.getChannelGroup().remove(ctx.channel());
        online = NettyConfig.getChannelGroup().size();
        removeUserId(ctx);
        log.trace("【webSocket】:========下线:在线人数:{}!========", online);
    }

    /**
     * Logs the failure, unregisters the channel and closes it.
     *
     * @param ctx   channel context
     * @param cause the exception raised in the pipeline
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("【webSocket】:异常:{}", cause.getMessage(), cause);
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
        ctx.close();
    }

    /**
     * Sends a policy-violation close frame with the "not logged in" reason and closes the
     * channel once the write has completed (successfully or not).
     *
     * <p>NOTE(review): this is also invoked before the handshake has completed; at that
     * point the pipeline may not contain a WebSocket frame encoder yet, so the frame may
     * never reach the client — confirm against the pipeline setup.
     *
     * @param ctx channel context
     * @author yangxiaodong
     */
    public void sendUserNotLoginMsg(ChannelHandlerContext ctx) {
        ChannelFuture future = ctx.writeAndFlush(
                new CloseWebSocketFrame(WebSocketCloseStatus.POLICY_VIOLATION, "未登录!"));
        future.addListener(ChannelFutureListener.CLOSE);
    }

    /** Parses a text frame as JSON and dispatches it according to its {@code messageType}. */
    private void handleTextFrame(ChannelHandlerContext ctx, TextWebSocketFrame txFrame) {
        String text = txFrame.text();
        log.trace("【webSocket】:服务器接收到:来自:【{}】 的数据:{}", ctx.channel().remoteAddress(), text);
        JSONObject jsonObject = JSONUtil.parseObj(text);
        String messageType = jsonObject.getStr("messageType");
        String account = jsonObject.getStr("account");
        if (StringUtil.isNotEmpty(account)) {
            // NOTE(review): the account is taken from the client message and is not checked
            // against the authenticated user — confirm this is intended.
            registerChannel(account, ctx.channel().id());
        }

        // Constant-first equals: a message without "messageType" is simply ignored.
        if ("heartcheck".equals(messageType)) {
            com.alibaba.fastjson.JSONObject obj = new com.alibaba.fastjson.JSONObject();
            obj.put(WebsocketConst.MSG_TXT, "心跳响应时间:" + DateUtils.now());
            ctx.channel().writeAndFlush(new TextWebSocketFrame(obj.toJSONString()));
        } else if ("topic".equals(messageType)) {
            NettyConfig.getChannelGroup().writeAndFlush(new TextWebSocketFrame(jsonObject.toString()));
        } else if ("user".equals(messageType)) {
            sendToUsers(jsonObject);
        }
    }

    /**
     * Forwards the message to every registered channel of each account listed in
     * {@code chatId} (comma-separated); accounts without an open channel are skipped.
     */
    private void sendToUsers(JSONObject jsonObject) {
        String receiverAccounts = jsonObject.getStr("chatId");
        if (!StringUtil.isNotEmpty(receiverAccounts)) {
            return;
        }
        String payload = jsonObject.toString();
        Map<String, Set<ChannelId>> registry = userChannels();
        for (String receiver : receiverAccounts.split(",")) {
            Set<ChannelId> channelIds = registry.get(receiver);
            if (channelIds == null) {
                continue;
            }
            for (ChannelId channelId : channelIds) {
                Channel userChannel = NettyConfig.getChannelGroup().find(channelId);
                if (userChannel != null) {
                    // A frame is consumed by the write, so each channel gets its own instance.
                    userChannel.writeAndFlush(new TextWebSocketFrame(payload));
                }
            }
        }
    }

    /** Adds the channel id to the set of channels registered for the account. */
    private void registerChannel(String account, ChannelId channelId) {
        // The registry is shared by all channels of this sharable handler, so new sets are concurrent sets.
        userChannels()
                .computeIfAbsent(account, key -> ConcurrentHashMap.newKeySet())
                .add(channelId);
    }

    /** Returns the shared account-to-channel-ids registry held by {@code NettyConfig}. */
    private static Map<String, Set<ChannelId>> userChannels() {
        return NettyConfig.getUserChanIdMap();
    }

    /**
     * Extracts the value of the first query parameter of the URL (the token).
     *
     * @param url request URI, possibly with a query string
     * @return the text between the first {@code '='} and the following {@code '&'} (or the
     *         end of the URL), or {@code null} when the URL is null or contains no {@code '='}
     */
    private static String getUrlParams(String url) {
        if (url == null) {
            return null;
        }
        int valueStart = url.indexOf('=');
        if (valueStart < 0) {
            return null;
        }
        int valueEnd = url.indexOf('&', valueStart + 1);
        return valueEnd < 0
                ? url.substring(valueStart + 1)
                : url.substring(valueStart + 1, valueEnd);
    }

    /**
     * Removes the channel from every account it is registered under and drops accounts
     * that are left without any channel.
     *
     * @param ctx context of the channel being unregistered
     */
    private void removeUserId(ChannelHandlerContext ctx) {
        ChannelId channelId = ctx.channel().id();
        userChannels().values().removeIf(channelIds -> {
            if (channelIds == null) {
                return true;
            }
            channelIds.remove(channelId);
            return channelIds.isEmpty();
        });
        log.info("【webSocket】:{}断开连接!", ctx.channel().remoteAddress());
    }
}