
Netty Source Code Analysis

· 13 min read
Le Dai
Sr. Software Engineer

Netty's NioEventLoopGroup is the event-processing loop group built on top of Java NIO. Its thread count is read from a system property first and defaults to CPU cores * 2; the threads themselves are created by a default thread factory (Netty's DefaultThreadFactory, as the newDefaultThreadFactory() override below shows).

/**
 * Abstract base class for {@link EventLoopGroup} implementations that handles their tasks with multiple threads at
 * the same time.
 */
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);

    private static final int DEFAULT_EVENT_LOOP_THREADS;

    static {
        // Initialize the thread count: prefer the system property, default to CPU cores * 2.
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }

    /**
     * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
     */
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

    /**
     * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, ThreadFactory, Object...)
     */
    protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
    }

    /**
     * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor,
     * EventExecutorChooserFactory, Object...)
     */
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                                        Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, chooserFactory, args);
    }

    @Override
    protected ThreadFactory newDefaultThreadFactory() {
        return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
    }

    @Override
    public EventLoop next() {
        return (EventLoop) super.next();
    }

    @Override
    protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception;

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

    @Override
    public ChannelFuture register(ChannelPromise promise) {
        return next().register(promise);
    }

    @Deprecated
    @Override
    public ChannelFuture register(Channel channel, ChannelPromise promise) {
        return next().register(channel, promise);
    }
}

public class TestServer {
    public static void main(String[] args) throws Exception {
        // Initializes the thread factory and thread count (defaults described above).
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new TestServerInitializer());

            // bind() creates the NioServerSocketChannel instance via the channel factory
            // configured above; its constructor goes through Java NIO's
            // SelectorProvider.provider(), and the channel is registered for
            // SelectionKey.OP_ACCEPT.
            ChannelFuture future = serverBootstrap.bind(8899).sync();
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
// The bind() call ends up here. By Netty convention, methods whose names start with "do" are private implementation methods.
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

// Channel creation and registration
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // The channel factory configured via .channel(NioServerSocketChannel.class) creates the
        // channel reflectively; the constructor calls Java NIO's provider.openServerSocketChannel(),
        // and the AbstractChannel superclass also initializes the ChannelPipeline.
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.

    return regFuture;
}


// ChannelPipeline initialization: the pipeline is a doubly linked list of AbstractChannelHandlerContext
// nodes; each context carries its EventLoop/executor and its handler. Handlers are appended later via addLast().
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise = new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

// Adding a handler to the pipeline
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);

        newCtx = newContext(group, filterName(name, handler), handler);

        addLast0(newCtx);

        // If the registered is false it means that the channel was not registered on an eventloop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}
// The Channel / ChannelPipeline event-flow model (from the ChannelPipeline javadoc):
* I/O Request
* via {@link Channel} or
* {@link ChannelHandlerContext}
* |
* +---------------------------------------------------+---------------+
* | ChannelPipeline | |
* | \|/ |
* | +---------------------+ +-----------+----------+ |
* | | Inbound Handler N | | Outbound Handler 1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler N-1 | | Outbound Handler 2 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ . |
* | . . |
* | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
* | [ method call] [method call] |
* | . . |
* | . \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 2 | | Outbound Handler M-1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 1 | | Outbound Handler M | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* +---------------+-----------------------------------+---------------+
* | \|/
* +---------------+-----------------------------------+---------------+
* | | | |
* | [ Socket.read() ] [ Socket.write() ] |
* | |
* | Netty Internal I/O Threads (Transport Implementation) |
* +-------------------------------------------------------------------+
* p.addLast("1", new InboundHandlerA());
* p.addLast("2", new InboundHandlerB());
* p.addLast("3", new OutboundHandlerA());
* p.addLast("4", new OutboundHandlerB());
* p.addLast("5", new InboundOutboundHandlerX());
* Execution order: inbound events flow through handlers 1, 2, 5; outbound events flow through 5, 4, 3.

ChannelInitializer extends ChannelInboundHandlerAdapter, so it is itself an inbound handler. During initialization it adds all the real processing handlers to the pipeline and then removes itself.
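
A minimal sketch of what the TestServerInitializer used above might look like; the codec and the business handler name are illustrative assumptions, not taken from the original code:

public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        // Add the real handlers; once initChannel returns, Netty removes
        // the ChannelInitializer itself from the pipeline.
        pipeline.addLast(new LoggingHandler(LogLevel.INFO));
        pipeline.addLast(new TestServerHandler()); // hypothetical business handler
    }
}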

ChannelInboundHandlerAdapter is a general-purpose adapter that reduces boilerplate: it provides no-op implementations so you only override the callbacks you need.

SimpleChannelInboundHandler extends the adapter above.

It is generic, so it handles only messages of the declared type. It also participates in reference counting: by default it releases the message, and with it the underlying ByteBuf resources, after handling; this default can be changed manually (via a constructor flag).

ReferenceCounted objects start with a count of 1; retain() adds 1 and release() subtracts 1. When the count reaches 0 the reference is cleared and the buffer is reclaimed, for unpooled heap buffers at the next GC.
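
A short sketch of the reference-counting contract, using the standard Unpooled allocator:

ByteBuf buf = Unpooled.buffer(16); // refCnt == 1 on allocation
buf.retain();                      // refCnt == 2
buf.release();                     // refCnt == 1
boolean freed = buf.release();     // refCnt == 0: buffer deallocated, returns true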


Netty offers two ways to send data:

ChannelHandlerContext.writeAndFlush() versus Channel.writeAndFlush(). With the former, the data enters the pipeline at the current handler's position and is passed on from the next handler only (a short-circuit mechanism). With the latter, the data starts from the tail of the pipeline and passes through every outbound handler on its way to the peer.

If the handlers being skipped do not transform the data, calling ChannelHandlerContext.writeAndFlush() shortens the processing path.
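
A sketch contrasting the two paths inside a hypothetical handler:

public class EchoHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Enters the pipeline at this handler's position and only traverses
        // the outbound handlers between here and the head (the shorter path).
        ctx.writeAndFlush(msg);

        // Would instead start from the tail, traversing every outbound handler:
        // ctx.channel().writeAndFlush(msg);
    }
}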

Netty Future

The Future provided by the JDK only lets you check the result manually, and that check blocks. Netty enhances this with ChannelFuture: a ChannelFutureListener receives the result through a callback instead. The callback still runs on the channel's I/O thread, i.e. ChannelFutureListener.operationComplete() is invoked on the channel thread, so any time-consuming work inside it must itself be made asynchronous; what the callback removes is the manual, blocking result check.
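
A sketch of the callback style; the write call and the error handling are illustrative:

ChannelFuture writeFuture = ctx.writeAndFlush(msg);
writeFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        // Runs on the channel's EventLoop thread: keep this short and hand
        // off any expensive work to another executor.
        if (!future.isSuccess()) {
            future.cause().printStackTrace();
        }
    }
});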

Netty buffer types

1. Heap buffer

Pros: the data lives in on-heap memory, so buffers are fast to create and release, and the internal byte array can be accessed directly.

Cons: transmitting over the network requires one extra data copy, from the heap into off-heap OS memory.

2. Direct buffer

Pros: better performance for socket I/O, because the data is already stored in off-heap OS memory and no extra copy is needed.

Cons: creation and release are slower than for heap buffers, and the creation path is more complex. Netty mitigates this with a memory pool that reuses direct buffers.

3. Composite buffer

A composite buffer (CompositeByteBuf) exposes several buffers, heap and/or direct, as one logical buffer without copying their contents.

Recommendation: use heap buffers for backend business message processing, and direct buffers where I/O read/write buffers are needed.
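
A sketch of a composite buffer combining a heap header with a direct body, with no copying:

CompositeByteBuf message = Unpooled.compositeBuffer();
ByteBuf header = Unpooled.buffer(8);          // heap: convenient for business logic
ByteBuf body   = Unpooled.directBuffer(1024); // direct: efficient for socket I/O
// true = advance the writerIndex; the components are referenced, not copied.
message.addComponents(true, header, body);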

Netty's ByteBuf vs the JDK's ByteBuffer

ByteBuf adds dynamic capacity expansion and index bookkeeping on top of ByteBuffer, much like the JDK's ArrayList dynamically grows its backing array.

It keeps two separate indices, readerIndex and writerIndex, so reads and writes are decoupled and no flip() or rewind() calls are needed.
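
A sketch contrasting the two index models:

ByteBuf buf = Unpooled.buffer(16); // grows dynamically if capacity is exceeded
buf.writeInt(42);                  // advances writerIndex by 4
int v1 = buf.readInt();            // advances readerIndex by 4; no flip() needed

ByteBuffer nioBuf = ByteBuffer.allocate(16);
nioBuf.putInt(42);
nioBuf.flip();                     // one shared position: must flip before reading
int v2 = nioBuf.getInt();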

Netty's AbstractReferenceCountedByteBuf reference counter

private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater =
        AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");

private volatile int refCnt = 1;

private ByteBuf retain0(final int increment) {
    int oldRef = refCntUpdater.getAndAdd(this, increment);
    if (oldRef <= 0 || oldRef + increment < oldRef) {
        // Ensure we don't resurrect (which means the refCnt was 0) and also that we encountered an overflow.
        refCntUpdater.getAndAdd(this, -increment);
        throw new IllegalReferenceCountException(oldRef, increment);
    }
    return this;
}


private boolean release0(int decrement) {
    int oldRef = refCntUpdater.getAndAdd(this, -decrement);
    if (oldRef == decrement) {
        deallocate();
        return true;
    } else if (oldRef < decrement || oldRef - decrement > oldRef) {
        // Ensure we don't over-release, and avoid underflow.
        refCntUpdater.getAndAdd(this, decrement);
        throw new IllegalReferenceCountException(oldRef, decrement);
    }
    return false;
}

The volatile refCnt counter is updated atomically through an AtomicIntegerFieldUpdater, i.e. CAS spinning.

Why not an AtomicInteger field? A running Netty application creates a huge number of ByteBuf instances; an AtomicInteger per buffer would mean one extra object per instance. A single, globally unique static AtomicIntegerFieldUpdater updates the plain volatile int in place and avoids all those allocations.

When the reference count reaches 0, the deallocate() method is invoked to destroy the buffer.

For unpooled ByteBufs: a heap buffer simply drops the reference to its array so the GC can reclaim it, while off-heap (direct) memory is freed through an Unsafe/native call.

A pooled ByteBuf instead returns its memory to the buffer pool.

TCP sticky packets and packet splitting

Because TCP is a byte-stream protocol, a single read during a TCP connection may deliver one or several messages stuck together, or messages may be coalesced on the send side, so the application has to split (frame) the data itself.

Netty encoders and decoders

ByteToMessageDecoder is the base decoder: it converts bytes in the inbound buffer into internal business message objects.

When converting to a message, check whether the ByteBuf has enough readable bytes left; if it does not, you must make sure readerIndex is restored to its initial value before returning. Framing correctness is your responsibility here.
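
A minimal sketch of that length check in a custom int decoder (the class name is illustrative):

public class IntDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() < 4) {
            return; // not enough data yet; readerIndex is left untouched
        }
        out.add(in.readInt()); // a complete frame: emit one message
    }
}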

ReplayingDecoder

A variant of ByteToMessageDecoder: subclasses of ReplayingDecoder do not need to worry about how much data is available in the buffer.

/**
* A specialized variation of {@link ByteToMessageDecoder} which enables implementation
* of a non-blocking decoder in the blocking I/O paradigm.
* <p>
* The biggest difference between {@link ReplayingDecoder} and
* {@link ByteToMessageDecoder} is that {@link ReplayingDecoder} allows you to
* implement the {@code decode()} and {@code decodeLast()} methods just like
* all required bytes were received already, rather than checking the
* availability of the required bytes. For example, the following
* {@link ByteToMessageDecoder} implementation:
*/
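
The same int decoder as a ReplayingDecoder sketch: the length check disappears because the framework replays decode() once enough bytes have arrived:

public class ReplayingIntDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // If fewer than 4 bytes are available, this throws a replay signal
        // internally and is re-invoked later with more data.
        out.add(in.readInt());
    }
}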

LineBasedFrameDecoder: splits frames on line endings.

FixedLengthFrameDecoder: splits frames of a fixed length.

DelimiterBasedFrameDecoder: splits frames on a custom delimiter.

LengthFieldBasedFrameDecoder: splits frames based on a length field in the message.
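
A sketch wiring up the length-field decoder for a hypothetical [4-byte length][payload] protocol; the parameter values follow from that assumed layout:

ChannelPipeline p = ch.pipeline();
p.addLast(new LengthFieldBasedFrameDecoder(
        65536, // maxFrameLength: reject frames larger than this
        0,     // lengthFieldOffset: length field starts at byte 0
        4,     // lengthFieldLength: the length field is 4 bytes wide
        0,     // lengthAdjustment: the length counts only the payload
        4));   // initialBytesToStrip: drop the length header from the frame
p.addLast(new StringDecoder(CharsetUtil.UTF_8));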

The Netty threading model

  1. An EventLoopGroup contains one or more EventLoops.
  2. An EventLoop is bound to exactly one Thread for its entire lifetime.
  3. All I/O events handled by an EventLoop are processed on its associated Thread.
  4. A Channel is registered with exactly one EventLoop for its entire lifetime.
  5. One EventLoop may be assigned one or more Channels.

In other words, once a channel is connected to the server, its whole handler pipeline is always executed by the same thread. This guarantees ordering for that channel: the Channel is thread-safe and its handlers always run sequentially, backed internally by a task queue. If a handler performs time-consuming work, offload it to an asynchronous thread pool (see the sketch below), because it would otherwise block every other task on that EventLoop thread.
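
A sketch of that offloading using Netty's DefaultEventExecutorGroup; the thread count and the handler name are illustrative:

// Handlers added with a separate EventExecutorGroup run on that group's
// threads instead of the channel's EventLoop thread.
EventExecutorGroup blockingGroup = new DefaultEventExecutorGroup(16);
pipeline.addLast(blockingGroup, "blockingHandler", new MyBlockingHandler());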

Netty's high-performance design:

1. It wraps JDK NIO and is designed entirely around the Reactor model: event-driven throughout.


2. It uses AdaptiveRecvByteBufAllocator, an adaptive receive-buffer allocation strategy.

The default size is 1024 bytes, with a minimum of 64 and a maximum of 65536. It predicts the next buffer size from the size of the previous received packet, growing and shrinking dynamically. Internally it keeps an int[] size table whose entries step in multiples of 16.
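
A sketch showing how those bounds could be overridden per bootstrap; the values shown are just the documented defaults:

serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
        new AdaptiveRecvByteBufAllocator(64, 1024, 65536)); // min, initial, max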

3. It uses JVM direct (off-heap) memory, exploiting the operating system's zero-copy path to improve I/O performance.

Whether direct memory is preferred depends on the operating system, on whether sun.misc.Unsafe is available on the classpath, and on system properties that can be set manually with -D.

The Reactor model


  1. Handle (descriptor): identifies an event source. An external event might be a client connection; an internal event might be an operating-system timer.
  2. Synchronous Event Demultiplexer: a system call used to wait for events to occur; the caller blocks until at least one event arrives. On Linux this is the familiar I/O multiplexing machinery (select, poll, epoll); in Java NIO it corresponds to the Selector, and the blocking call is select().
  3. Event Handler: composed of several callback methods, it is the application-level reaction to an event; in Netty these are the built-in Handler callbacks fired after certain events.
  4. Concrete Event Handler: the concrete implementation of an event handler carrying the business logic; for Netty, the user-defined handlers.
  5. Initiation Dispatcher: the Reactor itself, in Java NIO terms. It controls how events are scheduled and lets the application register and remove event handlers; it demultiplexes events, selects the matching event handler, and finally invokes the associated callback methods.

The Reactor pattern flow

  1. When the application registers a concrete event handler with the Initiation Dispatcher, it identifies the event about which it wants the Initiation Dispatcher to notify it; that event is associated with a handle.
  2. The Initiation Dispatcher asks each event handler to pass it its internal handle. This handle identifies the event handler to the operating system.
  3. Once all event handlers are registered, the application calls handle_events to start the Initiation Dispatcher's event loop. The Initiation Dispatcher combines the handles of every registered handler and uses the synchronous event demultiplexer to wait for events on them. For example, the TCP protocol layer uses the select synchronous event demultiplexer to wait for client data to arrive on a connected socket handle.
  4. When the handle of some event source becomes ready (for example, a TCP socket becomes readable), the synchronous event demultiplexer notifies the Initiation Dispatcher.
  5. The Initiation Dispatcher triggers the event handler's callback in response to the ready handle: it uses the handle activated by the event source as a key to look up and dispatch to the appropriate event handler's callback method.
  6. The Initiation Dispatcher invokes the event handler's handle_event callback to perform the application-specific functionality (the code the developer wrote) in response to the event. The event type can be passed as a parameter to the method and used internally for additional service-specific demultiplexing and dispatching.