简介

HTML5开始提供的一种浏览器与服务器进行全双工通讯的网络技术,属于应用层协议。它基于TCP传输协议,并复用HTTP的握手通道。

优点:

  • 支持双向通信,实时性更强。

  • 更好的二进制支持。

  • 较少的控制开销。连接创建后,ws客户端、服务端进行数据交换时,协议控制的数据包头部较小。在不包含头部的情况下,服务端到客户端的包头只有2~10字节(取决于数据包长度),客户端到服务端的的话,需要加上额外的4字节的掩码。而HTTP协议每次通信都需要携带完整的头部。

  • 支持扩展。ws协议定义了扩展,用户可以扩展协议,或者实现自定义的子协议。(比如支持自定义压缩算法等

使用场景

websocket是前后端通过长连接通信的常用解决方案,相比于定时轮询的方式,突出的就是一个时效性,对于消息的接收和推送是实时的。

什么场景下会用到 websocket 呢?

站内信

websocket 的双向通信能让我们迅速联想到实时聊天、点赞评论通知、站内信等等,那接下来就具体讲讲使用 websocket 实现站内信的设计思路。

分布式场景下,当服务端由于访问压力过高,启动两个服务的时候,那么客户端连接就会出现 session 不共享的问题,在服务端B上根本没有客户端a的session信息,那么必然是不能进行发送的:

image-20240507145005489

redis+websocket+springboot

使用 redis 可以很好地解决这个问题,还记得登录时也遇到过 session 不共享地问题吗?那也是使用 redis + JWT 来解决的。

image-20240507150417178

redis 监听配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

/**
* @description: redis监听配置类
* @author:weirx
* @date:2021/3/22 14:08
* @version:3.0
*/
@Configuration
public class RedisConfig {

/**
* description: 手动注册Redis监听到IOC
*
* @param redisConnectionFactory
* @return: org.springframework.data.redis.listener.RedisMessageListenerContainer
* @author: weirx
* @time: 2021/3/22 14:11
*/
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
return container;
}
}

webSocket配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* @description: websocket配置类
* @author:weirx
* @date:2021/3/22 14:11
* @version:3.0
*/
@Configuration
public class WebSocketConfig {

/**
* description: 这个配置类的作用是要注入ServerEndpointExporter,
* 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint。
* 如果是使用独立的servlet容器,而不是直接使用springboot的内置容器,
* 就不要注入ServerEndpointExporter,因为它将由容器自己提供和管理。
*
* @return: org.springframework.web.socket.server.standard.ServerEndpointExporter
* @author: weirx
* @time: 2021/3/22 14:12
*/
@Bean
public ServerEndpointExporter serverEndpointExporter(){
return new ServerEndpointExporter();
}
}

redis工具类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Component
public class RedisUtil {

@Autowired
private StringRedisTemplate stringRedisTemplate;


/**
* 发布
*
* @param key
*/
public void publish(String key, String value) {
stringRedisTemplate.convertAndSend(key, value);
}
}

定义SubscribeListener监听,实现MessageListener接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import cn.hutool.core.util.ObjectUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;

import javax.websocket.Session;
import java.io.IOException;

/**
* @description: redis监听
* @author:weirx
* @date:2021/3/22 14:16
* @version:3.0
*/
@Slf4j
public class SubscribeListener implements MessageListener {

/**
* 当前websocket的session
*/
private Session session;

public Session getSession() {
return session;
}

public void setSession(Session session) {
this.session = session;
}

@Override
public void onMessage(Message message, byte[] bytes) {
String msg = new String(message.getBody());
if (ObjectUtil.isNotEmpty(session) && session.isOpen()) {
try {
session.getBasicRemote().sendText(msg);
} catch (IOException e) {
log.info("发送消息异常,msg = {} , e = {}", msg, e);
}
}
}
}

WebSocket服务提供类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
/**
* description: @ServerEndpoint 注解是一个类层次的注解,
* 它的功能主要是将目前的类定义成一个websocket服务器端,注解的值将被用于监听用户连接的终端访问URL地址,
* 客户端可以通过这个URL来连接到WebSocket服务器端使用springboot的唯一区别是要@Component声明下,
* 而使用独立容器是由容器自己管理websocket的,但在springboot中连容器都是spring管理的。
*
* @author: weirx
* @time: 2021/3/22 14:31
*/
@Slf4j
@Component
@ServerEndpoint("/websocket/server/{loginName}")
public class WebSocketServer {

/**
* 因为@ServerEndpoint不支持注入,所以使用SpringUtils获取IOC实例
*/
private RedisMessageListenerContainer redisMessageListenerContainer =
ApplicationContextProvider.getBean(RedisMessageListenerContainer.class);

/**
* 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
*/
private static AtomicInteger onlineCount = new AtomicInteger(0);

/**
* concurrent包的线程安全Set,用来存放每个客户端对应的webSocket对象。
* 若要实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识
*/
private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();

/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;

/**
* redis监听
*/
private SubscribeListener subscribeListener;

/**
* 连接建立成功调用的方法
*
* @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
@OnOpen
public void onOpen(@PathParam("loginName") String loginName, Session session) {
this.session = session;
//加入set中
webSocketSet.add(this);
//在线数加1
addOnlineCount();
log.info("有新连接[" + loginName + "]加入!当前在线人数为{}", getOnlineCount());
subscribeListener = new SubscribeListener();
subscribeListener.setSession(session);
//设置订阅topic
redisMessageListenerContainer.addMessageListener(
subscribeListener, new ChannelTopic(Constants.TOPIC_PREFIX + loginName));

}

/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() throws IOException {
//从set中删除
webSocketSet.remove(this);
//在线数减1
subOnlineCount();
redisMessageListenerContainer.removeMessageListener(subscribeListener);
log.info("有一连接关闭!当前在线人数为{}", getOnlineCount());
}

/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
* @param session 可选的参数
*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("来自客户端的消息:{}", message);
//群发消息
for (WebSocketServer item : webSocketSet) {
try {
item.sendMessage(message);
} catch (IOException e) {
log.info("发送消息异常:msg = {}", e);
continue;
}
}
}

/**
* 发生错误时调用
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.info("发生错误,{}", error);
}

/**
* 这个方法与上面几个方法不一样。没有用注解,是根据自己需要添加的方法。
*
* @param message
* @throws IOException
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}

public int getOnlineCount() {
return onlineCount.get();
}

public void addOnlineCount() {
WebSocketServer.onlineCount.getAndIncrement();
}

public void subOnlineCount() {
WebSocketServer.onlineCount.getAndDecrement();
}

}

redis消息发布:

1
2
3
4
5
6
7
8
9
@Autowired
private RedisUtil redisUtil;

@Override
public Result send(String loginName, String msg) {
//推送站内信webSocket
redisUtil.publish("TOPIC" + loginName, msg);
return Result.success();
}

代理配置

springcloud gateway是目前在微服务当中使用较为广泛的网关,我们可以通过以下配置达到websocket的动态代理: 静态路由配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
spring:
cloud:
gateway:
discovery:
locator:
enabled: true # 开启从注册中心动态创建路由的功能,利用微服务名进行路由
routes:
- id: websocket # 路由 ID,保持唯一
uri: lb:ws://inbox-model # uri指目标服务地址,lb代表从注册中心获取服务
predicates:
- Path=/websocket/server/**
filters:
- StripPrefix=0

动态路由配置:使用动态路由,需要网关支持且开放动态路由功能

1
2
3
4
5
6
7
8
9
10
11
[{
"id": "websocket",
"order": 2,
"predicates": [{
"args": {
"pattern": "/websocket/server/**"
},
"name": "Path"
}],
"uri": "lb:ws://inbox-model"
}]

nginx 配置:

1
2
3
4
5
6
7
8
9
location /websocket {
proxy_pass http://gateway;
#以下三项是官方指定配置
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
#超时时间的配置,如果不配置,在一段时间内会导致客户端接收不到消息,根据登录session的超时时间保持一致就行
proxy_read_timeout 3600s;
}

表设计

系统通知(System Notice)

系统通知一般是由后台管理员发出,然后指定某一类(全体,个人等)用户接收。基于此设想,可以把系统通知大致分为两张表:

  1. t_manager_system_notice(管理员系统通知表) :记录管理员发出的通知 ;
  2. t_user_system_notice(用户系统通知表) : 存储用户接受的通知。

t_manager_system_notice(管理员系统通知表)

字段名 类型 描述
system_notice_id bigint 系统通知id
title varchar 标题
content text 内容
type varchar 发送用户群体:single,all,vip
state tinyint 是否被拉取过
recipient_id bigint 接收通知的用户id,若type不是single,则该值为0
manager_id bigint 发布通知的管理员id
publish_time timestamp 发布时间

t_user_system_notice(用户系统通知表)

字段名 类型 描述
user_notice_id bigint 主键id
state tinyint 是否已读
system_notice_id bigint 系统通知的id
recipient_id bigint 接收通知的用户id
pull_time timstamp 拉取时间

当管理员发布一条通知后,将通知插入 t_manager_system_notice 表中,然后系统定时的从 t_manager_system_notice 表中拉取通知,然后根据通知的 type 将通知插入 t_user_system_notice 表中。

如果通知的 type 是 single 的,那就只需要插入一条记录到 t_user_system_notice 中。如果是全体用户,那么就需要将一个通知批量根据不同的用户 ID 插入到 t_user_system_notice 中,这个数据量就需要根据平台的用户量来计算。

举个例子: 管理员 A 发布了一个活动的通知,他需要将这个通知发布给全体用户,当拉取时间到来时,系统会将这一条通知取出。随后系统到用户表中查询选取所有用户的 ID,然后将这一条通知的信息根据所有用户的 ID,批量插入 t_user_system_notice 中。用户需要查看系统通知时,从 t_user_system_notice 表中查询就行了。

注意:

  1. 因为一次拉取的数据量可能很大,所以两次拉取的时间间隔可以设置的长一些。
  2. 拉取 t_manager_system_notice 表中的通知时,需要判断 state,如果已经拉取过,就不需要重复拉取, 否则会造成重复消费。

事件提醒(EventRemind)

之所以称提醒类型的消息为事件提醒,是因为此类消息均是通过用户的行为产生的,如下:

  • xxx 在某个评论中@了你;
  • xxx 点赞了你的文章;
  • xxx 点赞了你的评论;
  • xxx 回复了你的文章;
  • xxx 回复了你的评论。

事件提醒表 t_event_remind

字段名 类型 描述
event_remind_id bigint 消息id
action varchar 动作类型,如点赞、艾特、回复等
source_id bigint 事件源id,如评论id、文章id
source_type varchar 事件源类型:comment、post等
source_content varchar 事件源内容,比如回复的内容、评论等
url varchar 事件所发生的地点链接 url
state tinyint 是否已读
sender_id bigint 操作者id,即谁关注了你、at了你
recipient_id bigint 接受通知的用户id
remind_time timestatmp 提醒时间
消息聚合

消息聚合只适用于事件提醒,以聚合之后的点赞消息来说:

  • 100 人 {点赞} 了你的 {文章 ID = 1} :《A》;
  • 100 人 {点赞} 了你的 {文章 ID = 2} :《B》;
  • 100 人 {点赞} 了你的 {评论 ID = 3} :《C》;

聚合之后的消息明显有两个特征,即:action 和 source type,这是系统消息和私信都不具备的, 所以我个人认为事件提醒的设计要稍微比系统消息和私信复杂。

如何聚合?

稍稍观察下聚合的消息就可以发现:某一类的聚合消息之间是按照 source type 和 source id 来分组的, 因此我们可以得出以下伪 SQL:

1
2
SELECT * FROM t_event_remind WHERE recipient_id = 用户ID
AND action = 点赞 AND state = FALSE GROUP BY source_id , source_type;

SQL 层面的结果集处理还是很麻烦的,所以我的想法先把用户所有的点赞消息先查出来, 然后在程序里面进行分组,这样会简单不少。

站内消息系统的设计

聊天室表 t_private_chat

字段名 类型 描述
private_chat_id bigint 聊天室id
user1_id bigint 用户1的id
user2_id bigint 用户2的id
last_message varchar 最后一条消息的内容

私信表 t_private_message

字段名 类型 描述
private_message_id bigint 私信id
content text 私信内容
state tinyint 是否已读
sender_remove tinyint 发送消息的人是否删除掉了这条消息
recipient_remove tinyint 接受人是否把这条消息从聊天记录删除
sender_id bigint 发送者id
recipent_id bigint 接收者id
send_time timestamp 发送时间
消息设置

可以看到 b 站还添加了陌生人选项,也就是说如果给你发送私信的用户不是你关注的用户,那么视之为陌生人私信,就不接受。

以下是我对于消息设置的设计:

字段名 类型 描述
user_id bigint 用户id
like_message tinyint 是否接收点赞消息
reply_message tinyint 是否接收回复消息
at_message tinyint 是否接收at消息
stranger_message tinyint 是否接收陌生人私信

扩展性设计

实现了站内信功能以后,要是以后需要实现邮件、短信呢?拿到一个需求进行功能设计的时候,往往需要考虑其扩展性。

为了给这个需求后续的扩展附加功能,同时代码的改动最小化,我们可以引入策略模式来实现。

  • 消息类型分为:站内信,短信,邮件
  • 推送范围分为:个人,组织机构,地域,用户组

定义抽象类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public abstract class ISendMessageService {

/**
* description: 发送

* @return: com.botany.spore.core.result.Result
* @author: weirx
* @time: 2021/3/18 13:37
* @param inboxMessageTemplateDOMap
* @param personMap
* @param inboxBusinessConfigDO
* @param claimUser
*/
public abstract Result send(InboxMessageTemplateDO inboxMessageTemplateDOMap, Map<String, Object> personMap,
InboxBusinessConfigDO inboxBusinessConfigDO, String claimUser);

}

在调用发消息时,如何使用?又如何确定我发送的是何种消息,走哪个实现类?看如下调用位置代码:

1
2
3
4
5
6
7
8
private void sendMessage(String messageType, Map<String, InboxMessageTemplateDO> templateDOMap,
Map<String, Object> personListByPushRange, InboxBusinessConfigDO inboxBusinessConfigDO, String username) {
for (String msgType : messageType.split(Constants.COMMA)) {
ISendMessageService sendMessageService =
SendMessageTypeEnum.newInstance(SendMessageTypeEnum.getEnum(msgType));
sendMessageService.send(templateDOMap.get(msgType), personListByPushRange, inboxBusinessConfigDO, username);
}
}

枚举类内部:根据不同的code会返回不同的bean实例。

1
2
3
4
5
6
7
8
9
/**
* 站内信
*/
MESSAGE("sendMessageService", "站内信") {
@Override
public ISendMessageService create(String code) {
return (ISendMessageService) ApplicationContextProvider.getBean(code);
}
}

到上面为止,一个简单的策略模式就完成了。假如我们后面扩展微信,那么只需要新增实现类,同时在枚举中添加新的枚举就好了,不需要修改整个发送消息业务代码。同理,推送范围也是如此。

平台与客户端通信

RPA项目中,启动一个RPA应用实际上是通过xxl-job的executor执行器,而xxl-job内定位到的执行器都是内网ip,那平台侧是怎么找到指定的执行器的呢?

这其中就存在平台侧和客户端侧的通信了。实际上,只要在xxl-job上建立一个与客户端通信的长链接,那么客户端的回调、注册等操作都能被监测到,而且平台也能指使客户端去做一些操作,不需要将平台和客户端放到同一个网段下。

服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
/**
* websocket 服务端处理类
* @author rpa
* @version 1.0
* @date 2022/8/15 13:48
*/
@Slf4j
@ServerEndpoint(path = "${websocket.path}", port = "${websocket.port}", maxFramePayloadLength = "${websocket.maxFramePayloadLength}", bossLoopGroupThreads = "${websocket.bossLoopGroupThreads}", workerLoopGroupThreads = "${websocket.workerLoopGroupThreads}")
public class ServerWebSocketEndpoint {

/**
* 握手前调用
* @param session 会话
* @param headers 请求头
*/
@BeforeHandshake
public void handshake(Session session, HttpHeaders headers){
String clientId = headers.get(ExecutorConstant.WEBSOCKET_HEADER_CLIENT_ID);
if(StringUtils.hasText(clientId)){
// 校验 token
String token = headers.get(ExecutorConstant.WEBSOCKET_HEADER_AUTHORIZATION);
String tenantCode = headers.get(ExecutorConstant.WEBSOCKET_HEADER_TENANT_CODE);

ReturnT<String> t = JwtRsaUtil.verifyToken(token, tenantCode);
if(t.getCode() == ReturnT.SUCCESS_CODE){
// 缓存 clientId、username
session.setAttribute(AdminConstant.WEBSOCKET_ATTR_CLIENT_ID, clientId);
session.setAttribute(AdminConstant.WEBSOCKET_ATTR_USERNAME, t.getContent());
return;
}
}
session.close();
}

/**
* 连接成功后调用
* @param session 会话
*/
@OnOpen
public void onOpen(Session session){
String clientId = session.getAttribute(AdminConstant.WEBSOCKET_ATTR_CLIENT_ID);
log.info("websocket连接建立,连接管理器加入对应连接:clientId:{}", clientId);
ServerSessionManager.add(clientId, session);
}

/**
* 收到消息
* @param session 会话
* @param payload 消息
*/
@OnMessage
public void onMessage(Session session, String payload) {
String clientId = session.getAttribute(AdminConstant.WEBSOCKET_ATTR_CLIENT_ID);
// 解密
String decrypt = AesUtil.decrypt(payload, clientId);
if(decrypt != null){
Message message = JSON.parseObject(decrypt, Message.class);
if(message.isAnswer()){
this.handleResponse(message);
}else{
this.handleRequest(message, session);
}
}
}

/**
* 处理响应
* @param message 消息
*/
private void handleResponse(Message message) {
String eventId = message.getEventId();
if(eventId != null){
MessageContext messageContext = ServerSessionManager.messageContextMap.get(eventId);
if(messageContext != null){
messageContext.setResponse(message.getBody());
}else{
log.warn("收到响应时,请求已断开:message:{}", message);
}
}else{
log.warn("响应消息缺少事件id:message:{}", message);
}
}

/**
* 处理请求
* @param message 消息
* @param session 会话
*/
private void handleRequest(Message message, Session session) {
String uri = message.getPath(), eventId = message.getEventId(), body = message.getBody();
ReturnT t = null;
try{
if (AdminConstant.INTERFACE_URL_CALLBACK.equals(uri)) {
List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(body, List.class, HandleCallbackParam.class);
t = XxlJobAdminConfig.getAdminConfig().getAdminBiz().callback(callbackParamList);
} else if (AdminConstant.INTERFACE_URL_REGISTRY.equals(uri)) {
RegistryParam registryParam = GsonTool.fromJson(body, RegistryParam.class);
t = XxlJobAdminConfig.getAdminConfig().getAdminBiz().registry(registryParam);
} else if (AdminConstant.INTERFACE_URL_REGISTRY_REMOVE.equals(uri)) {
RegistryParam registryParam = GsonTool.fromJson(body, RegistryParam.class);
t = XxlJobAdminConfig.getAdminConfig().getAdminBiz().registryRemove(registryParam);
} else if (AdminConstant.INTERFACE_URL_TASK_LIST.equals(uri)){
String clientId = GsonTool.fromJson(body, String.class);
t = XxlJobAdminConfig.getAdminConfig().getAdminBiz().taskList(clientId);
}else {
t = new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
}
}catch (Throwable e){
log.warn("websocket请求处理异常:message:{}; error:{}", message, e.getMessage());
t = new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
}
String clientId = session.getAttribute(AdminConstant.WEBSOCKET_ATTR_CLIENT_ID);
ServerSessionManager.response(clientId, eventId, t);
}

/**
* 连接异常
* @param session 会话
* @param throwable 异常
*/
@OnError
public void onError(Session session, Throwable throwable) {
String clientId = session.getAttribute(AdminConstant.WEBSOCKET_ATTR_CLIENT_ID);
log.warn("websocket连接发生异常:clientId:{}; error:{}", clientId, throwable.getMessage());
}

/**
* 连接关闭
* @param session 会话
*/
@OnClose
public void onClose(Session session){
String clientId = session.getAttribute(AdminConstant.WEBSOCKET_ATTR_CLIENT_ID);
if(clientId != null){
log.info("websocket连接关闭,连接管理器移除对应连接:clientId:{}", clientId);
ServerSessionManager.remove(clientId);
}
}
}

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
/**
* websocket 客户端处理类
* @author rpa
* @version 1.0
* @date 2022/8/15 13:48
*/
@Slf4j
public class WebSocketClientHandler extends AbstractWebSocketHandler {

/** 执行器 */
private ExecutorBiz executorBiz;

public WebSocketClientHandler(ExecutorBiz executorBiz) {
this.executorBiz = executorBiz;
}

/**
* 连接成功后调用
* @param session 会话
*/
@Override
public void afterConnectionEstablished(WebSocketSession session){
// 设置传入的文本最大限制
session.setTextMessageSizeLimit(ExecutorConstant.WEBSOCKET_INCOMING_TEXT_MAXIMUM_SIZE);
}

/**
* 收到文本消息
* @param session 会话
* @param textMessage 消息
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage textMessage){
String clientId = session.getHandshakeHeaders().getFirst(ExecutorConstant.WEBSOCKET_HEADER_CLIENT_ID);
String payload = textMessage.getPayload();
// 解密
String decrypt = AesUtil.decrypt(payload, clientId);
if(decrypt != null){
Message message = JSON.parseObject(decrypt, Message.class);
if(message.isAnswer()){
this.handleResponse(message);
}else{
this.handleRequest(message);
}
}
}

/**
* 处理响应
* @param message 消息
*/
private void handleResponse(Message message) {
String eventId = message.getEventId();
if(eventId != null){
MessageContext messageContext = WebSocketClient.messageContextMap.get(eventId);
if(messageContext != null){
messageContext.setResponse(message.getBody());
}else{
log.warn("收到响应时,请求已断开:message:{}", message);
}
}else{
log.warn("响应消息缺少事件id:message:{}", message);
}
}

/**
* 处理请求
* @param message 消息
*/
private void handleRequest(Message message) {
String uri = message.getPath(), eventId = message.getEventId(), body = message.getBody();
ReturnT t = null;
try{
if (ExecutorConstant.INTERFACE_URL_BEAT.equals(uri)) {
t = executorBiz.beat();
} else if (ExecutorConstant.INTERFACE_URL_IDLE_BEAT.equals(uri)) {
IdleBeatParam idleBeatParam = GsonTool.fromJson(body, IdleBeatParam.class);
t = executorBiz.idleBeat(idleBeatParam);
} else if (ExecutorConstant.INTERFACE_URL_GLOBAL_IDLE_BEAT.equals(uri)) {
IdleBeatParam idleBeatParam = GsonTool.fromJson(body, IdleBeatParam.class);
t = executorBiz.globalIdleBeat(idleBeatParam);
} else if (ExecutorConstant.INTERFACE_URL_RUN.equals(uri)) {
TriggerParam triggerParam = GsonTool.fromJson(body, TriggerParam.class);
t = executorBiz.run(triggerParam);
} else if (ExecutorConstant.INTERFACE_URL_KILL.equals(uri)) {
KillParam killParam = GsonTool.fromJson(body, KillParam.class);
t = executorBiz.kill(killParam);
} else if (ExecutorConstant.INTERFACE_URL_LOG.equals(uri)) {
LogParam logParam = GsonTool.fromJson(body, LogParam.class);
t = executorBiz.log(logParam);
} else {
t = new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
}
}catch (Throwable e){
log.warn("websocket请求处理异常:message:{}; error:{}", message, e.getMessage());
t = new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
}
WebSocketClient.response(eventId, t);
}

/**
* 连接异常
* @param session 会话
* @param throwable 异常信息
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable throwable){
log.warn("websocket连接发生异常:sessionId:{}; error:{}", session.getId(), throwable.getMessage());
}

/**
* 连接关闭
* @param session 会话
* @param closeStatus 关闭状态
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus){
log.info("websocket连接关闭:sessionId:{}", session.getId());
WebSocketClient.removeSession();
}

/**
* 是否支持分片消息(不支持)
*/
@Override
public boolean supportsPartialMessages() {
return false;
}
}

定时任务清除长连接

如果项目中引用了 websocket,往往需要有个定时任务定期清除websocket的长链接。原因就是,比如用户直接关掉了浏览器,这样后端没有办法知道这个长链接已经关闭了,所以需要定期清除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
/**
* websocket会话缓存管理
* <p>
* * @author heshengsheng
*/
@Slf4j
public class StandardSessionCache {

/**
* 会话缓存,可以多开窗口,即一个用户可以同时有多个websocket连接,存储结构是ConcurrentMap中存放Set,map的key是一个用户,set集合中是一个用户的多个websocket连接
* 用非线程安全的set原因:即使用线程安全的set,也无法保证set和map跨集合操作的数据一致性(即线程安全的set仅保证单个集合操作的线程安全),多个集合操作的数据一致性需要自己加锁实现,既然自己要加更大粒度的锁保证数据的一致性,就无需用线程安全的set(普通set性能更高)
* 关于synchronized的性能:set是每个key(用户)一个,对每个key的set加锁竞争极小,synchronized有一个锁升级的过程,没有竞争的时候是偏向锁(偏向锁是在对象头上做标记,不会产生系统调用),不影响性能
*/
private static ConcurrentMap<String, Set<Session>> sessionSetMap = new ConcurrentHashMap<String, Set<Session>>();

/**
* sessionSetMap中key的拼装规则
*/
private static String getUserKey(Long userId) {
return String.valueOf(userId);
}

/**
* 添加会话到缓存,多线程安全
*/
public static void addSession(Long userId, Session session) {
String key = getUserKey(userId);
addSession(key, session);
}

private static void addSession(String key, Session session) {
addSession(key, session, 1);
}

private static void addSession(String key, Session session, int count) {
Set sessionSet = sessionSetMap.get(key);
if (sessionSet == null) {
//没有key
sessionSet = new HashSet();
sessionSet.add(session);
//新增成功返回null,失败返回原来的Set(前面虽然读取到没有key,但是可能有多个线程同时新增,所以要用putIfAbsent)
Set putSet = sessionSetMap.putIfAbsent(key, sessionSet);
if (putSet != null) {
//存在key,加锁添加会话到set中(被其他线程先新增key的情况)
addSessionLock(putSet, key, session, count);
}
} else {
//存在key,加锁添加会话到set中
addSessionLock(sessionSet, key, session, count);
}
}

/**
* 加锁添加会话到set中
*/
private static void addSessionLock(Set sessionSet, String key, Session session, int count) {
//用set对象加锁
synchronized (sessionSet) {
//双重检测(加锁成功之前,set可能被其他线程remove)
if (sessionSet == sessionSetMap.get(key)) {
//set没有变化,添加session
sessionSet.add(session);
} else {
//set有变化,重试操作(避免意外问题,最多操作10次)
if (count < 10) {
addSession(key, session, ++count);
}
}
}
}

/**
* 删除缓存中的会话,多线程安全
*/
public static void removeSession(Long userId, Session session) {
String key = getUserKey(userId);
removeSession(key, session);
}

private static void removeSession(String key, Session session) {
Set sessionSet = sessionSetMap.get(key);
if (sessionSet != null) {
//存在key,用set对象加锁
synchronized (sessionSet) {
//删除session
sessionSet.remove(session);
if (sessionSet.size() == 0) {
//set大小为0,删除key
sessionSetMap.remove(key, sessionSet);
}
}
}
}

/**
* 按key发送信息
*/
public static void sendByKey(Long userId, String msg) {
String key = getUserKey(userId);
sendByKey(key, msg);
}

private static void sendByKey(String key, String msg) {
Set<Session> sessionSet = sessionSetMap.get(key);
if (sessionSet != null) {
//读取的时候加锁(synchronized没有竞争的时候是偏向锁,不影响性能)
synchronized (sessionSet) {
for (Session session : sessionSet) {
try {
if (session.isOpen()) {
session.getAsyncRemote().sendText(msg);
}
} catch (Exception e) {
log.warn("sendByKey error, key:{}, session:{}, msg:{}", key, session, msg, e);
}
}
}
}
}

/**
* 清除缓存中已关闭的会话(定时任务调用,避免缓存中有未删除的已关闭会话,可以半小时到1小时调用一次)
*/
public static void clearClosedSession() {
log.info("clearClosedSession 执行前map size:{}", sessionSetMap.size());
for (String key : sessionSetMap.keySet()) {
try {
Set<Session> sessionSet = sessionSetMap.get(key);
if (sessionSet != null) {
//读取的时候加锁(synchronized没有竞争的时候是偏向锁,不影响性能)
synchronized (sessionSet) {
//没有数据就清除
if (sessionSet.size() == 0) {
sessionSetMap.remove(key, sessionSet);
continue;
}

Iterator<Session> it = sessionSet.iterator();
while (it.hasNext()) {
Session session = it.next();
try {
//用isOpen还是isActive判断
if (!session.isOpen()) {
//删除session
it.remove();
if (sessionSet.size() == 0) {
//set大小为0,删除key
sessionSetMap.remove(key, sessionSet);
}
}
} catch (Exception e) {
log.warn("clearClosedSession error, key:{}, session:{}", key, session, e);
}
}
}
}
} catch (Exception e) {
log.warn("clearClosedSession error, key:{}", key, e);
}
}
log.info("clearClosedSession 执行后map size:{}", sessionSetMap.size());
}
}