初识Netty

Netty 是一个 NIO 客户端服务器框架,可快速轻松地开发网络应用程序,例如协议服务器和客户端。有了Netty,你可以实现自己的HTTP服务器,FTP服务器,UDP服务器,RPC服务器,WebSocket服务器,Redis的Proxy服务器,MySQL的Proxy服务器等等。

Netty 的 I/O 模型是基于非阻塞 I/O 实现的,底层依赖的是 JDK NIO 框架的多路复用器 Selector。一个多路复用器 Selector 可以同时轮询多个 Channel,采用 epoll 模式后,只需要一个线程负责 Selector 的轮询,就可以接入成千上万的客户端。

Netty 所采用的主从 Reactor 多线程模型,所有的 I/O 事件都注册到一个 I/O 多路复用器上,当有 I/O 事件准备就绪后,I/O 多路复用器会将该 I/O 事件通过事件分发器分发到对应的事件处理器中。该线程模型避免了同步问题以及多线程切换带来的资源开销,真正做到高性能、低延迟。

NIO的特点:

  1. 一个线程可以处理多个通道,减少线程创建数量;
  2. 读写非阻塞,节约资源:没有可读/可写数据时,不会发生阻塞导致线程资源的浪费

为什么选择Netty

高性能低延迟

  • 异步非阻塞 I/O 和事件驱动模型:Netty 能够在一个或少量线程下同时处理多个连接的读写事件,避免了大量线程因阻塞等待 I/O 操作而导致的资源浪费,从而显著提高了系统的吞吐量和性能。

  • 零拷贝技术的有效利用:许多网络框架在数据传输过程中涉及多次数据复制,而 Netty 的零拷贝技术能够减少数据在内存中的复制次数。例如,在文件传输或大数据量的网络通信场景下,这可以极大地减少 CPU 开销,提高数据传输效率。

高度的灵活性和可定制性

  • 编解码框架的插件式设计:Netty 提供了灵活的编解码框架,支持用户轻松插入自定义的编解码器。允许针对不同的协议(如自定义的二进制协议、特殊的文本协议等)编写编解码器。
  • 管道(Pipeline)机制的强大功能:Netty 的管道机制允许构建一个由多个处理器(Handler)组成的处理链。

广泛的协议支持和良好的跨平台性

  • 多种协议支持:Netty 对多种常见的网络协议(如 HTTP、WebSocket、TCP、UDP 等)提供了很好的支持,并且能够方便地在不同协议之间切换或同时支持多种协议。
  • 跨平台性强:基于 Java 语言编写的 Netty 具有良好的跨平台特性,能够在不同的操作系统(如 Windows、Linux、Mac 等)上运行,并且保持性能和功能的一致性。

如果不用netty,使用原生JDK的话,有如下问题:

1、API复杂,需要对多线程很熟悉:因为NIO涉及到Reactor模式

2、可定制化的线程模型,用户可以通过启动的配置参数选择 Reactor 线程模型;另一个是可扩展的事件驱动模型,将框架层和业务层的关注点分离。大部分情况下,开发者只需要关注 ChannelHandler 的业务逻辑实现。

3、Netty 更加可靠稳定,修复和完善了 JDK NIO 较多已知问题,例如臭名昭著的 select 空转导致 CPU 消耗 100%,TCP 断线重连,keep-alive 检测等问题。

而Netty来说,他的api简单、性能高而且社区活跃(dubbo、rocketmq等都使用了它)

Netty组件

image-20250103181703382

Channel

Channel是 Java NIO 的一个基本构造。可以看作是传入或传出数据的载体。因此,它可以被打开或关闭,连接或者断开连接。

ServerBootstrap 与 Bootstrap

Bootstarp 和 ServerBootstrap 被称为引导类,指对应用程序进行配置,并使他运行起来的过程。Netty处理引导的方式是使你的应用程序和网络层相隔离。

Bootstrap 是客户端的引导类,Bootstrap 在调用 bind()(连接UDP)和 connect()(连接TCP)方法时,会新创建一个 Channel,仅创建一个单独的、没有父 Channel 的 Channel 来实现所有的网络交换。

ServerBootstrap 是服务端的引导类,ServerBootstarp 在调用 bind() 方法时会创建一个 ServerChannel 来接受来自客户端的连接,并且该 ServerChannel 管理了多个子 Channel 用于同客户端之间的通信。

EventLoop与EventLoopGroup

EventLoop 定义了Netty的核心抽象,用来处理连接的生命周期中所发生的事件,在内部,将会为每个Channel分配一个EventLoop。EventLoop 本身只是一个线程驱动,在其生命周期内只会绑定一个线程,让该线程处理一个 Channel 的所有 IO 事件。

一个 Channel 一旦与一个 EventLoop 相绑定,那么在 Channel 的整个生命周期内是不能改变的。一个 EventLoop 可以与多个 Channel 绑定。即 Channel 与 EventLoop 的关系是 n:1,而 EventLoop 与线程的关系是 1:1。

EventLoopGroup 是一个 EventLoop 池,包含很多的 EventLoop。

ChannelHandler 与 ChannelPipeline

ChannelHandler 是对 Channel 中数据的处理器,这些处理器可以是系统本身定义好的编解码器,也可以是用户自定义的。这些处理器会被统一添加到一个 ChannelPipeline 的对象中,然后按照添加的顺序对 Channel 中的数据进行依次处理。

ChannelFuture

Netty 中所有的 I/O 操作都是异步的,即操作不会立即得到返回结果,所以 Netty 中定义了一个 ChannelFuture 对象作为这个异步操作的“代言人”,表示异步操作本身。如果想获取到该异步操作的返回值,可以通过该异步操作对象的addListener() 方法为该异步操作添加监 NIO 网络编程框架 Netty 听器,为其注册回调:当结果出来后马上调用执行。

Netty 的异步编程模型都是建立在 Future 与回调概念之上的。

Netty+SpringBoot的使用

假设要搭建一个基于 Netty 的 WebSocket 服务端应用,这段代码就可以作为基础的配置启动部分。

例如,应用是一个在线聊天系统的后端服务端,通过 WebSocket 与前端的网页客户端或者移动端客户端建立长连接来实时收发聊天消息。

bossGroup 负责接收众多客户端发起的 WebSocket 连接请求,一旦连接建立成功,对应的客户端连接通道就会被交给 workerGroup 中的 EventLoop 来处理后续的消息收发等操作。

SO_BACKLOG 设置为 128 可以应对一定量的并发连接请求排队情况,SO_KEEPALIVE 开启保活机制确保长时间空闲的连接依然处于可用状态,

而 NettyWebsocketHandlerInit 中添加的各种 ChannelHandler 则会负责对客户端发送过来的聊天消息进行解码、按照业务逻辑处理(如将消息转发给对应的接收方等)以及编码后再回复给客户端等操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Component
@Slf4j
public class NettyServer implements InitializingBean {

private final NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
private final NioEventLoopGroup workerGroup = new NioEventLoopGroup(2);

@PreDestroy
public void close() {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

public void start() {
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new NettyWebsocketHandlerInit());
ChannelFuture channelFuture = serverBootstrap.bind(9999).sync();
log.info("***********Netty服务器启动成功**************");
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
log.error("Netty启动异常:{}",e.getMessage(),e);
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

@Override
public void afterPropertiesSet() throws Exception {
new Thread(()->{ new NettyServer().start();}).start();
}
}

针对不同的协议,可以使用不同的Handler去处理,也支持自定义Handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class NettyWebsocketHandlerInit  extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 1 -支持http协议
socketChannel.pipeline().addLast(new HttpServerCodec());
// 2 -支持大数据
socketChannel.pipeline().addLast(new ChunkedWriteHandler());
// 3 -支持http消息整合
socketChannel.pipeline().addLast(new HttpObjectAggregator(10240));
// 4 -自定义handler 重新定义uri需要在WebSocketServerProtocolHandler之前
socketChannel.pipeline().addLast(new NettyWebsocketHandler());
socketChannel.pipeline().addLast(new NettyServerIdleHandler(15,60,0));
// 5 -支持websocket协议
socketChannel.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", null, true, 10485760));
}
}

这里给出一个支持WebSocket协议的自定义Handler的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Slf4j
public class NettyWebsocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("WebSocket-连接成功:{}", ctx.channel().remoteAddress());
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 首次连接是FullHttpRequest,处理参数
if (msg instanceof FullHttpRequest) {
// 校验请求头、msg,根据不同的消息做不同的业务处理
// 可以在其中根据消息类型进行分发,将TextWebSocketFrame类型的消息转发到专门的channelRead0 - 类似的处理方法中。
} catch (Exception e) {
log.error("连接异常:{}", e.getMessage());
ctx.channel().close();
}
}
super.channelRead(ctx, msg);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
channelCloseHandler(ctx.channel(), true, cause);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
channelCloseHandler(ctx.channel(), false, null);
}


@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
// 只接收 TextWebSocketFrame 类型的消息
}


private void channelCloseHandler(Channel channel, boolean isError, Throwable cause) {
// 根据不同的情况remove掉channel中的消息
}
}

使用 channel.writeAndFlush() 可以实现发送消息,可以定义一个WebSocketContext去封装发消息的内容

1
2
3
4
5
6
7
8
9
10
11
public class WebSocketContext {

public static TextWebSocketFrame sendMessage(String message,boolean success) {
return new TextWebSocketFrame(JSONObject.toJSONString(success ? R.ok(message) : R.fail(message)));
}

public static void sendMessage(Long userId, WebsocketResponseEnum msgType, Object message, WebsocketSourceEnum sourceEnum){
channel.writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(R.restResult(message, msgType.getCode(), msgType.getDesc()))));

}
}

Redis发布订阅模型

Netty虽然可以解决单实例通信的问题,但是如果线上一个服务有多个实例,可能会使得实例A发送的消息未能同步到实例B,可能会漏掉某些用户的消息。针对于这种情况,需要引入Redis发布订阅模型将消息广播到各个实例上去。

image-20241231182153201

配置一个 RedisMessageListenerContainer,用于监听 Redis 消息队列中的消息,并在接收到消息时调用指定的消息处理方法。

1
2
3
4
5
6
7
8
9
10
11
12
@Configuration
public class RedisReceiver {

@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,RedisMessageListener messageListener) {
MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(messageListener,"onMessage");
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, new PatternTopic(WebSocketContext.SOCKET_REDIS_TOPIC));
return container;
}
}

封装redis广播消息的方法。

1
2
3
4
5
6
7
8
9
10
@Component
@Slf4j
public class RedisSubscribe {
@Resource
private RedisTemplate redisTemplate;

private void sendMessage(List<MessageEntity> message) {
redisTemplate.convertAndSend(WebSocketContext.SOCKET_REDIS_TOPIC, JSONObject.toJSONString(message));
}
}

监听redis广播消息,并使用netty发送消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Component
@Slf4j
public class RedisMessageListener implements MessageListener {
private final ObjectMapper mapper = new ObjectMapper();
@Override
public void onMessage(Message message, byte[] bytes) {

String channel = new String(message.getChannel());
if(WebSocketContext.SOCKET_REDIS_TOPIC.equals(channel)){
try{
String data = mapper.readValue(message.getBody(), String.class);
log.info("Redis Received message on channel:{} body:{} ", channel, data);
List<MessageEntity> messageEntities = JSONObject.parseArray(data, MessageEntity.class);
for(MessageEntity messageEntity:messageEntities){
WebSocketContext.sendMessage(messageEntity);
}
}catch (Exception e){
log.error("redis message error:{}",e.getMessage(),e);
}
}

}
}