简介
HTML5开始提供的一种浏览器与服务器进行全双工通讯的网络技术,属于应用层协议。它基于TCP传输协议,并复用HTTP的握手通道。
优点:
使用场景
websocket是前后端通过长连接通信的常用解决方案,相比于定时轮询的方式,突出的就是一个时效性,对于消息的接收和推送是实时的。
什么场景下会用到 websocket 呢?
站内信
websocket 的双向通信能让我们迅速联想到实时聊天、点赞评论通知、站内信等等,那接下来就具体讲讲使用 websocket 实现站内信的设计思路。
分布式场景下,当服务端由于访问压力过高,启动两个服务的时候,那么客户端连接就会出现 session 不共享的问题,在服务端B上根本没有客户端a的session信息,那么必然是不能进行发送的:

redis+websocket+springboot
使用 redis 可以很好地解决这个问题,还记得登录时也遇到过 session 不共享地问题吗?那也是使用 redis + JWT 来解决的。

redis 监听配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
|
@Configuration public class RedisConfig {
@Bean public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(redisConnectionFactory); return container; } }
|
webSocket配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
@Configuration public class WebSocketConfig {
@Bean public ServerEndpointExporter serverEndpointExporter(){ return new ServerEndpointExporter(); } }
|
redis工具类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @Component public class RedisUtil {
@Autowired private StringRedisTemplate stringRedisTemplate;
public void publish(String key, String value) { stringRedisTemplate.convertAndSend(key, value); } }
|
定义SubscribeListener监听,实现MessageListener接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| import cn.hutool.core.util.ObjectUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener;
import javax.websocket.Session; import java.io.IOException;
@Slf4j public class SubscribeListener implements MessageListener {
private Session session;
public Session getSession() { return session; }
public void setSession(Session session) { this.session = session; }
@Override public void onMessage(Message message, byte[] bytes) { String msg = new String(message.getBody()); if (ObjectUtil.isNotEmpty(session) && session.isOpen()) { try { session.getBasicRemote().sendText(msg); } catch (IOException e) { log.info("发送消息异常,msg = {} , e = {}", msg, e); } } } }
|
WebSocket服务提供类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
|
@Slf4j @Component @ServerEndpoint("/websocket/server/{loginName}") public class WebSocketServer {
private RedisMessageListenerContainer redisMessageListenerContainer = ApplicationContextProvider.getBean(RedisMessageListenerContainer.class);
private static AtomicInteger onlineCount = new AtomicInteger(0);
private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();
private Session session;
private SubscribeListener subscribeListener;
@OnOpen public void onOpen(@PathParam("loginName") String loginName, Session session) { this.session = session; webSocketSet.add(this); addOnlineCount(); log.info("有新连接[" + loginName + "]加入!当前在线人数为{}", getOnlineCount()); subscribeListener = new SubscribeListener(); subscribeListener.setSession(session); redisMessageListenerContainer.addMessageListener( subscribeListener, new ChannelTopic(Constants.TOPIC_PREFIX + loginName));
}
@OnClose public void onClose() throws IOException { webSocketSet.remove(this); subOnlineCount(); redisMessageListenerContainer.removeMessageListener(subscribeListener); log.info("有一连接关闭!当前在线人数为{}", getOnlineCount()); }
@OnMessage public void onMessage(String message, Session session) { log.info("来自客户端的消息:{}", message); for (WebSocketServer item : webSocketSet) { try { item.sendMessage(message); } catch (IOException e) { log.info("发送消息异常:msg = {}", e); continue; } } }
@OnError public void onError(Session session, Throwable error) { log.info("发生错误,{}", error); }
public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); }
public int getOnlineCount() { return onlineCount.get(); }
public void addOnlineCount() { WebSocketServer.onlineCount.getAndIncrement(); }
public void subOnlineCount() { WebSocketServer.onlineCount.getAndDecrement(); }
}
|
redis消息发布:
1 2 3 4 5 6 7 8 9
| @Autowired private RedisUtil redisUtil;
@Override public Result send(String loginName, String msg) { redisUtil.publish("TOPIC" + loginName, msg); return Result.success(); }
|
代理配置
springcloud gateway是目前在微服务当中使用较为广泛的网关,我们可以通过以下配置达到websocket的动态代理: 静态路由配置:
1 2 3 4 5 6 7 8 9 10 11 12 13
| spring: cloud: gateway: discovery: locator: enabled: true routes: - id: websocket uri: lb:ws://inbox-model predicates: - Path=/websocket/server/** filters: - StripPrefix=0
|
动态路由配置:使用动态路由,需要网关支持且开放动态路由功能
1 2 3 4 5 6 7 8 9 10 11
| [{ "id": "websocket", "order": 2, "predicates": [{ "args": { "pattern": "/websocket/server/**" }, "name": "Path" }], "uri": "lb:ws://inbox-model" }]
|
nginx 配置:
1 2 3 4 5 6 7 8 9
| location /websocket { proxy_pass http://gateway; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; proxy_read_timeout 3600s; }
|
表设计
系统通知(System Notice)
系统通知一般是由后台管理员发出,然后指定某一类(全体,个人等)用户接收。基于此设想,可以把系统通知大致分为两张表:
- t_manager_system_notice(管理员系统通知表) :记录管理员发出的通知 ;
- t_user_system_notice(用户系统通知表) : 存储用户接受的通知。
t_manager_system_notice(管理员系统通知表)
字段名 |
类型 |
描述 |
system_notice_id |
bigint |
系统通知id |
title |
varchar |
标题 |
content |
text |
内容 |
type |
varchar |
发送用户群体:single,all,vip |
state |
tinyint |
是否被拉取过 |
recipient_id |
bigint |
接收通知的用户id,若type不是single,则该值为0 |
manager_id |
bigint |
发布通知的管理员id |
publish_time |
timestamp |
发布时间 |
t_user_system_notice(用户系统通知表)
字段名 |
类型 |
描述 |
user_notice_id |
bigint |
主键id |
state |
tinyint |
是否已读 |
system_notice_id |
bigint |
系统通知的id |
recipient_id |
bigint |
接收通知的用户id |
pull_time |
timstamp |
拉取时间 |
当管理员发布一条通知后,将通知插入 t_manager_system_notice 表中,然后系统定时的从 t_manager_system_notice 表中拉取通知,然后根据通知的 type 将通知插入 t_user_system_notice 表中。
如果通知的 type 是 single 的,那就只需要插入一条记录到 t_user_system_notice 中。如果是全体用户,那么就需要将一个通知批量根据不同的用户 ID 插入到 t_user_system_notice 中,这个数据量就需要根据平台的用户量来计算。
举个例子: 管理员 A 发布了一个活动的通知,他需要将这个通知发布给全体用户,当拉取时间到来时,系统会将这一条通知取出。随后系统到用户表中查询选取所有用户的 ID,然后将这一条通知的信息根据所有用户的 ID,批量插入 t_user_system_notice 中。用户需要查看系统通知时,从 t_user_system_notice 表中查询就行了。
注意:
- 因为一次拉取的数据量可能很大,所以两次拉取的时间间隔可以设置的长一些。
- 拉取 t_manager_system_notice 表中的通知时,需要判断 state,如果已经拉取过,就不需要重复拉取, 否则会造成重复消费。
事件提醒(EventRemind)
之所以称提醒类型的消息为事件提醒,是因为此类消息均是通过用户的行为产生的,如下:
- xxx 在某个评论中@了你;
- xxx 点赞了你的文章;
- xxx 点赞了你的评论;
- xxx 回复了你的文章;
- xxx 回复了你的评论。
事件提醒表 t_event_remind
字段名 |
类型 |
描述 |
event_remind_id |
bigint |
消息id |
action |
varchar |
动作类型,如点赞、艾特、回复等 |
source_id |
bigint |
事件源id,如评论id、文章id |
source_type |
varchar |
事件源类型:comment、post等 |
source_content |
varchar |
事件源内容,比如回复的内容、评论等 |
url |
varchar |
事件所发生的地点链接 url |
state |
tinyint |
是否已读 |
sender_id |
bigint |
操作者id,即谁关注了你、at了你 |
recipient_id |
bigint |
接受通知的用户id |
remind_time |
timestatmp |
提醒时间 |
消息聚合
消息聚合只适用于事件提醒,以聚合之后的点赞消息来说:
- 100 人 {点赞} 了你的 {文章 ID = 1} :《A》;
- 100 人 {点赞} 了你的 {文章 ID = 2} :《B》;
- 100 人 {点赞} 了你的 {评论 ID = 3} :《C》;
聚合之后的消息明显有两个特征,即:action 和 source type,这是系统消息和私信都不具备的, 所以我个人认为事件提醒的设计要稍微比系统消息和私信复杂。
如何聚合?
稍稍观察下聚合的消息就可以发现:某一类的聚合消息之间是按照 source type 和 source id 来分组的, 因此我们可以得出以下伪 SQL:
1 2
| SELECT * FROM t_event_remind WHERE recipient_id = 用户ID AND action = 点赞 AND state = FALSE GROUP BY source_id , source_type;
|
SQL 层面的结果集处理还是很麻烦的,所以我的想法先把用户所有的点赞消息先查出来, 然后在程序里面进行分组,这样会简单不少。
站内消息系统的设计
聊天室表 t_private_chat
字段名 |
类型 |
描述 |
private_chat_id |
bigint |
聊天室id |
user1_id |
bigint |
用户1的id |
user2_id |
bigint |
用户2的id |
last_message |
varchar |
最后一条消息的内容 |
私信表 t_private_message
字段名 |
类型 |
描述 |
private_message_id |
bigint |
私信id |
content |
text |
私信内容 |
state |
tinyint |
是否已读 |
sender_remove |
tinyint |
发送消息的人是否删除掉了这条消息 |
recipient_remove |
tinyint |
接受人是否把这条消息从聊天记录删除 |
sender_id |
bigint |
发送者id |
recipent_id |
bigint |
接收者id |
send_time |
timestamp |
发送时间 |
消息设置
可以看到 b 站还添加了陌生人选项,也就是说如果给你发送私信的用户不是你关注的用户,那么视之为陌生人私信,就不接受。
以下是我对于消息设置的设计:
字段名 |
类型 |
描述 |
user_id |
bigint |
用户id |
like_message |
tinyint |
是否接收点赞消息 |
reply_message |
tinyint |
是否接收回复消息 |
at_message |
tinyint |
是否接收at消息 |
stranger_message |
tinyint |
是否接收陌生人私信 |
扩展性设计
实现了站内信功能以后,要是以后需要实现邮件、短信呢?拿到一个需求进行功能设计的时候,往往需要考虑其扩展性。
为了给这个需求后续的扩展附加功能,同时代码的改动最小化,我们可以引入策略模式来实现。
- 消息类型分为:站内信,短信,邮件
- 推送范围分为:个人,组织机构,地域,用户组
定义抽象类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public abstract class ISendMessageService {
public abstract Result send(InboxMessageTemplateDO inboxMessageTemplateDOMap, Map<String, Object> personMap, InboxBusinessConfigDO inboxBusinessConfigDO, String claimUser);
}
|
在调用发消息时,如何使用?又如何确定我发送的是何种消息,走哪个实现类?看如下调用位置代码:
1 2 3 4 5 6 7 8
| private void sendMessage(String messageType, Map<String, InboxMessageTemplateDO> templateDOMap, Map<String, Object> personListByPushRange, InboxBusinessConfigDO inboxBusinessConfigDO, String username) { for (String msgType : messageType.split(Constants.COMMA)) { ISendMessageService sendMessageService = SendMessageTypeEnum.newInstance(SendMessageTypeEnum.getEnum(msgType)); sendMessageService.send(templateDOMap.get(msgType), personListByPushRange, inboxBusinessConfigDO, username); } }
|
枚举类内部:根据不同的code会返回不同的bean实例。
1 2 3 4 5 6 7 8 9
|
MESSAGE("sendMessageService", "站内信") { @Override public ISendMessageService create(String code) { return (ISendMessageService) ApplicationContextProvider.getBean(code); } }
|
到上面为止,一个简单的策略模式就完成了。假如我们后面扩展微信,那么只需要新增实现类,同时在枚举中添加新的枚举就好了,不需要修改整个发送消息业务代码。同理,推送范围也是如此。
平台与客户端通信
RPA项目中,启动一个RPA应用实际上是通过xxl-job的executor执行器,而xxl-job内定位到的执行器都是内网ip,那平台侧是怎么找到指定的执行器的呢?
这其中就存在平台侧和客户端侧的通信了。实际上,只要在xxl-job上建立一个与客户端通信的长链接,那么客户端的回调、注册等操作都能被监测到,而且平台也能指使客户端去做一些操作,不需要将平台和客户端放到同一个网段下。
服务端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
|
@Slf4j @ServerEndpoint(path = "${websocket.path}", port = "${websocket.port}", maxFramePayloadLength = "${websocket.maxFramePayloadLength}", bossLoopGroupThreads = "${websocket.bossLoopGroupThreads}", workerLoopGroupThreads = "${websocket.workerLoopGroupThreads}") public class ServerWebSocketEndpoint {
@BeforeHandshake public void handshake(Session session, HttpHeaders headers){ String clientId = headers.get(ExecutorConstant.WEBSOCKET_HEADER_CLIENT_ID); if(StringUtils.hasText(clientId)){ String token = headers.get(ExecutorConstant.WEBSOCKET_HEADER_AUTHORIZATION); String tenantCode = headers.get(ExecutorConstant.WEBSOCKET_HEADER_TENANT_CODE);
ReturnT<String> t = JwtRsaUtil.verifyToken(token, tenantCode); if(t.getCode() == ReturnT.SUCCESS_CODE){ session.setAttribute(AdminConstant.WEBSOCKET_ATTR_CLIENT_ID, clientId); session.setAttribute(AdminConstant.WEBSOCKET_ATTR_USERNAME, t.getContent()); return; } } session.close(); }
@OnOpen public void onOpen(Session session){ String clientId = session.getAttribute(AdminConstant.WEBSOCKET_ATTR_CLIENT_ID); log.info("websocket连接建立,连接管理器加入对应连接:clientId:{}", clientId); ServerSessionManager.add(clientId, session); }
@OnMessage public void onMessage(Session session, String payload) { String clientId = session.getAttribute(AdminConstant.WEBSOCKET_ATTR_CLIENT_ID); String decrypt = AesUtil.decrypt(payload, clientId); if(decrypt != null){ Message message = JSON.parseObject(decrypt, Message.class); if(message.isAnswer()){ this.handleResponse(message); }else{ this.handleRequest(message, session); } } }
private void handleResponse(Message message) { String eventId = message.getEventId(); if(eventId != null){ MessageContext messageContext = ServerSessionManager.messageContextMap.get(eventId); if(messageContext != null){ messageContext.setResponse(message.getBody()); }else{ log.warn("收到响应时,请求已断开:message:{}", message); } }else{ log.warn("响应消息缺少事件id:message:{}", message); } }
private void handleRequest(Message message, Session session) { String uri = message.getPath(), eventId = message.getEventId(), body = message.getBody(); ReturnT t = null; try{ if (AdminConstant.INTERFACE_URL_CALLBACK.equals(uri)) { List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(body, List.class, HandleCallbackParam.class); t = XxlJobAdminConfig.getAdminConfig().getAdminBiz().callback(callbackParamList); } else if (AdminConstant.INTERFACE_URL_REGISTRY.equals(uri)) { RegistryParam registryParam = GsonTool.fromJson(body, RegistryParam.class); t = XxlJobAdminConfig.getAdminConfig().getAdminBiz().registry(registryParam); } else if (AdminConstant.INTERFACE_URL_REGISTRY_REMOVE.equals(uri)) { RegistryParam registryParam = GsonTool.fromJson(body, RegistryParam.class); t = XxlJobAdminConfig.getAdminConfig().getAdminBiz().registryRemove(registryParam); } else if (AdminConstant.INTERFACE_URL_TASK_LIST.equals(uri)){ String clientId = GsonTool.fromJson(body, String.class); t = XxlJobAdminConfig.getAdminConfig().getAdminBiz().taskList(clientId); }else { t = new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found."); } }catch (Throwable e){ log.warn("websocket请求处理异常:message:{}; error:{}", message, e.getMessage()); t = new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage()); } String clientId = session.getAttribute(AdminConstant.WEBSOCKET_ATTR_CLIENT_ID); ServerSessionManager.response(clientId, eventId, t); }
@OnError public void onError(Session session, Throwable throwable) { String clientId = session.getAttribute(AdminConstant.WEBSOCKET_ATTR_CLIENT_ID); log.warn("websocket连接发生异常:clientId:{}; error:{}", clientId, throwable.getMessage()); }
@OnClose public void onClose(Session session){ String clientId = session.getAttribute(AdminConstant.WEBSOCKET_ATTR_CLIENT_ID); if(clientId != null){ log.info("websocket连接关闭,连接管理器移除对应连接:clientId:{}", clientId); ServerSessionManager.remove(clientId); } } }
|
客户端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
|
@Slf4j public class WebSocketClientHandler extends AbstractWebSocketHandler {
private ExecutorBiz executorBiz;
public WebSocketClientHandler(ExecutorBiz executorBiz) { this.executorBiz = executorBiz; }
@Override public void afterConnectionEstablished(WebSocketSession session){ session.setTextMessageSizeLimit(ExecutorConstant.WEBSOCKET_INCOMING_TEXT_MAXIMUM_SIZE); }
@Override protected void handleTextMessage(WebSocketSession session, TextMessage textMessage){ String clientId = session.getHandshakeHeaders().getFirst(ExecutorConstant.WEBSOCKET_HEADER_CLIENT_ID); String payload = textMessage.getPayload(); String decrypt = AesUtil.decrypt(payload, clientId); if(decrypt != null){ Message message = JSON.parseObject(decrypt, Message.class); if(message.isAnswer()){ this.handleResponse(message); }else{ this.handleRequest(message); } } }
private void handleResponse(Message message) { String eventId = message.getEventId(); if(eventId != null){ MessageContext messageContext = WebSocketClient.messageContextMap.get(eventId); if(messageContext != null){ messageContext.setResponse(message.getBody()); }else{ log.warn("收到响应时,请求已断开:message:{}", message); } }else{ log.warn("响应消息缺少事件id:message:{}", message); } }
private void handleRequest(Message message) { String uri = message.getPath(), eventId = message.getEventId(), body = message.getBody(); ReturnT t = null; try{ if (ExecutorConstant.INTERFACE_URL_BEAT.equals(uri)) { t = executorBiz.beat(); } else if (ExecutorConstant.INTERFACE_URL_IDLE_BEAT.equals(uri)) { IdleBeatParam idleBeatParam = GsonTool.fromJson(body, IdleBeatParam.class); t = executorBiz.idleBeat(idleBeatParam); } else if (ExecutorConstant.INTERFACE_URL_GLOBAL_IDLE_BEAT.equals(uri)) { IdleBeatParam idleBeatParam = GsonTool.fromJson(body, IdleBeatParam.class); t = executorBiz.globalIdleBeat(idleBeatParam); } else if (ExecutorConstant.INTERFACE_URL_RUN.equals(uri)) { TriggerParam triggerParam = GsonTool.fromJson(body, TriggerParam.class); t = executorBiz.run(triggerParam); } else if (ExecutorConstant.INTERFACE_URL_KILL.equals(uri)) { KillParam killParam = GsonTool.fromJson(body, KillParam.class); t = executorBiz.kill(killParam); } else if (ExecutorConstant.INTERFACE_URL_LOG.equals(uri)) { LogParam logParam = GsonTool.fromJson(body, LogParam.class); t = executorBiz.log(logParam); } else { t = new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found."); } }catch (Throwable e){ log.warn("websocket请求处理异常:message:{}; error:{}", message, e.getMessage()); t = new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage()); } WebSocketClient.response(eventId, t); }
@Override public void handleTransportError(WebSocketSession session, Throwable throwable){ log.warn("websocket连接发生异常:sessionId:{}; error:{}", session.getId(), throwable.getMessage()); }
@Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus){ log.info("websocket连接关闭:sessionId:{}", session.getId()); WebSocketClient.removeSession(); }
@Override public boolean supportsPartialMessages() { return false; } }
|
定时任务清除长连接
如果项目中引用了 websocket,往往需要有个定时任务定期清除websocket的长链接。原因就是,比如用户直接关掉了浏览器,这样后端没有办法知道这个长链接已经关闭了,所以需要定期清除。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
|
@Slf4j public class StandardSessionCache {
private static ConcurrentMap<String, Set<Session>> sessionSetMap = new ConcurrentHashMap<String, Set<Session>>();
private static String getUserKey(Long userId) { return String.valueOf(userId); }
public static void addSession(Long userId, Session session) { String key = getUserKey(userId); addSession(key, session); }
private static void addSession(String key, Session session) { addSession(key, session, 1); }
private static void addSession(String key, Session session, int count) { Set sessionSet = sessionSetMap.get(key); if (sessionSet == null) { sessionSet = new HashSet(); sessionSet.add(session); Set putSet = sessionSetMap.putIfAbsent(key, sessionSet); if (putSet != null) { addSessionLock(putSet, key, session, count); } } else { addSessionLock(sessionSet, key, session, count); } }
private static void addSessionLock(Set sessionSet, String key, Session session, int count) { synchronized (sessionSet) { if (sessionSet == sessionSetMap.get(key)) { sessionSet.add(session); } else { if (count < 10) { addSession(key, session, ++count); } } } }
public static void removeSession(Long userId, Session session) { String key = getUserKey(userId); removeSession(key, session); }
private static void removeSession(String key, Session session) { Set sessionSet = sessionSetMap.get(key); if (sessionSet != null) { synchronized (sessionSet) { sessionSet.remove(session); if (sessionSet.size() == 0) { sessionSetMap.remove(key, sessionSet); } } } }
public static void sendByKey(Long userId, String msg) { String key = getUserKey(userId); sendByKey(key, msg); }
private static void sendByKey(String key, String msg) { Set<Session> sessionSet = sessionSetMap.get(key); if (sessionSet != null) { synchronized (sessionSet) { for (Session session : sessionSet) { try { if (session.isOpen()) { session.getAsyncRemote().sendText(msg); } } catch (Exception e) { log.warn("sendByKey error, key:{}, session:{}, msg:{}", key, session, msg, e); } } } } }
public static void clearClosedSession() { log.info("clearClosedSession 执行前map size:{}", sessionSetMap.size()); for (String key : sessionSetMap.keySet()) { try { Set<Session> sessionSet = sessionSetMap.get(key); if (sessionSet != null) { synchronized (sessionSet) { if (sessionSet.size() == 0) { sessionSetMap.remove(key, sessionSet); continue; }
Iterator<Session> it = sessionSet.iterator(); while (it.hasNext()) { Session session = it.next(); try { if (!session.isOpen()) { it.remove(); if (sessionSet.size() == 0) { sessionSetMap.remove(key, sessionSet); } } } catch (Exception e) { log.warn("clearClosedSession error, key:{}, session:{}", key, session, e); } } } } } catch (Exception e) { log.warn("clearClosedSession error, key:{}", key, e); } } log.info("clearClosedSession 执行后map size:{}", sessionSetMap.size()); } }
|