From 64cb57062d8ebf49034a85f477a1d8f152f0a167 Mon Sep 17 00:00:00 2001 From: yanzuoguang Date: Sun, 27 Aug 2023 12:14:19 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B8=B8=E8=A7=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...ntExecutor,EventExecutorGroup和netty中的实现.md | 170 ++++---- ...之EventLoop,EventLoopGroup和netty的默认实现.md | 88 ++-- .../54.netty系列之NIO和netty详解.md | 408 +++++++++--------- 3 files changed, 327 insertions(+), 339 deletions(-) diff --git a/learn-netty文章/52.netty系列之EventExecutor,EventExecutorGroup和netty中的实现.md b/learn-netty文章/52.netty系列之EventExecutor,EventExecutorGroup和netty中的实现.md index e194ece..cbebe0b 100644 --- a/learn-netty文章/52.netty系列之EventExecutor,EventExecutorGroup和netty中的实现.md +++ b/learn-netty文章/52.netty系列之EventExecutor,EventExecutorGroup和netty中的实现.md @@ -14,7 +14,7 @@ netty作为一个异步NIO框架,多线程肯定是它的基础,但是对于 先看下EventExecutorGroup的定义: -``` +```java public interface EventExecutorGroup extends ScheduledExecutorService, Iterable ``` @@ -24,18 +24,18 @@ EventExecutorGroup继承自JDK的ScheduledExecutorService,即可以执行定 EventExecutorGroup有两个和Iterable相关的方法,分别是next和iterator: -``` - EventExecutor next(); +```java +EventExecutor next(); - @Override - Iterator iterator(); +@Override +Iterator iterator(); ``` 在EventExecutorGroup中调用next方法会返回一个EventExecutor对象,那么EventExecutorGroup和EventExecutor是什么关系呢? 我们再来看一下EventExecutor的定义: -``` +```java public interface EventExecutor extends EventExecutorGroup ``` @@ -63,7 +63,7 @@ EventExecutorGroup中其他的方法都是一些对JDK中ScheduledExecutorServic 在EventExecutor中,它重写了这个方法: -``` +```java @Override EventExecutor next(); ``` @@ -72,21 +72,21 @@ EventExecutor next(); 另外,因为EventExecutor是由EventExecutorGroup来管理的,所以EventExecutor中还存在一个parent方法,用来返回管理EventExecutor的EventExecutorGroup: -``` +```java EventExecutorGroup parent(); ``` EventExecutor中新加了两个inEventLoop方法,用来判断给定的线程是否在event loop中执行。 -``` - boolean inEventLoop(); +```java +boolean inEventLoop(); - boolean inEventLoop(Thread thread); +boolean inEventLoop(Thread thread); ``` EventExecutor还提供两个方法可以返回Promise和ProgressivePromise. -``` +```java Promise newPromise(); ProgressivePromise newProgressivePromise(); ``` @@ -97,10 +97,10 @@ ProgressivePromise更进一步,在Promise基础上,提供了一个progress 除此之外,EventExecutor还提供了对Succeeded的结果和Failed异常封装成为Future的方法。 -``` - Future newSucceededFuture(V result); +```java + Future newSucceededFuture(V result); - Future newFailedFuture(Throwable cause); + Future newFailedFuture(Throwable cause); ``` # EventExecutorGroup在netty中的基本实现 @@ -119,10 +119,10 @@ netty中EventExecutorGroup的默认实现叫做DefaultEventExecutorGroup,它的 在AbstractEventExecutorGroup中,几乎所有EventExecutorGroup中的方法实现,都是调用next()方法来完成的,以submit方法为例: -``` +```java public Future submit(Runnable task) { - return next().submit(task); - } + return next().submit(task); +} ``` 可以看到submit方法首先调用next获取到的EventExecutor,然后再调用EventExecutor中的submit方法。 @@ -133,85 +133,85 @@ MultithreadEventExecutorGroup继承自AbstractEventExecutorGroup,提供了多 MultithreadEventExecutorGroup有两类构造函数,在构造函数中可以指定多线程的个数,还有任务执行器Executor,如果没有提供Executor的话,可以提供一个ThreadFactory,MultithreadEventExecutorGroup会调用`new ThreadPerTaskExecutor(threadFactory)`来为每一个线程构造一个Executor: -``` - protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) { - this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args); - } +```java +protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) { + this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args); +} - protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { - this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); - } +protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { + this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); +} ``` MultithreadEventExecutorGroup对多线程的支持是怎么实现的呢? 首先MultithreadEventExecutorGroup提供了两个children,分别是children和readonlyChildren: -``` - private final EventExecutor[] children; - private final Set readonlyChildren; +```java +private final EventExecutor[] children; +private final Set readonlyChildren; ``` children和MultithreadEventExecutorGroup中的线程个数是一一对应的,有多少个线程,children就有多大。 -``` +```java children = new EventExecutor[nThreads]; ``` 然后通过调用newChild方法,将传入的executor构造成为EventExecutor返回: -``` +```java children[i] = newChild(executor, args); ``` 看一下newChild方法的定义: -``` - protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception; +```java +protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception; ``` 这个方法在MultithreadEventExecutorGroup中并没有实现,需要在更具体的类中实现它。 readonlyChildren是child的只读版本,用来在遍历方法中返回: -``` +```java readonlyChildren = Collections.unmodifiableSet(childrenSet); public Iterator iterator() { - return readonlyChildren.iterator(); - } + return readonlyChildren.iterator(); +} ``` 我们现在有了Group中的所有EventExecutor,那么在MultithreadEventExecutorGroup中,next方法是怎么选择具体返回哪一个EventExecutor呢? 先来看一下next方法的定义: -``` +```java private final EventExecutorChooserFactory.EventExecutorChooser chooser; chooser = chooserFactory.newChooser(children); public EventExecutor next() { - return chooser.next(); - } + return chooser.next(); +} ``` next方法调用的是chooser的next方法,看一下chooser的next方法具体实现: -``` +```java public EventExecutor next() { - return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)]; - } + return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)]; +} ``` 可以看到,其实是一个很简单的根据index来获取对象的操作。 最后看一下DefaultEventExecutorGroup中对newChild方法的实现: -``` - protected EventExecutor newChild(Executor executor, Object... args) throws Exception { - return new DefaultEventExecutor(this, executor, (Integer) args[0], (RejectedExecutionHandler) args[1]); - } +```java +protected EventExecutor newChild(Executor executor, Object... args) throws Exception { + return new DefaultEventExecutor(this, executor, (Integer) args[0], (RejectedExecutionHandler) args[1]); +} ``` newChild返回的EventExecutor使用的是DefaultEventExecutor。这个类是EventExecutor在netty中的默认实现,我们在下一小结中详细进行讲解。 @@ -226,7 +226,7 @@ DefaultEventExecutor继承自SingleThreadEventExecutor,而SingleThreadEventExecu 先来看一下AbstractEventExecutor的定义: -``` +```java public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor ``` @@ -236,97 +236,97 @@ AbstractExecutorService是JDK中的类,它提供了 ExecutorService 的一些 AbstractEventExecutor作为ExecutorGroup的一员,它提供了一个EventExecutorGroup类型的parent属性: -``` +```java private final EventExecutorGroup parent; public EventExecutorGroup parent() { - return parent; - } + return parent; +} ``` 对于next方法来说,AbstractEventExecutor返回的是它本身: -``` +```java public EventExecutor next() { - return this; - } + return this; +} ``` AbstractScheduledEventExecutor继承自AbstractEventExecutor,它内部使用了一个PriorityQueue来存储包含定时任务的ScheduledFutureTask,从而实现定时任务的功能: -``` +```java PriorityQueue> scheduledTaskQueue; ``` 接下来是SingleThreadEventExecutor,从名字可以看出,SingleThreadEventExecutor使用的是单线程来执行提交的tasks,SingleThreadEventExecutor提供了一个默认的pending执行task的任务大小:DEFAULT_MAX_PENDING_EXECUTOR_TASKS,还定义了任务执行的几种状态: -``` - private static final int ST_NOT_STARTED = 1; - private static final int ST_STARTED = 2; - private static final int ST_SHUTTING_DOWN = 3; - private static final int ST_SHUTDOWN = 4; - private static final int ST_TERMINATED = 5; +```java +private static final int ST_NOT_STARTED = 1; +private static final int ST_STARTED = 2; +private static final int ST_SHUTTING_DOWN = 3; +private static final int ST_SHUTDOWN = 4; +private static final int ST_TERMINATED = 5; ``` 之前提到了EventExecutor中有一个特有的inEventLoop方法,判断给定的thread是否在eventLoop中,在SingleThreadEventExecutor中,我们看一下具体的实现: -``` - public boolean inEventLoop(Thread thread) { - return thread == this.thread; - } +```java +public boolean inEventLoop(Thread thread) { + return thread == this.thread; +} ``` 具体而言就是判断给定的线程和SingleThreadEventExecutor中定义的thread属性是不是同一个thread,SingleThreadEventExecutor中的thread是这样定义的: -``` +```java ``` 这个thread是在doStartThread方法中进行初始化的: -``` +```java executor.execute(new Runnable() { - @Override - public void run() { - thread = Thread.currentThread(); + @Override + public void run() { + thread = Thread.currentThread(); ``` 所以这个thread是任务执行的线程,也就是executor中执行任务用到的线程。 再看一下非常关键的execute方法: -``` +```java private void execute(Runnable task, boolean immediate) { - boolean inEventLoop = inEventLoop(); - addTask(task); - if (!inEventLoop) { - startThread(); + boolean inEventLoop = inEventLoop(); + addTask(task); + if (!inEventLoop) { + startThread(); ``` 这个方法首先将task添加到任务队列中,然后调用startThread开启线程来执行任务。 最后来看一下DefaultEventExecutor,这个netty中的默认实现: -``` +```java public final class DefaultEventExecutor extends SingleThreadEventExecutor ``` DefaultEventExecutor继承自SingleThreadEventExecutor,这个类中,它定义了run方法如何实现: -``` - protected void run() { - for (;;) { - Runnable task = takeTask(); - if (task != null) { - task.run(); - updateLastExecutionTime(); - } +```java +protected void run() { + for (;;) { + Runnable task = takeTask(); + if (task != null) { + task.run(); + updateLastExecutionTime(); + } - if (confirmShutdown()) { - break; - } + if (confirmShutdown()) { + break; } } +} ``` 在SingleThreadEventExecutor中,我们会把任务加入到task queue中,在run方法中,会从task queue中取出对应的task,然后调用task的run方法执行。 diff --git a/learn-netty文章/53.netty系列之EventLoop,EventLoopGroup和netty的默认实现.md b/learn-netty文章/53.netty系列之EventLoop,EventLoopGroup和netty的默认实现.md index d72386e..d97cf31 100644 --- a/learn-netty文章/53.netty系列之EventLoop,EventLoopGroup和netty的默认实现.md +++ b/learn-netty文章/53.netty系列之EventLoop,EventLoopGroup和netty的默认实现.md @@ -1,11 +1,5 @@ # netty系列之:EventLoop,EventLoopGroup和netty的默认实现 -文章目录 - - - -[简介](http://www.flydean.com/05-1-netty-eventloop-eventloopgroup/#简介)[EventLoopGroup和EventLoop](http://www.flydean.com/05-1-netty-eventloop-eventloopgroup/#EventLoopGroup和EventLoop)[EventLoopGroup在netty中的默认实现](http://www.flydean.com/05-1-netty-eventloop-eventloopgroup/#EventLoopGroup在netty中的默认实现)[EventLoop在netty中的默认实现](http://www.flydean.com/05-1-netty-eventloop-eventloopgroup/#EventLoop在netty中的默认实现)[总结](http://www.flydean.com/05-1-netty-eventloop-eventloopgroup/#总结) - # 简介 在netty中不管是服务器端的ServerBootstrap还是客户端的Bootstrap,在创建的时候都需要在group方法中传入一个EventLoopGroup参数,用来处理所有的ServerChannel和Channel中所有的IO操作和event。 @@ -16,33 +10,33 @@ EventLoopGroup继承自EventExecutorGroup: -``` +```java public interface EventLoopGroup extends EventExecutorGroup ``` 在前面的文章中我们讲过,EventExecutorGroup中有一个next方法可以返回对应的EventExecutor,这个方法在EventLoopGroup中进行了重写: -``` - EventLoop next(); +```java +EventLoop next(); ``` next方法返回的不再是一个EventExecutor,而是一个EventLoop。 事实上,EventLoop和EventLoopGroup的关系与EventExecutor和EventExecutorGroup的关系有些类似,EventLoop也是继承自EventLoopGroup,EventLoopGroup是EventLoop的集合。 -``` +```java public interface EventLoop extends OrderedEventExecutor, EventLoopGroup ``` 在EventLoopGroup中,除了重写的next方法之外,还添加了channel的注册方法register,用于将channel和注册到EventLoop中,从而实现channel和EventLoop的绑定。 -``` +```java ChannelFuture register(Channel channel); ``` 在EventLoop中,自多添加了一个parent方法,用来表示EventLoop和EventLoopGroup的关联关系: -``` +```java EventLoopGroup parent(); ``` @@ -54,7 +48,7 @@ EventLoopGroup在netty中的默认实现叫做DefaultEventLoopGroup,先来看 如果看了之前我讲解的关于EventExecutorGroup的朋友可以看出来,DefaultEventLoopGroup和DefaultEventExecutorGroup的继承关系是很类似的,DefaultEventLoopGroup继承自MultithreadEventLoopGroup,而MultithreadEventLoopGroup又继承自MultithreadEventExecutorGroup并且实现了EventLoopGroup接口: -``` +```java public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup ``` @@ -62,33 +56,33 @@ MultithreadEventLoopGroup是用多线程来处理Event Loop。 在MultithreadEventLoopGroup中定义了一个DEFAULT_EVENT_LOOP_THREADS来存储默认的处理Event Loop线程的个数: -``` +```java DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( - "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); + "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); ``` 对于EventLoopGroup中新加的几个register方法,MultithreadEventLoopGroup都是调用对应的next方法来实现的: -``` +```java public ChannelFuture register(Channel channel) { - return next().register(channel); - } + return next().register(channel); +} ``` 这里的next()方法的实现实际上调用的是父类的next方法,也就是MultithreadEventExecutorGroup中的next方法,来选择Group管理的一个EventLoop: -``` +```java public EventLoop next() { - return (EventLoop) super.next(); - } + return (EventLoop) super.next(); +} ``` 对于DefaultEventLoopGroup来说,它继承自MultithreadEventLoopGroup,实现了一个newChild方法,用来将传入的executor封装成为EventLoop: -``` - protected EventLoop newChild(Executor executor, Object... args) throws Exception { - return new DefaultEventLoop(this, executor); - } +```java +protected EventLoop newChild(Executor executor, Object... args) throws Exception { + return new DefaultEventLoop(this, executor); +} ``` # EventLoop在netty中的默认实现 @@ -115,23 +109,23 @@ private final Queue tailTasks; 这个tailTasks会被用在任务个数的判断和操作上: -``` - final boolean removeAfterEventLoopIterationTask(Runnable task) { - return tailTasks.remove(ObjectUtil.checkNotNull(task, "task")); - } +```java +final boolean removeAfterEventLoopIterationTask(Runnable task) { + return tailTasks.remove(ObjectUtil.checkNotNull(task, "task")); +} - protected boolean hasTasks() { - return super.hasTasks() || !tailTasks.isEmpty(); - } +protected boolean hasTasks() { + return super.hasTasks() || !tailTasks.isEmpty(); +} - public int pendingTasks() { - return super.pendingTasks() + tailTasks.size(); - } +public int pendingTasks() { + return super.pendingTasks() + tailTasks.size(); +} ``` SingleThreadEventLoop中对register方法的实现最终调用的是注册的channel中unsafe的register方法: -``` +```java channel.unsafe().register(this, promise); ``` @@ -143,20 +137,20 @@ public class DefaultEventLoop extends SingleThreadEventLoop 除了构造函数之外,DefaultEventLoop实现了一个run方法,用来具体任务的执行逻辑: -``` - protected void run() { - for (;;) { - Runnable task = takeTask(); - if (task != null) { - task.run(); - updateLastExecutionTime(); - } +```java +protected void run() { + for (;;) { + Runnable task = takeTask(); + if (task != null) { + task.run(); + updateLastExecutionTime(); + } - if (confirmShutdown()) { - break; - } + if (confirmShutdown()) { + break; } } +} ``` 如果对比可以发现,DefaultEventLoop和DefaultEventExecutor中run方法的实现是一样的。 diff --git a/learn-netty文章/54.netty系列之NIO和netty详解.md b/learn-netty文章/54.netty系列之NIO和netty详解.md index 4981263..21d4886 100644 --- a/learn-netty文章/54.netty系列之NIO和netty详解.md +++ b/learn-netty文章/54.netty系列之NIO和netty详解.md @@ -1,11 +1,5 @@ # netty系列之:NIO和netty详解 -文章目录 - - - -[简介](http://www.flydean.com/05-2-netty-nioeventloop/#简介)[NIO常用用法](http://www.flydean.com/05-2-netty-nioeventloop/#NIO常用用法)[NIO和EventLoopGroup](http://www.flydean.com/05-2-netty-nioeventloop/#NIO和EventLoopGroup)[NioEventLoopGroup](http://www.flydean.com/05-2-netty-nioeventloop/#NioEventLoopGroup)[NioEventLoop](http://www.flydean.com/05-2-netty-nioeventloop/#NioEventLoop)[总结](http://www.flydean.com/05-2-netty-nioeventloop/#总结) - # 简介 netty为什么快呢?这是因为netty底层使用了JAVA的NIO技术,并在其基础上进行了性能的优化,虽然netty不是单纯的JAVA nio,但是netty的底层还是基于的是nio技术。 @@ -20,7 +14,7 @@ nio的三大核心是Selector,channel和Buffer,本文我们将会深入探究N 因为是一个简单的聊天室,我们选择Socket协议为基础的ServerSocketChannel,首先就是open这个Server channel: -``` +```java ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress("localhost", 9527)); serverSocketChannel.configureBlocking(false); @@ -28,91 +22,91 @@ serverSocketChannel.configureBlocking(false); 然后向server channel中注册selector: -``` +```java Selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); ``` 虽然是NIO,但是对于Selector来说,它的select方法是阻塞方法,只有找到匹配的channel之后才会返回,为了多次进行select操作,我们需要在一个while循环里面进行selector的select操作: -``` +```java while (true) { - selector.select(); - Set selectedKeys = selector.selectedKeys(); - Iterator iter = selectedKeys.iterator(); - while (iter.hasNext()) { - SelectionKey selectionKey = iter.next(); - if (selectionKey.isAcceptable()) { - register(selector, serverSocketChannel); - } - if (selectionKey.isReadable()) { - serverResponse(byteBuffer, selectionKey); - } - iter.remove(); - } - Thread.sleep(1000); + selector.select(); + Set selectedKeys = selector.selectedKeys(); + Iterator iter = selectedKeys.iterator(); + while (iter.hasNext()) { + SelectionKey selectionKey = iter.next(); + if (selectionKey.isAcceptable()) { + register(selector, serverSocketChannel); } + if (selectionKey.isReadable()) { + serverResponse(byteBuffer, selectionKey); + } + iter.remove(); + } + Thread.sleep(1000); +} ``` selector中会有一些SelectionKey,SelectionKey中有一些表示操作状态的OP Status,根据这个OP Status的不同,selectionKey可以有四种状态,分别是isReadable,isWritable,isConnectable和isAcceptable。 当SelectionKey处于isAcceptable状态的时候,表示ServerSocketChannel可以接受连接了,我们需要调用register方法将serverSocketChannel accept生成的socketChannel注册到selector中,以监听它的OP READ状态,后续可以从中读取数据: -``` - private static void register(Selector selector, ServerSocketChannel serverSocketChannel) - throws IOException { - SocketChannel socketChannel = serverSocketChannel.accept(); - socketChannel.configureBlocking(false); - socketChannel.register(selector, SelectionKey.OP_READ); - } +```java +private static void register(Selector selector, ServerSocketChannel serverSocketChannel) + throws IOException { + SocketChannel socketChannel = serverSocketChannel.accept(); + socketChannel.configureBlocking(false); + socketChannel.register(selector, SelectionKey.OP_READ); +} ``` 当selectionKey处于isReadable状态的时候,表示可以从socketChannel中读取数据然后进行处理: -``` - private static void serverResponse(ByteBuffer byteBuffer, SelectionKey selectionKey) - throws IOException { - SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); - socketChannel.read(byteBuffer); - byteBuffer.flip(); - byte[] bytes= new byte[byteBuffer.limit()]; - byteBuffer.get(bytes); - log.info(new String(bytes).trim()); - if(new String(bytes).trim().equals(BYE_BYE)){ - log.info("说再见不如不见!"); - socketChannel.write(ByteBuffer.wrap("再见".getBytes())); - socketChannel.close(); - }else { - socketChannel.write(ByteBuffer.wrap("你是个好人".getBytes())); - } - byteBuffer.clear(); +```java +private static void serverResponse(ByteBuffer byteBuffer, SelectionKey selectionKey) + throws IOException { + SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); + socketChannel.read(byteBuffer); + byteBuffer.flip(); + byte[] bytes= new byte[byteBuffer.limit()]; + byteBuffer.get(bytes); + log.info(new String(bytes).trim()); + if(new String(bytes).trim().equals(BYE_BYE)){ + log.info("说再见不如不见!"); + socketChannel.write(ByteBuffer.wrap("再见".getBytes())); + socketChannel.close(); + }else { + socketChannel.write(ByteBuffer.wrap("你是个好人".getBytes())); } + byteBuffer.clear(); +} ``` 上面的serverResponse方法中,从selectionKey中拿到对应的SocketChannel,然后调用SocketChannel的read方法,将channel中的数据读取到byteBuffer中,要想回复消息到channel中,还是使用同一个socketChannel,然后调用write方法回写消息给client端,到这里一个简单的回写客户端消息的server端就完成了。 接下来就是对应的NIO客户端,在NIO客户端需要使用SocketChannel,首先建立和服务器的连接: -``` +```java socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9527)); ``` 然后就可以使用这个channel来发送和接受消息了: -``` - public String sendMessage(String msg) throws IOException { - byteBuffer = ByteBuffer.wrap(msg.getBytes()); - String response = null; - socketChannel.write(byteBuffer); - byteBuffer.clear(); - socketChannel.read(byteBuffer); - byteBuffer.flip(); - byte[] bytes= new byte[byteBuffer.limit()]; - byteBuffer.get(bytes); - response =new String(bytes).trim(); - byteBuffer.clear(); - return response; - } +```java +public String sendMessage(String msg) throws IOException { + byteBuffer = ByteBuffer.wrap(msg.getBytes()); + String response = null; + socketChannel.write(byteBuffer); + byteBuffer.clear(); + socketChannel.read(byteBuffer); + byteBuffer.flip(); + byte[] bytes= new byte[byteBuffer.limit()]; + byteBuffer.get(bytes); + response =new String(bytes).trim(); + byteBuffer.clear(); + return response; +} ``` 向channel中写入消息可以使用write方法,从channel中读取消息可以使用read方法。 @@ -125,13 +119,13 @@ socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9527)); 以netty的ServerBootstrap为例,启动的时候需要指定它的group,先来看一下ServerBootstrap的group方法: -``` +```java public ServerBootstrap group(EventLoopGroup group) { - return group(group, group); - } + return group(group, group); +} public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { - ... + // ... } ``` @@ -139,26 +133,26 @@ ServerBootstrap可以接受一个EventLoopGroup或者两个EventLoopGroup,Even EventLoopGroup只是一个接口,我们常用的一个实现就是NioEventLoopGroup,如下所示是一个常用的netty服务器端代码: -``` - EventLoopGroup bossGroup = new NioEventLoopGroup(); - EventLoopGroup workerGroup = new NioEventLoopGroup(); - try { - ServerBootstrap b = new ServerBootstrap(); - b.group(bossGroup, workerGroup) - .channel(NioServerSocketChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast(new FirstServerHandler()); - } - }) - .option(ChannelOption.SO_BACKLOG, 128) - .childOption(ChannelOption.SO_KEEPALIVE, true); +```java +EventLoopGroup bossGroup = new NioEventLoopGroup(); +EventLoopGroup workerGroup = new NioEventLoopGroup(); - // 绑定端口并开始接收连接 - ChannelFuture f = b.bind(port).sync(); - // 等待server socket关闭 - f.channel().closeFuture().sync(); +ServerBootstrap b = new ServerBootstrap(); +b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast(new FirstServerHandler()); + } + }) + .option(ChannelOption.SO_BACKLOG, 128) + .childOption(ChannelOption.SO_KEEPALIVE, true); + +// 绑定端口并开始接收连接 +ChannelFuture f = b.bind(port).sync(); +// 等待server socket关闭 +f.channel().closeFuture().sync(); ``` 这里和NIO相关的有两个类,分别是NioEventLoopGroup和NioServerSocketChannel,事实上在他们的底层还有两个类似的类分别叫做NioEventLoop和NioSocketChannel,接下来我们分别讲解一些他们的底层实现和逻辑关系。 @@ -167,31 +161,31 @@ EventLoopGroup只是一个接口,我们常用的一个实现就是NioEventLoop NioEventLoopGroup和DefaultEventLoopGroup一样都是继承自MultithreadEventLoopGroup: -``` +```java public class NioEventLoopGroup extends MultithreadEventLoopGroup ``` 他们的不同之处在于newChild方法的不同,newChild用来构建Group中的实际对象,NioEventLoopGroup来说,newChild返回的是一个NioEventLoop对象,先来看下NioEventLoopGroup的newChild方法: -``` - protected EventLoop newChild(Executor executor, Object... args) throws Exception { - SelectorProvider selectorProvider = (SelectorProvider) args[0]; - SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1]; - RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2]; - EventLoopTaskQueueFactory taskQueueFactory = null; - EventLoopTaskQueueFactory tailTaskQueueFactory = null; +```java +protected EventLoop newChild(Executor executor, Object... args) throws Exception { + SelectorProvider selectorProvider = (SelectorProvider) args[0]; + SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1]; + RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2]; + EventLoopTaskQueueFactory taskQueueFactory = null; + EventLoopTaskQueueFactory tailTaskQueueFactory = null; - int argsLength = args.length; - if (argsLength > 3) { - taskQueueFactory = (EventLoopTaskQueueFactory) args[3]; - } - if (argsLength > 4) { - tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4]; - } - return new NioEventLoop(this, executor, selectorProvider, - selectStrategyFactory.newSelectStrategy(), - rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory); + int argsLength = args.length; + if (argsLength > 3) { + taskQueueFactory = (EventLoopTaskQueueFactory) args[3]; } + if (argsLength > 4) { + tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4]; + } + return new NioEventLoop(this, executor, selectorProvider, + selectStrategyFactory.newSelectStrategy(), + rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory); +} ``` 这个newChild方法除了固定的executor参数之外,还可以根据NioEventLoopGroup的构造函数传入的参数来实现更多的功能。 @@ -212,7 +206,7 @@ SelectorProvider是JDK中的类,它提供了一个静态的provider()方法可 SelectStrategyFactory是一个接口,里面只定义了一个方法,用来返回SelectStrategy: -``` +```java public interface SelectStrategyFactory { SelectStrategy newSelectStrategy(); @@ -223,12 +217,12 @@ public interface SelectStrategyFactory { 先看下SelectStrategy中定义了哪些Strategy: -``` - int SELECT = -1; +```java +int SELECT = -1; - int CONTINUE = -2; +int CONTINUE = -2; - int BUSY_WAIT = -3; +int BUSY_WAIT = -3; ``` SelectStrategy中定义了3个strategy,分别是SELECT、CONTINUE和BUSY_WAIT。 @@ -243,7 +237,7 @@ BUSY_WAIT是一个特殊的strategy,是指IO 循环轮询新事件而不阻塞 RejectedExecutionHandler是netty自己的类,和 java.util.concurrent.RejectedExecutionHandler类似,但是是特别针对SingleThreadEventExecutor来说的。这个接口定义了一个rejected方法,用来表示因为SingleThreadEventExecutor容量限制导致的任务添加失败而被拒绝的情况: -``` +```java void rejected(Runnable task, SingleThreadEventExecutor executor); ``` @@ -251,7 +245,7 @@ void rejected(Runnable task, SingleThreadEventExecutor executor); EventLoopTaskQueueFactory是一个接口,用来创建存储提交给EventLoop的taskQueue: -``` +```java Queue newTaskQueue(int maxCapacity); ``` @@ -263,7 +257,7 @@ Queue newTaskQueue(int maxCapacity); 首先NioEventLoop和DefaultEventLoop一样,都是继承自SingleThreadEventLoop: -``` +```java public final class NioEventLoop extends SingleThreadEventLoop ``` @@ -271,17 +265,17 @@ public final class NioEventLoop extends SingleThreadEventLoop 首先作为一个NIO的实现,必须要有selector,在NioEventLoop中定义了两个selector,分别是selector和unwrappedSelector: -``` - private Selector selector; - private Selector unwrappedSelector; +```java +private Selector selector; +private Selector unwrappedSelector; ``` 在NioEventLoop的构造函数中,他们是这样定义的: -``` - final SelectorTuple selectorTuple = openSelector(); - this.selector = selectorTuple.selector; - this.unwrappedSelector = selectorTuple.unwrappedSelector; +```java +final SelectorTuple selectorTuple = openSelector(); +this.selector = selectorTuple.selector; +this.unwrappedSelector = selectorTuple.unwrappedSelector; ``` 首先调用openSelector方法,然后通过返回的SelectorTuple来获取对应的selector和unwrappedSelector。 @@ -290,7 +284,7 @@ public final class NioEventLoop extends SingleThreadEventLoop 在openSelector方法中,首先通过调用provider的openSelector方法返回一个Selector,这个Selector就是unwrappedSelector: -``` +```java final Selector unwrappedSelector; unwrappedSelector = provider.openSelector(); ``` @@ -299,24 +293,24 @@ unwrappedSelector = provider.openSelector(); DISABLE_KEY_SET_OPTIMIZATION表示的是是否对select key set进行优化: -``` +```java if (DISABLE_KEY_SET_OPTIMIZATION) { - return new SelectorTuple(unwrappedSelector); - } + return new SelectorTuple(unwrappedSelector); +} - SelectorTuple(Selector unwrappedSelector) { - this.unwrappedSelector = unwrappedSelector; - this.selector = unwrappedSelector; - } +SelectorTuple(Selector unwrappedSelector) { + this.unwrappedSelector = unwrappedSelector; + this.selector = unwrappedSelector; +} ``` 如果DISABLE_KEY_SET_OPTIMIZATION被设置为false,那么意味着我们需要对select key set进行优化,具体是怎么进行优化的呢? 先来看下最后的返回: -``` +```java return new SelectorTuple(unwrappedSelector, - new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet)); + new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet)); ``` 最后返回的SelectorTuple第二个参数就是selector,这里的selector是一个SelectedSelectionKeySetSelector对象。 @@ -324,20 +318,20 @@ return new SelectorTuple(unwrappedSelector, SelectedSelectionKeySetSelector继承自selector,构造函数传入的第一个参数是一个delegate,所有的Selector中定义的方法都是通过调用 delegate来实现的,不同的是对于select方法来说,会首先调用selectedKeySet的reset方法,下面是以isOpen和select方法为例观察一下代码的实现: -``` - public boolean isOpen() { - return delegate.isOpen(); - } +```java +public boolean isOpen() { + return delegate.isOpen(); +} - public int select(long timeout) throws IOException { - selectionKeys.reset(); - return delegate.select(timeout); - } +public int select(long timeout) throws IOException { + selectionKeys.reset(); + return delegate.select(timeout); +} ``` selectedKeySet是一个SelectedSelectionKeySet对象,是一个set集合,用来存储SelectionKey,在openSelector()方法中,使用new来实例化这个对象: -``` +```java final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); ``` @@ -345,32 +339,32 @@ netty实际是想用这个SelectedSelectionKeySet类来管理Selector中的selec 首先判断系统中有没有sun.nio.ch.SelectorImpl的实现: -``` - Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction() { - @Override - public Object run() { - try { - return Class.forName( - "sun.nio.ch.SelectorImpl", - false, - PlatformDependent.getSystemClassLoader()); - } catch (Throwable cause) { - return cause; - } - } - }); +```java +Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction() { + @Override + public Object run() { + try { + return Class.forName( + "sun.nio.ch.SelectorImpl", + false, + PlatformDependent.getSystemClassLoader()); + } catch (Throwable cause) { + return cause; + } + } +}); ``` SelectorImpl中有两个Set字段: -``` +```java private Set publicKeys; private Set publicSelectedKeys; ``` 这两个字段就是我们需要替换的对象。如果有SelectorImpl的话,首先使用Unsafe类,调用PlatformDependent中的objectFieldOffset方法拿到这两个字段相对于对象示例的偏移量,然后调用putObject将这两个字段替换成为前面初始化的selectedKeySet对象: -``` +```java Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); @@ -379,30 +373,30 @@ if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) { // This allows us to also do this in Java9+ without any extra flags. long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField); long publicSelectedKeysFieldOffset = - PlatformDependent.objectFieldOffset(publicSelectedKeysField); + PlatformDependent.objectFieldOffset(publicSelectedKeysField); if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) { PlatformDependent.putObject( - unwrappedSelector, selectedKeysFieldOffset, selectedKeySet); + unwrappedSelector, selectedKeysFieldOffset, selectedKeySet); PlatformDependent.putObject( - unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet); + unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet); return null; } ``` 如果系统设置不支持Unsafe,那么就用反射再做一次: -``` - Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true); - if (cause != null) { - return cause; - } - cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true); - if (cause != null) { - return cause; - } - selectedKeysField.set(unwrappedSelector, selectedKeySet); - publicSelectedKeysField.set(unwrappedSelector, selectedKeySet); +```java +Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true); +if (cause != null) { + return cause; +} +cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true); +if (cause != null) { + return cause; +} +selectedKeysField.set(unwrappedSelector, selectedKeySet); +publicSelectedKeysField.set(unwrappedSelector, selectedKeySet); ``` 在NioEventLoop中我们需要关注的一个非常重要的重写方法就是run方法,在run方法中实现了如何执行task的逻辑。 @@ -414,42 +408,42 @@ strategy的值来进行对应的处理。 BUSY_WAIT在NIO中是不支持的,如果是SELECT状态,那么会在curDeadlineNanos之后再次进行select操作: -``` +```java strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); - switch (strategy) { - case SelectStrategy.CONTINUE: - continue; - case SelectStrategy.BUSY_WAIT: - // fall-through to SELECT since the busy-wait is not supported with NIO - case SelectStrategy.SELECT: - long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); - if (curDeadlineNanos == -1L) { - curDeadlineNanos = NONE; // nothing on the calendar - } - nextWakeupNanos.set(curDeadlineNanos); - try { - if (!hasTasks()) { - strategy = select(curDeadlineNanos); - } - } finally { - // This update is just to help block unnecessary selector wakeups - // so use of lazySet is ok (no race condition) - nextWakeupNanos.lazySet(AWAKE); - } - // fall through - default: +switch (strategy) { + case SelectStrategy.CONTINUE: + continue; + case SelectStrategy.BUSY_WAIT: + // fall-through to SELECT since the busy-wait is not supported with NIO + case SelectStrategy.SELECT: + long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); + if (curDeadlineNanos == -1L) { + curDeadlineNanos = NONE; // nothing on the calendar + } + nextWakeupNanos.set(curDeadlineNanos); + try { + if (!hasTasks()) { + strategy = select(curDeadlineNanos); + } + } finally { + // This update is just to help block unnecessary selector wakeups + // so use of lazySet is ok (no race condition) + nextWakeupNanos.lazySet(AWAKE); + } + // fall through + default: ``` 如果strategy > 0,表示有拿到了SelectedKeys,那么需要调用processSelectedKeys方法对SelectedKeys进行处理: -``` - private void processSelectedKeys() { - if (selectedKeys != null) { - processSelectedKeysOptimized(); - } else { - processSelectedKeysPlain(selector.selectedKeys()); - } +```java +private void processSelectedKeys() { + if (selectedKeys != null) { + processSelectedKeysOptimized(); + } else { + processSelectedKeysPlain(selector.selectedKeys()); } +} ``` 上面提到了NioEventLoop中有两个selector,还有一个selectedKeys属性,这个selectedKeys存储的就是Optimized SelectedKeys,如果这个值不为空,就调用processSelectedKeysOptimized方法,否则就调用processSelectedKeysPlain方法。 @@ -458,18 +452,18 @@ processSelectedKeysOptimized和processSelectedKeysPlain这两个方法差别不 处理的逻辑是首先拿到selectedKeys的key,然后调用它的attachment方法拿到attach的对象: -``` +```java final SelectionKey k = selectedKeys.keys[i]; - selectedKeys.keys[i] = null; +selectedKeys.keys[i] = null; - final Object a = k.attachment(); +final Object a = k.attachment(); - if (a instanceof AbstractNioChannel) { - processSelectedKey(k, (AbstractNioChannel) a); - } else { - NioTask task = (NioTask) a; - processSelectedKey(k, task); - } +if (a instanceof AbstractNioChannel) { + processSelectedKey(k, (AbstractNioChannel) a); +} else { + NioTask task = (NioTask) a; + processSelectedKey(k, task); +} ``` 如果channel还没有建立连接,那么这个对象可能是一个NioTask,用来处理channelReady和channelUnregistered的事件。 @@ -480,7 +474,7 @@ final SelectionKey k = selectedKeys.keys[i]; 对第一种情况,会调用task的channelReady方法: -``` +```java task.channelReady(k.channel(), k); ```