This commit is contained in:
2023-08-27 12:14:19 +08:00
parent 260381856c
commit 64cb57062d
3 changed files with 327 additions and 339 deletions

View File

@@ -14,7 +14,7 @@ netty作为一个异步NIO框架多线程肯定是它的基础但是对于
先看下EventExecutorGroup的定义
```
```java
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor>
```
@@ -24,18 +24,18 @@ EventExecutorGroup继承自JDK的ScheduledExecutorService即可以执行定
EventExecutorGroup有两个和Iterable相关的方法分别是next和iterator
```
EventExecutor next();
```java
EventExecutor next();
@Override
Iterator<EventExecutor> iterator();
@Override
Iterator<EventExecutor> iterator();
```
在EventExecutorGroup中调用next方法会返回一个EventExecutor对象那么EventExecutorGroup和EventExecutor是什么关系呢
我们再来看一下EventExecutor的定义
```
```java
public interface EventExecutor extends EventExecutorGroup
```
@@ -63,7 +63,7 @@ EventExecutorGroup中其他的方法都是一些对JDK中ScheduledExecutorServic
在EventExecutor中它重写了这个方法
```
```java
@Override
EventExecutor next();
```
@@ -72,21 +72,21 @@ EventExecutor next();
另外因为EventExecutor是由EventExecutorGroup来管理的所以EventExecutor中还存在一个parent方法用来返回管理EventExecutor的EventExecutorGroup
```
```java
EventExecutorGroup parent();
```
EventExecutor中新加了两个inEventLoop方法用来判断给定的线程是否在event loop中执行。
```
boolean inEventLoop();
```java
boolean inEventLoop();
boolean inEventLoop(Thread thread);
boolean inEventLoop(Thread thread);
```
EventExecutor还提供两个方法可以返回Promise和ProgressivePromise.
```
```java
<V> Promise<V> newPromise();
<V> ProgressivePromise<V> newProgressivePromise();
```
@@ -97,10 +97,10 @@ ProgressivePromise更进一步在Promise基础上提供了一个progress
除此之外EventExecutor还提供了对Succeeded的结果和Failed异常封装成为Future的方法。
```
<V> Future<V> newSucceededFuture(V result);
```java
<V> Future<V> newSucceededFuture(V result);
<V> Future<V> newFailedFuture(Throwable cause);
<V> Future<V> newFailedFuture(Throwable cause);
```
# EventExecutorGroup在netty中的基本实现
@@ -119,10 +119,10 @@ netty中EventExecutorGroup的默认实现叫做DefaultEventExecutorGroup,它的
在AbstractEventExecutorGroup中几乎所有EventExecutorGroup中的方法实现都是调用next()方法来完成的以submit方法为例
```
```java
public Future<?> submit(Runnable task) {
return next().submit(task);
}
return next().submit(task);
}
```
可以看到submit方法首先调用next获取到的EventExecutor然后再调用EventExecutor中的submit方法。
@@ -133,85 +133,85 @@ MultithreadEventExecutorGroup继承自AbstractEventExecutorGroup提供了多
MultithreadEventExecutorGroup有两类构造函数在构造函数中可以指定多线程的个数还有任务执行器Executor,如果没有提供Executor的话可以提供一个ThreadFactory,MultithreadEventExecutorGroup会调用`new ThreadPerTaskExecutor(threadFactory)`来为每一个线程构造一个Executor
```
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
}
```java
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
```
MultithreadEventExecutorGroup对多线程的支持是怎么实现的呢
首先MultithreadEventExecutorGroup提供了两个children分别是children和readonlyChildren
```
private final EventExecutor[] children;
private final Set<EventExecutor> readonlyChildren;
```java
private final EventExecutor[] children;
private final Set<EventExecutor> readonlyChildren;
```
children和MultithreadEventExecutorGroup中的线程个数是一一对应的有多少个线程children就有多大。
```
```java
children = new EventExecutor[nThreads];
```
然后通过调用newChild方法将传入的executor构造成为EventExecutor返回
```
```java
children[i] = newChild(executor, args);
```
看一下newChild方法的定义
```
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
```java
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
```
这个方法在MultithreadEventExecutorGroup中并没有实现需要在更具体的类中实现它。
readonlyChildren是child的只读版本用来在遍历方法中返回
```
```java
readonlyChildren = Collections.unmodifiableSet(childrenSet);
public Iterator<EventExecutor> iterator() {
return readonlyChildren.iterator();
}
return readonlyChildren.iterator();
}
```
我们现在有了Group中的所有EventExecutor那么在MultithreadEventExecutorGroup中next方法是怎么选择具体返回哪一个EventExecutor呢
先来看一下next方法的定义:
```
```java
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
chooser = chooserFactory.newChooser(children);
public EventExecutor next() {
return chooser.next();
}
return chooser.next();
}
```
next方法调用的是chooser的next方法看一下chooser的next方法具体实现
```
```java
public EventExecutor next() {
return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
}
return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
}
```
可以看到其实是一个很简单的根据index来获取对象的操作。
最后看一下DefaultEventExecutorGroup中对newChild方法的实现
```
protected EventExecutor newChild(Executor executor, Object... args) throws Exception {
return new DefaultEventExecutor(this, executor, (Integer) args[0], (RejectedExecutionHandler) args[1]);
}
```java
protected EventExecutor newChild(Executor executor, Object... args) throws Exception {
return new DefaultEventExecutor(this, executor, (Integer) args[0], (RejectedExecutionHandler) args[1]);
}
```
newChild返回的EventExecutor使用的是DefaultEventExecutor。这个类是EventExecutor在netty中的默认实现我们在下一小结中详细进行讲解。
@@ -226,7 +226,7 @@ DefaultEventExecutor继承自SingleThreadEventExecutor,而SingleThreadEventExecu
先来看一下AbstractEventExecutor的定义
```
```java
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor
```
@@ -236,97 +236,97 @@ AbstractExecutorService是JDK中的类它提供了 ExecutorService 的一些
AbstractEventExecutor作为ExecutorGroup的一员它提供了一个EventExecutorGroup类型的parent属性
```
```java
private final EventExecutorGroup parent;
public EventExecutorGroup parent() {
return parent;
}
return parent;
}
```
对于next方法来说AbstractEventExecutor返回的是它本身
```
```java
public EventExecutor next() {
return this;
}
return this;
}
```
AbstractScheduledEventExecutor继承自AbstractEventExecutor,它内部使用了一个PriorityQueue来存储包含定时任务的ScheduledFutureTask,从而实现定时任务的功能:
```
```java
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
```
接下来是SingleThreadEventExecutor从名字可以看出SingleThreadEventExecutor使用的是单线程来执行提交的tasksSingleThreadEventExecutor提供了一个默认的pending执行task的任务大小DEFAULT_MAX_PENDING_EXECUTOR_TASKS还定义了任务执行的几种状态
```
private static final int ST_NOT_STARTED = 1;
private static final int ST_STARTED = 2;
private static final int ST_SHUTTING_DOWN = 3;
private static final int ST_SHUTDOWN = 4;
private static final int ST_TERMINATED = 5;
```java
private static final int ST_NOT_STARTED = 1;
private static final int ST_STARTED = 2;
private static final int ST_SHUTTING_DOWN = 3;
private static final int ST_SHUTDOWN = 4;
private static final int ST_TERMINATED = 5;
```
之前提到了EventExecutor中有一个特有的inEventLoop方法判断给定的thread是否在eventLoop中在SingleThreadEventExecutor中我们看一下具体的实现
```
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
```java
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
```
具体而言就是判断给定的线程和SingleThreadEventExecutor中定义的thread属性是不是同一个threadSingleThreadEventExecutor中的thread是这样定义的
```
```java
```
这个thread是在doStartThread方法中进行初始化的
```
```java
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
@Override
public void run() {
thread = Thread.currentThread();
```
所以这个thread是任务执行的线程也就是executor中执行任务用到的线程。
再看一下非常关键的execute方法
```
```java
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
```
这个方法首先将task添加到任务队列中然后调用startThread开启线程来执行任务。
最后来看一下DefaultEventExecutor这个netty中的默认实现
```
```java
public final class DefaultEventExecutor extends SingleThreadEventExecutor
```
DefaultEventExecutor继承自SingleThreadEventExecutor这个类中它定义了run方法如何实现
```
protected void run() {
for (;;) {
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
```java
protected void run() {
for (;;) {
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
if (confirmShutdown()) {
break;
}
if (confirmShutdown()) {
break;
}
}
}
```
在SingleThreadEventExecutor中我们会把任务加入到task queue中在run方法中会从task queue中取出对应的task然后调用task的run方法执行。

View File

@@ -1,11 +1,5 @@
# netty系列之:EventLoop,EventLoopGroup和netty的默认实现
文章目录
[简介](http://www.flydean.com/05-1-netty-eventloop-eventloopgroup/#简介)[EventLoopGroup和EventLoop](http://www.flydean.com/05-1-netty-eventloop-eventloopgroup/#EventLoopGroup和EventLoop)[EventLoopGroup在netty中的默认实现](http://www.flydean.com/05-1-netty-eventloop-eventloopgroup/#EventLoopGroup在netty中的默认实现)[EventLoop在netty中的默认实现](http://www.flydean.com/05-1-netty-eventloop-eventloopgroup/#EventLoop在netty中的默认实现)[总结](http://www.flydean.com/05-1-netty-eventloop-eventloopgroup/#总结)
# 简介
在netty中不管是服务器端的ServerBootstrap还是客户端的Bootstrap在创建的时候都需要在group方法中传入一个EventLoopGroup参数用来处理所有的ServerChannel和Channel中所有的IO操作和event。
@@ -16,33 +10,33 @@
EventLoopGroup继承自EventExecutorGroup:
```
```java
public interface EventLoopGroup extends EventExecutorGroup
```
在前面的文章中我们讲过EventExecutorGroup中有一个next方法可以返回对应的EventExecutor这个方法在EventLoopGroup中进行了重写
```
EventLoop next();
```java
EventLoop next();
```
next方法返回的不再是一个EventExecutor而是一个EventLoop。
事实上EventLoop和EventLoopGroup的关系与EventExecutor和EventExecutorGroup的关系有些类似EventLoop也是继承自EventLoopGroupEventLoopGroup是EventLoop的集合。
```
```java
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup
```
在EventLoopGroup中除了重写的next方法之外还添加了channel的注册方法register,用于将channel和注册到EventLoop中从而实现channel和EventLoop的绑定。
```
```java
ChannelFuture register(Channel channel);
```
在EventLoop中自多添加了一个parent方法用来表示EventLoop和EventLoopGroup的关联关系
```
```java
EventLoopGroup parent();
```
@@ -54,7 +48,7 @@ EventLoopGroup在netty中的默认实现叫做DefaultEventLoopGroup先来看
如果看了之前我讲解的关于EventExecutorGroup的朋友可以看出来DefaultEventLoopGroup和DefaultEventExecutorGroup的继承关系是很类似的DefaultEventLoopGroup继承自MultithreadEventLoopGroup,而MultithreadEventLoopGroup又继承自MultithreadEventExecutorGroup并且实现了EventLoopGroup接口
```
```java
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup
```
@@ -62,33 +56,33 @@ MultithreadEventLoopGroup是用多线程来处理Event Loop。
在MultithreadEventLoopGroup中定义了一个DEFAULT_EVENT_LOOP_THREADS来存储默认的处理Event Loop线程的个数
```
```java
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
```
对于EventLoopGroup中新加的几个register方法MultithreadEventLoopGroup都是调用对应的next方法来实现的
```
```java
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
return next().register(channel);
}
```
这里的next()方法的实现实际上调用的是父类的next方法也就是MultithreadEventExecutorGroup中的next方法来选择Group管理的一个EventLoop:
```
```java
public EventLoop next() {
return (EventLoop) super.next();
}
return (EventLoop) super.next();
}
```
对于DefaultEventLoopGroup来说它继承自MultithreadEventLoopGroup实现了一个newChild方法用来将传入的executor封装成为EventLoop:
```
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new DefaultEventLoop(this, executor);
}
```java
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new DefaultEventLoop(this, executor);
}
```
# EventLoop在netty中的默认实现
@@ -115,23 +109,23 @@ private final Queue<Runnable> tailTasks;
这个tailTasks会被用在任务个数的判断和操作上
```
final boolean removeAfterEventLoopIterationTask(Runnable task) {
return tailTasks.remove(ObjectUtil.checkNotNull(task, "task"));
}
```java
final boolean removeAfterEventLoopIterationTask(Runnable task) {
return tailTasks.remove(ObjectUtil.checkNotNull(task, "task"));
}
protected boolean hasTasks() {
return super.hasTasks() || !tailTasks.isEmpty();
}
protected boolean hasTasks() {
return super.hasTasks() || !tailTasks.isEmpty();
}
public int pendingTasks() {
return super.pendingTasks() + tailTasks.size();
}
public int pendingTasks() {
return super.pendingTasks() + tailTasks.size();
}
```
SingleThreadEventLoop中对register方法的实现最终调用的是注册的channel中unsafe的register方法
```
```java
channel.unsafe().register(this, promise);
```
@@ -143,20 +137,20 @@ public class DefaultEventLoop extends SingleThreadEventLoop
除了构造函数之外DefaultEventLoop实现了一个run方法,用来具体任务的执行逻辑:
```
protected void run() {
for (;;) {
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
```java
protected void run() {
for (;;) {
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
if (confirmShutdown()) {
break;
}
if (confirmShutdown()) {
break;
}
}
}
```
如果对比可以发现DefaultEventLoop和DefaultEventExecutor中run方法的实现是一样的。

View File

@@ -1,11 +1,5 @@
# netty系列之:NIO和netty详解
文章目录
[简介](http://www.flydean.com/05-2-netty-nioeventloop/#简介)[NIO常用用法](http://www.flydean.com/05-2-netty-nioeventloop/#NIO常用用法)[NIO和EventLoopGroup](http://www.flydean.com/05-2-netty-nioeventloop/#NIO和EventLoopGroup)[NioEventLoopGroup](http://www.flydean.com/05-2-netty-nioeventloop/#NioEventLoopGroup)[NioEventLoop](http://www.flydean.com/05-2-netty-nioeventloop/#NioEventLoop)[总结](http://www.flydean.com/05-2-netty-nioeventloop/#总结)
# 简介
netty为什么快呢这是因为netty底层使用了JAVA的NIO技术并在其基础上进行了性能的优化虽然netty不是单纯的JAVA nio但是netty的底层还是基于的是nio技术。
@@ -20,7 +14,7 @@ nio的三大核心是Selector,channel和Buffer本文我们将会深入探究N
因为是一个简单的聊天室我们选择Socket协议为基础的ServerSocketChannel,首先就是open这个Server channel:
```
```java
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress("localhost", 9527));
serverSocketChannel.configureBlocking(false);
@@ -28,91 +22,91 @@ serverSocketChannel.configureBlocking(false);
然后向server channel中注册selector:
```
```java
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
```
虽然是NIO但是对于Selector来说它的select方法是阻塞方法只有找到匹配的channel之后才会返回为了多次进行select操作我们需要在一个while循环里面进行selector的select操作
```
```java
while (true) {
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectedKeys.iterator();
while (iter.hasNext()) {
SelectionKey selectionKey = iter.next();
if (selectionKey.isAcceptable()) {
register(selector, serverSocketChannel);
}
if (selectionKey.isReadable()) {
serverResponse(byteBuffer, selectionKey);
}
iter.remove();
}
Thread.sleep(1000);
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectedKeys.iterator();
while (iter.hasNext()) {
SelectionKey selectionKey = iter.next();
if (selectionKey.isAcceptable()) {
register(selector, serverSocketChannel);
}
if (selectionKey.isReadable()) {
serverResponse(byteBuffer, selectionKey);
}
iter.remove();
}
Thread.sleep(1000);
}
```
selector中会有一些SelectionKey,SelectionKey中有一些表示操作状态的OP Status,根据这个OP Status的不同selectionKey可以有四种状态分别是isReadable,isWritable,isConnectable和isAcceptable。
当SelectionKey处于isAcceptable状态的时候表示ServerSocketChannel可以接受连接了我们需要调用register方法将serverSocketChannel accept生成的socketChannel注册到selector中以监听它的OP READ状态后续可以从中读取数据
```
private static void register(Selector selector, ServerSocketChannel serverSocketChannel)
throws IOException {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
```java
private static void register(Selector selector, ServerSocketChannel serverSocketChannel)
throws IOException {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
```
当selectionKey处于isReadable状态的时候表示可以从socketChannel中读取数据然后进行处理
```
private static void serverResponse(ByteBuffer byteBuffer, SelectionKey selectionKey)
throws IOException {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
socketChannel.read(byteBuffer);
byteBuffer.flip();
byte[] bytes= new byte[byteBuffer.limit()];
byteBuffer.get(bytes);
log.info(new String(bytes).trim());
if(new String(bytes).trim().equals(BYE_BYE)){
log.info("说再见不如不见!");
socketChannel.write(ByteBuffer.wrap("再见".getBytes()));
socketChannel.close();
}else {
socketChannel.write(ByteBuffer.wrap("你是个好人".getBytes()));
}
byteBuffer.clear();
```java
private static void serverResponse(ByteBuffer byteBuffer, SelectionKey selectionKey)
throws IOException {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
socketChannel.read(byteBuffer);
byteBuffer.flip();
byte[] bytes= new byte[byteBuffer.limit()];
byteBuffer.get(bytes);
log.info(new String(bytes).trim());
if(new String(bytes).trim().equals(BYE_BYE)){
log.info("说再见不如不见!");
socketChannel.write(ByteBuffer.wrap("再见".getBytes()));
socketChannel.close();
}else {
socketChannel.write(ByteBuffer.wrap("你是个好人".getBytes()));
}
byteBuffer.clear();
}
```
上面的serverResponse方法中从selectionKey中拿到对应的SocketChannel然后调用SocketChannel的read方法将channel中的数据读取到byteBuffer中要想回复消息到channel中还是使用同一个socketChannel然后调用write方法回写消息给client端到这里一个简单的回写客户端消息的server端就完成了。
接下来就是对应的NIO客户端在NIO客户端需要使用SocketChannel,首先建立和服务器的连接:
```
```java
socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9527));
```
然后就可以使用这个channel来发送和接受消息了
```
public String sendMessage(String msg) throws IOException {
byteBuffer = ByteBuffer.wrap(msg.getBytes());
String response = null;
socketChannel.write(byteBuffer);
byteBuffer.clear();
socketChannel.read(byteBuffer);
byteBuffer.flip();
byte[] bytes= new byte[byteBuffer.limit()];
byteBuffer.get(bytes);
response =new String(bytes).trim();
byteBuffer.clear();
return response;
}
```java
public String sendMessage(String msg) throws IOException {
byteBuffer = ByteBuffer.wrap(msg.getBytes());
String response = null;
socketChannel.write(byteBuffer);
byteBuffer.clear();
socketChannel.read(byteBuffer);
byteBuffer.flip();
byte[] bytes= new byte[byteBuffer.limit()];
byteBuffer.get(bytes);
response =new String(bytes).trim();
byteBuffer.clear();
return response;
}
```
向channel中写入消息可以使用write方法从channel中读取消息可以使用read方法。
@@ -125,13 +119,13 @@ socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9527));
以netty的ServerBootstrap为例启动的时候需要指定它的group,先来看一下ServerBootstrap的group方法
```
```java
public ServerBootstrap group(EventLoopGroup group) {
return group(group, group);
}
return group(group, group);
}
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
...
// ...
}
```
@@ -139,26 +133,26 @@ ServerBootstrap可以接受一个EventLoopGroup或者两个EventLoopGroupEven
EventLoopGroup只是一个接口我们常用的一个实现就是NioEventLoopGroup,如下所示是一个常用的netty服务器端代码
```
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new FirstServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
```java
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
// 绑定端口并开始接收连接
ChannelFuture f = b.bind(port).sync();
// 等待server socket关闭
f.channel().closeFuture().sync();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new FirstServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 绑定端口并开始接收连接
ChannelFuture f = b.bind(port).sync();
// 等待server socket关闭
f.channel().closeFuture().sync();
```
这里和NIO相关的有两个类分别是NioEventLoopGroup和NioServerSocketChannel事实上在他们的底层还有两个类似的类分别叫做NioEventLoop和NioSocketChannel接下来我们分别讲解一些他们的底层实现和逻辑关系。
@@ -167,31 +161,31 @@ EventLoopGroup只是一个接口我们常用的一个实现就是NioEventLoop
NioEventLoopGroup和DefaultEventLoopGroup一样都是继承自MultithreadEventLoopGroup
```
```java
public class NioEventLoopGroup extends MultithreadEventLoopGroup
```
他们的不同之处在于newChild方法的不同newChild用来构建Group中的实际对象NioEventLoopGroup来说newChild返回的是一个NioEventLoop对象先来看下NioEventLoopGroup的newChild方法
```
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
SelectorProvider selectorProvider = (SelectorProvider) args[0];
SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
EventLoopTaskQueueFactory taskQueueFactory = null;
EventLoopTaskQueueFactory tailTaskQueueFactory = null;
```java
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
SelectorProvider selectorProvider = (SelectorProvider) args[0];
SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
EventLoopTaskQueueFactory taskQueueFactory = null;
EventLoopTaskQueueFactory tailTaskQueueFactory = null;
int argsLength = args.length;
if (argsLength > 3) {
taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
}
if (argsLength > 4) {
tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
}
return new NioEventLoop(this, executor, selectorProvider,
selectStrategyFactory.newSelectStrategy(),
rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
int argsLength = args.length;
if (argsLength > 3) {
taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
}
if (argsLength > 4) {
tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
}
return new NioEventLoop(this, executor, selectorProvider,
selectStrategyFactory.newSelectStrategy(),
rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}
```
这个newChild方法除了固定的executor参数之外还可以根据NioEventLoopGroup的构造函数传入的参数来实现更多的功能。
@@ -212,7 +206,7 @@ SelectorProvider是JDK中的类它提供了一个静态的provider()方法可
SelectStrategyFactory是一个接口里面只定义了一个方法用来返回SelectStrategy:
```
```java
public interface SelectStrategyFactory {
SelectStrategy newSelectStrategy();
@@ -223,12 +217,12 @@ public interface SelectStrategyFactory {
先看下SelectStrategy中定义了哪些Strategy:
```
int SELECT = -1;
```java
int SELECT = -1;
int CONTINUE = -2;
int CONTINUE = -2;
int BUSY_WAIT = -3;
int BUSY_WAIT = -3;
```
SelectStrategy中定义了3个strategy,分别是SELECT、CONTINUE和BUSY_WAIT。
@@ -243,7 +237,7 @@ BUSY_WAIT是一个特殊的strategy是指IO 循环轮询新事件而不阻塞
RejectedExecutionHandler是netty自己的类和 java.util.concurrent.RejectedExecutionHandler类似但是是特别针对SingleThreadEventExecutor来说的。这个接口定义了一个rejected方法用来表示因为SingleThreadEventExecutor容量限制导致的任务添加失败而被拒绝的情况
```
```java
void rejected(Runnable task, SingleThreadEventExecutor executor);
```
@@ -251,7 +245,7 @@ void rejected(Runnable task, SingleThreadEventExecutor executor);
EventLoopTaskQueueFactory是一个接口用来创建存储提交给EventLoop的taskQueue
```
```java
Queue<Runnable> newTaskQueue(int maxCapacity);
```
@@ -263,7 +257,7 @@ Queue<Runnable> newTaskQueue(int maxCapacity);
首先NioEventLoop和DefaultEventLoop一样都是继承自SingleThreadEventLoop
```
```java
public final class NioEventLoop extends SingleThreadEventLoop
```
@@ -271,17 +265,17 @@ public final class NioEventLoop extends SingleThreadEventLoop
首先作为一个NIO的实现必须要有selector在NioEventLoop中定义了两个selector分别是selector和unwrappedSelector
```
private Selector selector;
private Selector unwrappedSelector;
```java
private Selector selector;
private Selector unwrappedSelector;
```
在NioEventLoop的构造函数中他们是这样定义的
```
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
```java
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
```
首先调用openSelector方法然后通过返回的SelectorTuple来获取对应的selector和unwrappedSelector。
@@ -290,7 +284,7 @@ public final class NioEventLoop extends SingleThreadEventLoop
在openSelector方法中首先通过调用provider的openSelector方法返回一个Selector这个Selector就是unwrappedSelector
```
```java
final Selector unwrappedSelector;
unwrappedSelector = provider.openSelector();
```
@@ -299,24 +293,24 @@ unwrappedSelector = provider.openSelector();
DISABLE_KEY_SET_OPTIMIZATION表示的是是否对select key set进行优化:
```
```java
if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
return new SelectorTuple(unwrappedSelector);
}
SelectorTuple(Selector unwrappedSelector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = unwrappedSelector;
}
SelectorTuple(Selector unwrappedSelector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = unwrappedSelector;
}
```
如果DISABLE_KEY_SET_OPTIMIZATION被设置为false那么意味着我们需要对select key set进行优化具体是怎么进行优化的呢
先来看下最后的返回:
```
```java
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
```
最后返回的SelectorTuple第二个参数就是selector这里的selector是一个SelectedSelectionKeySetSelector对象。
@@ -324,20 +318,20 @@ return new SelectorTuple(unwrappedSelector,
SelectedSelectionKeySetSelector继承自selector构造函数传入的第一个参数是一个delegate,所有的Selector中定义的方法都是通过调用
delegate来实现的不同的是对于select方法来说会首先调用selectedKeySet的reset方法,下面是以isOpen和select方法为例观察一下代码的实现
```
public boolean isOpen() {
return delegate.isOpen();
}
```java
public boolean isOpen() {
return delegate.isOpen();
}
public int select(long timeout) throws IOException {
selectionKeys.reset();
return delegate.select(timeout);
}
public int select(long timeout) throws IOException {
selectionKeys.reset();
return delegate.select(timeout);
}
```
selectedKeySet是一个SelectedSelectionKeySet对象是一个set集合用来存储SelectionKey在openSelector()方法中使用new来实例化这个对象
```
```java
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
```
@@ -345,32 +339,32 @@ netty实际是想用这个SelectedSelectionKeySet类来管理Selector中的selec
首先判断系统中有没有sun.nio.ch.SelectorImpl的实现
```
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
```java
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
```
SelectorImpl中有两个Set字段
```
```java
private Set<SelectionKey> publicKeys;
private Set<SelectionKey> publicSelectedKeys;
```
这两个字段就是我们需要替换的对象。如果有SelectorImpl的话首先使用Unsafe类调用PlatformDependent中的objectFieldOffset方法拿到这两个字段相对于对象示例的偏移量然后调用putObject将这两个字段替换成为前面初始化的selectedKeySet对象
```
```java
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
@@ -379,30 +373,30 @@ if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
// This allows us to also do this in Java9+ without any extra flags.
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset =
PlatformDependent.objectFieldOffset(publicSelectedKeysField);
PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
PlatformDependent.putObject(
unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(
unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
```
如果系统设置不支持Unsafe那么就用反射再做一次
```
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
```java
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
```
在NioEventLoop中我们需要关注的一个非常重要的重写方法就是run方法在run方法中实现了如何执行task的逻辑。
@@ -414,42 +408,42 @@ strategy的值来进行对应的处理。
BUSY_WAIT在NIO中是不支持的如果是SELECT状态那么会在curDeadlineNanos之后再次进行select操作
```
```java
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
```
如果strategy > 0,表示有拿到了SelectedKeys,那么需要调用processSelectedKeys方法对SelectedKeys进行处理
```
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
```java
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
```
上面提到了NioEventLoop中有两个selector还有一个selectedKeys属性这个selectedKeys存储的就是Optimized SelectedKeys如果这个值不为空就调用processSelectedKeysOptimized方法否则就调用processSelectedKeysPlain方法。
@@ -458,18 +452,18 @@ processSelectedKeysOptimized和processSelectedKeysPlain这两个方法差别不
处理的逻辑是首先拿到selectedKeys的key然后调用它的attachment方法拿到attach的对象
```
```java
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null;
selectedKeys.keys[i] = null;
final Object a = k.attachment();
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
```
如果channel还没有建立连接那么这个对象可能是一个NioTask,用来处理channelReady和channelUnregistered的事件。
@@ -480,7 +474,7 @@ final SelectionKey k = selectedKeys.keys[i];
对第一种情况会调用task的channelReady方法
```
```java
task.channelReady(k.channel(), k);
```