当前位置: 首页 > news >正文

首码网站免费推广wordpress视频投稿插件

首码网站免费推广,wordpress视频投稿插件,制作网站哪家便宜,php网站开发代码highlight: arduino-light 源码篇#xff1a;从 Linux 出发深入剖析服务端启动流程 通过前几章课程的学习#xff0c;我们已经对 Netty 的技术思想和基本原理有了初步的认识#xff0c;从今天这节课开始我们将正式进入 Netty 核心源码学习的课程。希望能够通过源码解析的方式… highlight: arduino-light 源码篇从 Linux 出发深入剖析服务端启动流程 通过前几章课程的学习我们已经对 Netty 的技术思想和基本原理有了初步的认识从今天这节课开始我们将正式进入 Netty 核心源码学习的课程。希望能够通过源码解析的方式让你更加深入理解 Netty 的精髓如 Netty 的设计思想、工程技巧等为之后继续深入研究 Netty 打下坚实的基础。 在课程开始之前我想分享一下关于源码学习的几点经验和建议。 第一很多同学在开始学习源码时面临的第一个问题就是不知道从何下手这个时候一定不能对着源码毫无意义地四处翻看。建议你可以通过 Hello World 或者 TestCase 作为源码学习的入口然后再通过 Debug 断点的方式调试并跑通源码。 第二阅读源码一定要有全局观。首先要把握源码的主流程避免刚开始陷入代码细节的死胡同。 第三源码一定要反复阅读让自己每一次读都有不同的收获。我们可以通过画图、注释的方式帮助自己更容易理解源码的核心流程方便后续的复习和回顾。 作为源码解析的第一节课我们将深入分析 Netty 服务端的启动流程。启动服务的过程中我们可以了解到 Netty 各大核心组件的关系这将是学习 Netty 源码一个非常好的切入点让我们一起看看 Netty 的每个零件是如何运转起来的吧。 调试示例代码 位于netty-example工程下的handler工程的 io.netty.example.echo包下 java ​ package io.netty.example.echo; ​ import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; ​ public final class EchoClient { ​    static final boolean SSL System.getProperty(ssl) ! null;    static final String HOST System.getProperty(host, 127.0.0.1);    static final int PORT Integer.parseInt(System.getProperty(port, 8007));    static final int SIZE Integer.parseInt(System.getProperty(size, 256)); ​    public static void main(String[] args) throws Exception {        // Configure SSL.git        final SslContext sslCtx;        if (SSL) {            sslCtx SslContextBuilder.forClient()               .trustManager(InsecureTrustManagerFactory.INSTANCE).build();       } else {            sslCtx null;       } ​        // Configure the client.        EventLoopGroup group new NioEventLoopGroup();        try {            Bootstrap b new Bootstrap();            b.group(group)             .channel(NioSocketChannel.class)             .option(ChannelOption.TCP_NODELAY, true)             .handler(new ChannelInitializerSocketChannel() {                 Override                 public void initChannel(SocketChannel ch) throws Exception {                     System.out.println(客户端bootstrap.handler()方法中指定的处理器被调用);                     ChannelPipeline p ch.pipeline();                     if (sslCtx ! null) {                         p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));                     }                    // p.addLast(new LoggingHandler(LogLevel.INFO));                     p.addLast(new EchoClientHandler());                 }             }); ​            // Start the client.            ChannelFuture f b.connect(HOST, PORT).sync(); ​            // Wait until the connection is closed.            f.channel().closeFuture().sync();       } finally {            // Shut down the event loop to terminate all threads.            group.shutdownGracefully();       }   } } ​ java ​ package io.netty.example.echo; ​ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; ​ /** * Handler implementation for the echo client. It initiates the ping-pong * traffic between the echo client and server by sending the first message to * the server. */ public class EchoClientHandler extends ChannelInboundHandlerAdapter { ​    private final ByteBuf firstMessage; ​    /**     * Creates a client-side handler.     */    public EchoClientHandler() {        firstMessage Unpooled.buffer(EchoClient.SIZE);       for (int i 0; i firstMessage.capacity(); i ) {            firstMessage.writeByte((byte) i);       }       firstMessage.writeByte((byte) 98);   } ​    Override    public void channelActive(ChannelHandlerContext ctx) {        System.out.println(EchoClientHandler.channelActive);        ctx.writeAndFlush(firstMessage);   } ​    Override    public void channelRead(ChannelHandlerContext ctx, Object msg) {        System.out.println(客户端EchoClientHandler#channelRead被调用);        System.out.println(客户端收到数据 msg);        System.out.println(客户端发送数据 msg);        ctx.writeAndFlush(msg);   } ​    Override    public void channelReadComplete(ChannelHandlerContext ctx) {       ctx.flush();   } ​    Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        // Close the connection when an exception is raised.        cause.printStackTrace();        ctx.close();   } } ​ java ​ package io.netty.example.echo; ​ import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioChannelOption; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.ClientLoggingHandler; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.logging.ServerLoggingHandler; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.SelfSignedCertificate; ​ /** * Echoes back any received data from a client. */ public final class EchoServer { ​    static final boolean SSL System.getProperty(ssl) ! null;    static final int PORT Integer.parseInt(System.getProperty(port, 8007)); ​    public static void main(String[] args) throws Exception {        // Configure SSL.        final SslContext sslCtx;        if (SSL) {            SelfSignedCertificate ssc new SelfSignedCertificate();            sslCtx SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();       } else {            sslCtx null;       } ​        // Configure the server.        EventLoopGroup bossGroup new NioEventLoopGroup(1);        EventLoopGroup workerGroup new NioEventLoopGroup();        final EchoServerHandler serverHandler new EchoServerHandler();        try {            ServerBootstrap b new ServerBootstrap();            b.group(bossGroup, workerGroup)             .channel(NioServerSocketChannel.class)             .option(ChannelOption.SO_BACKLOG, 100)             .handler(new ServerLoggingHandler(LogLevel.INFO))              //两种设置keepalive风格             .childOption(ChannelOption.SO_KEEPALIVE, true)             .childOption(NioChannelOption.SO_KEEPALIVE, true) ​              //切换到unpooled的方式之一             .childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT)                   .childHandler(new ChannelInitializerSocketChannel() {                 Override                 public void initChannel(SocketChannel ch) throws Exception {                     ChannelPipeline p ch.pipeline();                     p.addLast(new ClientLoggingHandler(LogLevel.INFO));                     p.addLast(serverHandler);                 }             }); ​            // Start the server.            ChannelFuture f b.bind(PORT).sync(); ​            // Wait until the server socket is closed.            f.channel().closeFuture().sync();       } finally {            // Shut down all event loops to terminate all threads.            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();       }   } } ​ java ​ package io.netty.example.echo; ​ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; ​ import java.util.Date; import java.util.concurrent.TimeUnit; ​ /** * Handler implementation for the echo server. */ Sharable public class EchoServerHandler extends ChannelInboundHandlerAdapter { ​    Override    public void channelRead(final ChannelHandlerContext ctx, Object msg) {        ctx.channel().eventLoop().schedule(new Runnable() {            Override            public void run() {                System.out.println(new Date().toString() :服务器收到消息);                try {                    Thread.sleep(2 * 1000);                    ctx.writeAndFlush(Unpooled.copiedBuffer(hello, 客户端~(^ω^)喵4, CharsetUtil.UTF_8));                    System.out.println(channel code ctx.channel().hashCode());               } catch (Exception ex) {                    System.out.println(发生异常 ex.getMessage());               }           }       }, 5, TimeUnit.SECONDS);        //ctx.write(msg);   } ​    Override    public void channelReadComplete(ChannelHandlerContext ctx) {        ctx.flush();   } ​    Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        // Close the connection when an exception is raised.        cause.printStackTrace();        ctx.close();   } } ​ java ​ package io.netty.handler.logging; ​ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufHolder; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.ChannelPromise; import io.netty.util.internal.logging.InternalLogLevel; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; ​ import java.net.SocketAddress; ​ import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump; import static io.netty.util.internal.StringUtil.NEWLINE; ​ /** * A {link ChannelHandler} that logs all events using a logging framework. * By default, all events are logged at ttDEBUG/tt level. */ Sharable SuppressWarnings({ StringConcatenationInsideStringBufferAppend, StringBufferReplaceableByString }) public class ServerLoggingHandler extends ChannelDuplexHandler { ​    private static final LogLevel DEFAULT_LEVEL LogLevel.DEBUG; ​    protected final InternalLogger logger;    protected final InternalLogLevel internalLevel; ​    private final LogLevel level; ​    /**     * Creates a new instance whose logger name is the fully qualified class     * name of the instance with hex dump enabled.     */    public ServerLoggingHandler() {        this(DEFAULT_LEVEL);   } ​    /**     * Creates a new instance whose logger name is the fully qualified class     * name of the instance.     *     * param level the log level     */    public ServerLoggingHandler(LogLevel level) {        if (level null) {            throw new NullPointerException(level);       } ​        logger InternalLoggerFactory.getInstance(getClass());        this.level level;        internalLevel level.toInternalLevel();   } ​    /**     * Creates a new instance with the specified logger name and with hex dump     * enabled.     *     * param clazz the class type to generate the logger for     */    public ServerLoggingHandler(Class? clazz) {        this(clazz, DEFAULT_LEVEL);   } ​    /**     * Creates a new instance with the specified logger name.     *     * param clazz the class type to generate the logger for     * param level the log level     */    public ServerLoggingHandler(Class? clazz, LogLevel level) {        if (clazz null) {            throw new NullPointerException(clazz);       }        if (level null) {            throw new NullPointerException(level);       } ​        logger InternalLoggerFactory.getInstance(clazz);        this.level level;        internalLevel level.toInternalLevel();   } ​    /**     * Creates a new instance with the specified logger name using the default log level.     *     * param name the name of the class to use for the logger     */    public ServerLoggingHandler(String name) {        this(name, DEFAULT_LEVEL);   } ​    /**     * Creates a new instance with the specified logger name.     *     * param name the name of the class to use for the logger     * param level the log level     */    public ServerLoggingHandler(String name, LogLevel level) {        if (name null) {            throw new NullPointerException(name);       }        if (level null) {            throw new NullPointerException(level);       } ​        logger InternalLoggerFactory.getInstance(name);        this.level level;        internalLevel level.toInternalLevel();   } ​    /**     * Returns the {link LogLevel} that this handler uses to log     */    public LogLevel level() {        return level;   } ​    Override    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {        if (logger.isEnabled(internalLevel)) {            logger.log(internalLevel, format(ctx, REGISTERED));       }        ctx.fireChannelRegistered();   } ​    Override    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {        if (logger.isEnabled(internalLevel)) {            logger.log(internalLevel, format(ctx, UNREGISTERED));       }        ctx.fireChannelUnregistered();   } ​    Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        if (logger.isEnabled(internalLevel)) {            logger.log(internalLevel, format(ctx, ACTIVE));       }        ctx.fireChannelActive();   } ​    Override    public void channelInactive(ChannelHandlerContext ctx) throws Exception {        if (logger.isEnabled(internalLevel)) {            logger.log(internalLevel, format(ctx, INACTIVE));       }        ctx.fireChannelInactive();   } ​    Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        if (logger.isEnabled(internalLevel)) {            logger.log(internalLevel, format(ctx, EXCEPTION, cause), cause);       }        ctx.fireExceptionCaught(cause);   } ​    Override    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {        if (logger.isEnabled(internalLevel)) {            logger.log(internalLevel, format(ctx, USER_EVENT, evt));       }        ctx.fireUserEventTriggered(evt);   } ​    Override    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {        if (logger.isEnabled(internalLevel)) {            logger.log(internalLevel, format(ctx, BIND, localAddress));       }        ctx.bind(localAddress, promise);   } ​    Override    public void connect(            ChannelHandlerContext ctx,            SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {        if (logger.isEnabled(internalLevel)) {            logger.log(internalLevel, format(ctx, CONNECT, remoteAddress, localAddress));       }        ctx.connect(remoteAddress, localAddress, promise);   } ​    Override    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {        if (logger.isEnabled(internalLevel)) {            logger.log(internalLevel, format(ctx, DISCONNECT));       }        ctx.disconnect(promise);   } ​    Override    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {        if (logger.isEnabled(internalLevel)) {            logger.log(internalLevel, format(ctx, CLOSE));       }        ctx.close(promise);   } ​    Override    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {        if (logger.isEnabled(internalLevel)) {            logger.log(internalLevel, format(ctx, DEREGISTER));       }        ctx.deregister(promise);   } ​    Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        if (logger.isEnabled(internalLevel)) {            logger.log(internalLevel, format(ctx, READ COMPLETE));       }        ctx.fireChannelReadComplete();   } ​    Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println(服务器端方法中指定的ServerLoggingHandler被调用);        if (logger.isEnabled(internalLevel)) {            //这里被注释了          // logger.log(internalLevel, format(ctx, READ, msg));       }        ctx.fireChannelRead(msg);   } ​    Override    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {        if (logger.isEnabled(internalLevel)) {            logger.log(internalLevel, format(ctx, WRITE, msg));       }        ctx.write(msg, promise);   } ​    Override    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {        if (logger.isEnabled(internalLevel)) {            logger.log(internalLevel, format(ctx, WRITABILITY CHANGED));       }        ctx.fireChannelWritabilityChanged();   } ​    Override    public void flush(ChannelHandlerContext ctx) throws Exception {        if (logger.isEnabled(internalLevel)) {            logger.log(internalLevel, format(ctx, FLUSH));       }        ctx.flush();   } ​    /**     * Formats an event and returns the formatted message.     *     * param eventName the name of the event     */    protected String format(ChannelHandlerContext ctx, String eventName) {        String chStr ctx.channel().toString();        return new StringBuilder(chStr.length() 1 eventName.length())           .append(chStr)           .append( )           .append(eventName)           .toString();   } ​    /**     * Formats an event and returns the formatted message.     *     * param eventName the name of the event     * param arg       the argument of the event     */    protected String format(ChannelHandlerContext ctx, String eventName, Object arg) {        if (arg instanceof ByteBuf) {            return formatByteBuf(ctx, eventName, (ByteBuf) arg);       } else if (arg instanceof ByteBufHolder) {            return formatByteBufHolder(ctx, eventName, (ByteBufHolder) arg);       } else {            return formatSimple(ctx, eventName, arg);       }   } ​    /**     * Formats an event and returns the formatted message. This method is currently only used for formatting     * {link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)}.     *     * param eventName the name of the event     * param firstArg the first argument of the event     * param secondArg the second argument of the event     */    protected String format(ChannelHandlerContext ctx, String eventName, Object firstArg, Object secondArg) {        if (secondArg null) {            return formatSimple(ctx, eventName, firstArg);       } ​        String chStr ctx.channel().toString();        String arg1Str String.valueOf(firstArg);        String arg2Str secondArg.toString();        StringBuilder buf new StringBuilder(                chStr.length() 1 eventName.length() 2 arg1Str.length() 2 arg2Str.length());        buf.append(chStr).append( ).append(eventName).append(: ).append(arg1Str).append(, ).append(arg2Str);        return buf.toString();   } ​    /**     * Generates the default log message of the specified event whose argument is a {link ByteBuf}.     */    private static String formatByteBuf(ChannelHandlerContext ctx, String eventName, ByteBuf msg) {        String chStr ctx.channel().toString();        int length msg.readableBytes();        if (length 0) {            StringBuilder buf new StringBuilder(chStr.length() 1 eventName.length() 4);            buf.append(chStr).append( ).append(eventName).append(: 0B);            return buf.toString();       } else {            int rows length / 16 (length % 15 0? 0 : 1) 4;            StringBuilder buf new StringBuilder(chStr.length() 1 eventName.length() 2 10 1 2 rows * 80); ​            buf.append(chStr).append( ).append(eventName).append(: ).append(length).append(B).append(NEWLINE);            appendPrettyHexDump(buf, msg); ​            return buf.toString();       }   } ​    /**     * Generates the default log message of the specified event whose argument is a {link ByteBufHolder}.     */    private static String formatByteBufHolder(ChannelHandlerContext ctx, String eventName, ByteBufHolder msg) {        String chStr ctx.channel().toString();        String msgStr msg.toString();        ByteBuf content msg.content();        int length content.readableBytes();        if (length 0) {            StringBuilder buf new StringBuilder(chStr.length() 1 eventName.length() 2 msgStr.length() 4);            buf.append(chStr).append( ).append(eventName).append(, ).append(msgStr).append(, 0B);            return buf.toString();       } else {            int rows length / 16 (length % 15 0? 0 : 1) 4;            StringBuilder buf new StringBuilder(                    chStr.length() 1 eventName.length() 2 msgStr.length() 2 10 1 2 rows * 80); ​            buf.append(chStr).append( ).append(eventName).append(: )               .append(msgStr).append(, ).append(length).append(B).append(NEWLINE);            appendPrettyHexDump(buf, content); ​            return buf.toString();       }   } ​    /**     * Generates the default log message of the specified event whose argument is an arbitrary object.     */    private static String formatSimple(ChannelHandlerContext ctx, String eventName, Object msg) {        String chStr ctx.channel().toString();        String msgStr String.valueOf(msg);        StringBuilder buf new StringBuilder(chStr.length() 1 eventName.length() 2 msgStr.length());        return buf.append(chStr).append( ).append(eventName).append(: ).append(msgStr).toString();   } } ​ java /* * Copyright 2012 The Netty Project * * The Netty Project licenses this file to you under the Apache License, * version 2.0 (the License); you may not use this file except in compliance * with the License. You may obtain a copy of the License at: * *   http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an AS IS BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * License for the specific language governing permissions and limitations * under the License. */ package io.netty.handler.logging; ​ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufHolder; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.ChannelPromise; import io.netty.util.internal.logging.InternalLogLevel; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; ​ import java.net.SocketAddress; ​ import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump; import static io.netty.util.internal.StringUtil.NEWLINE; ​ /** * A {link ChannelHandler} that logs all events using a logging framework. * By default, all events are logged at ttDEBUG/tt level. */ Sharable SuppressWarnings({ StringConcatenationInsideStringBufferAppend, StringBufferReplaceableByString }) public class ClientLoggingHandler extends ChannelDuplexHandler { ​    private static final LogLevel DEFAULT_LEVEL LogLevel.DEBUG; ​    protected final InternalLogger logger;    protected final InternalLogLevel internalLevel; ​    private final LogLevel level; ​    /**     * Creates a new instance whose logger name is the fully qualified class     * name of the instance with hex dump enabled.     */    public ClientLoggingHandler() {        this(DEFAULT_LEVEL);   } ​    /**     * Creates a new instance whose logger name is the fully qualified class     * name of the instance.     *     * param level the log level     */    public ClientLoggingHandler(LogLevel level) {        if (level null) {            throw new NullPointerException(level);       } ​        logger InternalLoggerFactory.getInstance(getClass());        this.level level;        internalLevel level.toInternalLevel();   } ​    /**     * Creates a new instance with the specified logger name and with hex dump     * enabled.     *     * param clazz the class type to generate the logger for     */    public ClientLoggingHandler(Class? clazz) {        this(clazz, DEFAULT_LEVEL);   } ​    /**     * Creates a new instance with the specified logger name.     *     * param clazz the class type to generate the logger for     * param level the log level     */    public ClientLoggingHandler(Class? clazz, LogLevel level) {        if (clazz null) {            throw new NullPointerException(clazz);       }        if (level null) {            throw new NullPointerException(level);       } ​        logger InternalLoggerFactory.getInstance(clazz);        this.level level;        internalLevel level.toInternalLevel();   } ​    /**     * Creates a new instance with the specified logger name using the default log level.     *     * param name the name of the class to use for the logger     */    public ClientLoggingHandler(String name) {        this(name, DEFAULT_LEVEL);   } ​    /**     * Creates a new instance with the specified logger name.     *     * param name the name of the class to use for the logger     * param level the log level     */    public ClientLoggingHandler(String name, LogLevel level) {        if (name null) {            throw new NullPointerException(name);       }        if (level null) {            throw new NullPointerException(level);       } ​        logger InternalLoggerFactory.getInstance(name);        this.level level;        internalLevel level.toInternalLevel();   } ​    /**     * Returns the {link LogLevel} that this handler uses to log     */    public LogLevel level() {        return level;   } ​    Override    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {        if (logger.isEnabled(internalLevel)) {            logger.log(internalLevel, format(ctx, REGISTERED));       }        ctx.fireChannelRegistered();   } ​    Override    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {        if (logger.isEnabled(internalLevel)) {            logger.log(internalLevel, format(ctx, UNREGISTERED));       }        ctx.fireChannelUnregistered();   } ​    Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        if (logger.isEnabled(internalLevel)) {            logger.log(internalLevel, format(ctx, ACTIVE));       }        ctx.fireChannelActive();   } ​    Override    public void channelInactive(ChannelHandlerContext ctx) throws Exception {        if (logger.isEnabled(internalLevel)) {            logger.log(internalLevel, format(ctx, INACTIVE));       }        ctx.fireChannelInactive();   } ​    Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        if (logger.isEnabled(internalLevel)) {            logger.log(internalLevel, format(ctx, EXCEPTION, cause), cause);       }        ctx.fireExceptionCaught(cause);   } ​    Override    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {        if (logger.isEnabled(internalLevel)) {            logger.log(internalLevel, format(ctx, USER_EVENT, evt));       }        ctx.fireUserEventTriggered(evt);   } ​    Override    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {        if (logger.isEnabled(internalLevel)) {            logger.log(internalLevel, format(ctx, BIND, localAddress));       }        ctx.bind(localAddress, promise);   } ​    Override    public void connect(            ChannelHandlerContext ctx,            SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {        if (logger.isEnabled(internalLevel)) {            logger.log(internalLevel, format(ctx, CONNECT, remoteAddress, localAddress));       }        ctx.connect(remoteAddress, localAddress, promise);   } ​    Override    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {        if (logger.isEnabled(internalLevel)) {            logger.log(internalLevel, format(ctx, DISCONNECT));       }        ctx.disconnect(promise);   } ​    Override    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {        if (logger.isEnabled(internalLevel)) {            logger.log(internalLevel, format(ctx, CLOSE));       }        ctx.close(promise);   } ​    Override    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {        if (logger.isEnabled(internalLevel)) {            logger.log(internalLevel, format(ctx, DEREGISTER));       }        ctx.deregister(promise);   } ​    Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        if (logger.isEnabled(internalLevel)) {            logger.log(internalLevel, format(ctx, READ COMPLETE));       }        ctx.fireChannelReadComplete();   } ​    Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println(服务器端方法中指定的ClientLoggingHandler被调用);        if (logger.isEnabled(internalLevel)) {            //这里被注释了          // logger.log(internalLevel, format(ctx, READ, msg));       }        ctx.fireChannelRead(msg);   } ​    Override    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {        if (logger.isEnabled(internalLevel)) {            logger.log(internalLevel, format(ctx, WRITE, msg));       }        ctx.write(msg, promise);   } ​    Override    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {        if (logger.isEnabled(internalLevel)) {            logger.log(internalLevel, format(ctx, WRITABILITY CHANGED));       }        ctx.fireChannelWritabilityChanged();   } ​    Override    public void flush(ChannelHandlerContext ctx) throws Exception {        if (logger.isEnabled(internalLevel)) {            logger.log(internalLevel, format(ctx, FLUSH));       }        ctx.flush();   } ​    /**     * Formats an event and returns the formatted message.     *     * param eventName the name of the event     */    protected String format(ChannelHandlerContext ctx, String eventName) {        String chStr ctx.channel().toString();        return new StringBuilder(chStr.length() 1 eventName.length())           .append(chStr)           .append( )           .append(eventName)           .toString();   } ​    /**     * Formats an event and returns the formatted message.     *     * param eventName the name of the event     * param arg       the argument of the event     */    protected String format(ChannelHandlerContext ctx, String eventName, Object arg) {        if (arg instanceof ByteBuf) {            return formatByteBuf(ctx, eventName, (ByteBuf) arg);       } else if (arg instanceof ByteBufHolder) {            return formatByteBufHolder(ctx, eventName, (ByteBufHolder) arg);       } else {            return formatSimple(ctx, eventName, arg);       }   } ​    /**     * Formats an event and returns the formatted message. This method is currently only used for formatting     * {link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)}.     *     * param eventName the name of the event     * param firstArg the first argument of the event     * param secondArg the second argument of the event     */    protected String format(ChannelHandlerContext ctx, String eventName, Object firstArg, Object secondArg) {        if (secondArg null) {            return formatSimple(ctx, eventName, firstArg);       } ​        String chStr ctx.channel().toString();        String arg1Str String.valueOf(firstArg);        String arg2Str secondArg.toString();        StringBuilder buf new StringBuilder(                chStr.length() 1 eventName.length() 2 arg1Str.length() 2 arg2Str.length());        buf.append(chStr).append( ).append(eventName).append(: ).append(arg1Str).append(, ).append(arg2Str);        return buf.toString();   } ​    /**     * Generates the default log message of the specified event whose argument is a {link ByteBuf}.     */    private static String formatByteBuf(ChannelHandlerContext ctx, String eventName, ByteBuf msg) {        String chStr ctx.channel().toString();        int length msg.readableBytes();        if (length 0) {            StringBuilder buf new StringBuilder(chStr.length() 1 eventName.length() 4);            buf.append(chStr).append( ).append(eventName).append(: 0B);            return buf.toString();       } else {            int rows length / 16 (length % 15 0? 0 : 1) 4;            StringBuilder buf new StringBuilder(chStr.length() 1 eventName.length() 2 10 1 2 rows * 80); ​            buf.append(chStr).append( ).append(eventName).append(: ).append(length).append(B).append(NEWLINE);            appendPrettyHexDump(buf, msg); ​            return buf.toString();       }   } ​    /**     * Generates the default log message of the specified event whose argument is a {link ByteBufHolder}.     */    private static String formatByteBufHolder(ChannelHandlerContext ctx, String eventName, ByteBufHolder msg) {        String chStr ctx.channel().toString();        String msgStr msg.toString();        ByteBuf content msg.content();        int length content.readableBytes();        if (length 0) {            StringBuilder buf new StringBuilder(chStr.length() 1 eventName.length() 2 msgStr.length() 4);            buf.append(chStr).append( ).append(eventName).append(, ).append(msgStr).append(, 0B);            return buf.toString();       } else {            int rows length / 16 (length % 15 0? 0 : 1) 4;            StringBuilder buf new StringBuilder(                    chStr.length() 1 eventName.length() 2 msgStr.length() 2 10 1 2 rows * 80); ​            buf.append(chStr).append( ).append(eventName).append(: )               .append(msgStr).append(, ).append(length).append(B).append(NEWLINE);            appendPrettyHexDump(buf, content); ​            return buf.toString();       }   } ​    /**     * Generates the default log message of the specified event whose argument is an arbitrary object.     */    private static String formatSimple(ChannelHandlerContext ctx, String eventName, Object msg) {        String chStr ctx.channel().toString();        String msgStr String.valueOf(msg);        StringBuilder buf new StringBuilder(chStr.length() 1 eventName.length() 2 msgStr.length());        return buf.append(chStr).append( ).append(eventName).append(: ).append(msgStr).toString();   } } 服务器调试示例结果 java 反射创建channelpublic io.netty.channel.socket.nio.NioServerSocketChannel() 创建pipeline 设置非阻塞模式 开始初始化channel 服务器端添加匿名处理器 do Register 注册事件 0 invokeHandlerAddedIfNeeded:添加处理器 invokeHandlerAddedIfNeeded-ChannelInitializer.initChannel(ChannelHandlerContext) 调用服务器端匿名处理器 服务器端添加serverBootstrap.handler()方法中指定的处理器 触发事件:fireChannelRegistered ServerLoggingHandler.channelRegistered 服务器端异步添加ServerBootstrapAcceptor ServerLoggingHandler.bind 触发事件:fireChannelActive ServerLoggingHandler.channelActive doBeginRead注册事件:16 ------------服务器channel初始化完成--------------     创建pipeline 设置非阻塞模式 触发事件:fireChannelRead 服务器端方法中指定的ServerLoggingHandler被调用 服务器端ServerBootstrapAcceptor被调用 do Register 注册事件 0 invokeHandlerAddedIfNeeded:添加处理器 invokeHandlerAddedIfNeeded-ChannelInitializer.initChannel(ChannelHandlerContext) 触发事件:fireChannelRegistered 11:09:25.118 [nioEventLoopGroup-3-1] INFO  i.n.h.logging.ClientLoggingHandler - [id: 0xb9e20c1b, L:/127.0.0.1:8007 - R:/127.0.0.1:56141] REGISTERED 触发事件:fireChannelActive 11:09:25.118 [nioEventLoopGroup-2-1] INFO  i.n.h.logging.ServerLoggingHandler - [id: 0xd5ededb0, L:/0:0:0:0:0:0:0:0:8007] READ COMPLETE 11:09:25.127 [nioEventLoopGroup-3-1] INFO  i.n.h.logging.ClientLoggingHandler - [id: 0xb9e20c1b, L:/127.0.0.1:8007 - R:/127.0.0.1:56141] ACTIVE doBeginRead注册事件:1 ------------客户端channel初始化完成--------------     触发事件:fireChannelRead //服务器收到消息 ClientLoggingHandler#channelRead 11:09:25.187 [nioEventLoopGroup-3-1] INFO  i.n.h.logging.ClientLoggingHandler - [id: 0xb9e20c1b, L:/127.0.0.1:8007 - R:/127.0.0.1:56141] READ COMPLETE ​ 11:09:25.187 [nioEventLoopGroup-3-1] INFO  i.n.h.logging.ClientLoggingHandler - [id: 0xb9e20c1b, L:/127.0.0.1:8007 - R:/127.0.0.1:56141] FLUSH ​ Fri Sep 03 11:09:30 CST 2021:服务器收到消息 ​ 11:09:32.220 [nioEventLoopGroup-3-1] INFO  i.n.h.logging.ClientLoggingHandler - [id: 0xb9e20c1b, L:/127.0.0.1:8007 - R:/127.0.0.1:56141] WRITE: 29B//服务器响应向客户端写消息 ​         -------------------------------------------------         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f | ------------------------------------------------------------------------- |00000000| 68 65 6c 6c 6f 2c 20 e5 ae a2 e6 88 b7 e7 ab af |hello, .........| |00000010| 7e 28 3e 5e cf 89 5e 3c 29 e5 96 b5 34          |~(^..^)...4   | ------------------------------------------------------------------------- ​    11:09:32.235 [nioEventLoopGroup-3-1] INFO  i.n.h.logging.ClientLoggingHandler - [id: 0xb9e20c1b, L:/127.0.0.1:8007 - R:/127.0.0.1:56141] FLUSH ​ 客户端调试示例结果 java 反射创建channelpublic io.netty.channel.socket.nio.NioSocketChannel() 创建pipeline 设置非阻塞模式 开始初始化channel 客户端channel添加bootstrap.handler()方法指定的处理器 do Register 注册事件 0 invokeHandlerAddedIfNeeded:添加处理器 invokeHandlerAddedIfNeeded-ChannelInitializer.initChannel(ChannelHandlerContext) 客户端bootstrap.handler()方法中指定的处理器被调用 触发事件:fireChannelRegistered 完成连接 finishConnect 触发事件:fireChannelActive EchoClientHandler.channelActive//第一次向服务器发送消息 doBeginRead注册事件:1 ------------channel初始化完成-------------- 触发事件:fireChannelRead//客户端收到消息 客户端EchoClientHandler#channelRead被调用 客户端收到数据PooledUnsafeDirectByteBuf(ridx: 0, widx: 29, cap: 1024) 客户端发送数据PooledUnsafeDirectByteBuf(ridx: 0, widx: 29, cap: 1024) addLast方法 java Override public final ChannelPipeline addLast(ChannelHandler... handlers) { //handlers return addLast(null, handlers); } Override public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) { if (handlers null) { throw new NullPointerException(handlers); } for (ChannelHandler h: handlers) {if (h null) {break;}//handlersaddLast(executor, null, h);}return this; } java Override public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler); newCtx newContext(group, filterName(name, handler), handler);addLast0(newCtx);if (!registered) {newCtx.setAddPending();callHandlerCallbackLater(newCtx, true);return this;}EventExecutor executor newCtx.executor();if (!executor.inEventLoop()) {callHandlerAddedInEventLoop(newCtx, executor);return this;}}//新添加的handler也会被执行initChannel方法callHandlerAdded0(newCtx);return this; } 创建EventLoopSelector流程 java EventLoopGroup workerGroup new NioEventLoopGroup(); EventLoopGroup bossGroup new NioEventLoopGroup(); java public class NioEventLoopGroup extends MultithreadEventLoopGroup { //默认构造方法 public NioEventLoopGroup() { this(0); } public NioEventLoopGroup(int nThreads) {this(nThreads, (Executor) null); } } 1.根据系统获取SelectorProvider java public NioEventLoopGroup(int nThreads, Executor executor) { //SelectorProvider.provider() //根据不同的系统创建不同的Selector 或者是说jdk不同 //Linux 下JOK 的下载和安装与Windows 下并没有太大的不同只是对一些环境的设置稍有不同。 //在windows环境下的是 WindowsSelectorProvider this(nThreads, executor, SelectorProvider.provider()); } //SelectorProvider.provider() //1.读取配置根据配置的class获取provider 獲取不到的话分支走到第二步 //2.通过spi获取provider 获取不到到第三步 //3.DefaultSelectorProvider#create创建provider //根据不同的系统创建不同的Selector 或者是说jdk不同 //Linux 下JOK 的下载和安装与Windows 下并没有太大的不同只是对一些环境的设置稍有不同。 //在windows环境下的是 WindowsSelectorProvider public static SelectorProvider provider() { synchronized (lock) { if (provider ! null) return provider; return AccessController.doPrivileged( new PrivilegedAction () { public SelectorProvider run() { if (loadProviderFromProperty()) return provider; if (loadProviderAsService()) return provider; provider sun.nio.ch.DefaultSelectorProvider.create();return provider;}});} } // sun.nio.ch.DefaultSelectorProvider.create(); // 不同的系统根据jdk有不同的实现 public static SelectorProvider create() { return new WindowsSelectorProvider(); } 2.设置线程池数量,默认cpu数量*2 java public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) { this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE); } public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory) { super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); } //默认创建的线程数是 cpu核数 * 2 // DEFAULTEVENTLOOPTHREADS Math.max(1, SystemPropertyUtil.getInt( // io.netty.eventLoopThreads, NettyRuntime.availableProcessors() * 2)); //args是可变参 protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { super(nThreads 0 ? DEFAULTEVENTLOOPTHREADS : nThreads, executor, args); } protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); } 3.循环创建NioEventLoop java protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads 0) { throw new IllegalArgumentException (String.format(nThreads: %d (expected: 0), nThreads)); } if (executor null) {//一个task一个threadexecutor new ThreadPerTaskExecutor(newDefaultThreadFactory());}//children是MultithreadEventExecutorGroup的属性//初始化数组 长度为线程数量children new EventExecutor[nThreads];//遍历nThreads 循环创建EventExecutor//EventLoop继承自EventExecutorfor (int i 0; i nThreads; i ) {boolean success false;try {//EventExecutor//其实是1个 NioEventLoopchildren[i] newChild(executor, args);success true;} catch (Exception e) {// TODO: Think about if this is a good exception typethrow new IllegalStateException(failed to create a child event loop, e);} finally {if (!success) {for (int j 0; j i; j ) {children[j].shutdownGracefully();}for (int j 0; j i; j ) {EventExecutor e children[j];try {while (!e.isTerminated()) {e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);}} catch (InterruptedException interrupted) {// Let the caller handle the interruption.Thread.currentThread().interrupt();break;}}}}}chooser chooserFactory.newChooser(children);final FutureListenerObject terminationListener new FutureListenerObject() {Overridepublic void operationComplete(FutureObject future) throws Exception {if (terminatedChildren.incrementAndGet() children.length) {terminationFuture.setSuccess(null);}}};for (EventExecutor e: children) {e.terminationFuture().addListener(terminationListener);}SetEventExecutor childrenSet new LinkedHashSetEventExecutor(children.length);Collections.addAll(childrenSet, children);readonlyChildren Collections.unmodifiableSet(childrenSet); } 4.初始化NioEventLoop打开selector java //返回的是EventLoop 实际是 NioEventLoop //io.netty.channel.nio.NioEventLoopGroup#newChild Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { EventLoopTaskQueueFactory queueFactory args.length 4 ? (EventLoopTaskQueueFactory) args[3] : null; return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory); } //构造函数 NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) { super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), rejectedExecutionHandler); if (selectorProvider null) {throw new NullPointerException(selectorProvider);}if (strategy null) {throw new NullPointerException(selectStrategy);}provider selectorProvider;//通过provider获取selector//selectorTuple是selector的包装类//获取到selectorTuple final SelectorTuple selectorTuple openSelector();//selectorTuple 获取 selectorselector selectorTuple.selector;unwrappedSelector selectorTuple.unwrappedSelector;selectStrategy strategy; } 5.NioEventLoop的run方法 因为它继承自Excutor类 所以还要关注它的run方法。 在 Netty 中 EventLoop 可以理解为 Reactor 线程模型的事件处理引擎每个 EventLoop 线程都维护一个 Selector 选择器和任务队列 taskQueue。它主要负责处理 I/O 事件、普通任务和定时任务。 Netty 中推荐使用 NioEventLoop 作为实现类那么 Netty 是如何实现 NioEventLoop 的呢首先我们来看 NioEventLoop 最核心的 run() 方法源码本节课我们不会对源码做深入的分析只是先了解 NioEventLoop 的实现结构。 首先会根据 hasTasks() 的结果来决定是执行 selectNow() 还是 select(oldWakenUp)这个应该好理解。如果有任务正在等待那么应该使用无阻塞的 selectNow()如果没有任务在等待那么就可以使用带阻塞的 select 操作。 ioRatio 控制 IO 操作所占的时间比重 如果设置为 100%那么先执行 IO 操作然后再执行任务队列中的任务。 如果不是 100%那么先执行 IO 操作然后执行 taskQueue 中的任务但是需要控制执行任务的总时间。也就是说非 IO 操作可以占用的时间通过 ioRatio 以及这次 IO 操作耗时计算得出。 ​ 我们这里先不要去关心 select(oldWakenUp)、processSelectedKeys() 方法和 runAllTasks(…) 方法的细节只要先理解它们分别做什么事情就可以了。 回过神来我们前面在 register 的时候提交了 register 任务给 NioEventLoop这是 NioEventLoop 接收到的第一个任务所以这里会实例化 Thread 并且启动然后进入到 NioEventLoop 中的 run 方法。 当然了实际情况可能是Channel 实例被 register 到一个已经启动线程的 NioEventLoop 实例中。 io.netty.channel.nio.NioEventLoop#run java Override //死循环监听、处理事件 protected void run() { for (;;) { try { try { //hasTasks()判断是否有任务 tailTasks 和 taskQueue 是否为空 //如果有任务 返回的是io事件个数 那么直接进入default 什么也不做跳出switch //如果没有任务 返回的是SelectStrategy.SELECT //队列中有任务则调用selectNow返回当前已就绪IO事件的数量否则继续select switch (selectStrategy.calculateStrategy (selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT:case SelectStrategy.SELECT:// 轮询 I/O 事件select(wakenUp.getAndSet(false));if (wakenUp.get()) {selector.wakeup();}// fall throughdefault:}} catch (IOException e) {rebuildSelector0();handleLoopException(e);continue;}cancelledKeys 0;needsToSelectAgain false;//初始ioRatio是50final int ioRatio this.ioRatio;//如果ioRatio是100 先处理就绪的ioif (ioRatio 100) {try {//selector选择事件//判断事件类型//处理io事件processSelectedKeys();} finally {//确保我们总是可以执行任务//执行任务runAllTasks();}} else {final long ioStartTime System.nanoTime();try {// 处理 I/O 事件processSelectedKeys();} finally {//确保我们总是可以执行任务//但是这次执行任务是带有超时时间的final long ioTime System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {handleLoopException(t);}// Always handle shutdown even if the loop processing threw an exception.try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}} } 上述源码的结构比较清晰NioEventLoop 每次循环的处理流程都包含事件轮询 select、事件处理 processSelectedKeys、任务处理 runAllTasks 几个步骤是典型的 Reactor 线程模型的运行机制。而且 Netty 提供了一个参数 ioRatio可以调整 I/O 事件处理和任务处理的时间比例。下面我们将着重从事件处理和任务处理两个核心部分出发详细介绍 Netty EventLoop 的实现原理。 结合 Netty 的整体架构我们一起看下 EventLoop 的事件流转图以便更好地理解 Netty EventLoop 的设计原理。NioEventLoop 的事件处理机制采用的是无锁串行化的设计思路。 BossEventLoopGroup 和 WorkerEventLoopGroup 包含一个或者多个 NioEventLoop。BossEventLoopGroup 负责监听客户端的 Accept 事件当事件触发时将事件注册至 WorkerEventLoopGroup 中的一个 NioEventLoop 上。每新建一个 Channel 只选择一个 NioEventLoop 与其绑定。所以说 Channel 生命周期的所有事件处理都是线程独立的不同的 NioEventLoop 线程之间不会发生任何交集。NioEventLoop 完成数据读取后会调用绑定的 ChannelPipeline 进行事件传播ChannelPipeline 也是线程安全的数据会被传递到 ChannelPipeline 的第一个 ChannelHandler 中。数据处理完成后将加工完成的数据再传递给下一个 ChannelHandler整个过程是串行化执行不会发生线程上下文切换的问题。 NioEventLoop 无锁串行化的设计不仅使系统吞吐量达到最大化而且降低了用户开发业务逻辑的难度不需要花太多精力关心线程安全问题。 虽然单线程执行避免了线程切换但是它的缺陷就是不能执行时间过长的 I/O 操作一旦某个 I/O 事件发生阻塞那么后续的所有 I/O 事件都无法执行甚至造成事件积压。 在使用 Netty 进行程序开发时我们一定要对 ChannelHandler 的实现逻辑有充分的风险意识。 java Override public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception { //判断是否有任务 //有任务唤醒 return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT; } /** * 核心思想没有task要做时select阻塞1s如果有task,wakeup去做。 * param oldWakenUp * throws IOException */ private void select(boolean oldWakenUp) throws IOException { Selector selector this.selector; try { int selectCnt 0; long currentTimeNanos System.nanoTime(); //按scheduled的task时间来计算select timeout时间。 long selectDeadLineNanos currentTimeNanos delayNanos(currentTimeNanos); long normalizedDeadlineNanos selectDeadLineNanos - initialNanoTime();if (nextWakeupTime ! normalizedDeadlineNanos) {nextWakeupTime normalizedDeadlineNanos;}for (;;) {long timeoutMillis (selectDeadLineNanos - currentTimeNanos 500000L) / 1000000L;if (timeoutMillis 0) { //已经有定时task需要执行了或者超过最长等待时间了if (selectCnt 0) {//非阻塞没有数据返回0selector.selectNow();selectCnt 1;}break;}if (hasTasks() wakenUp.compareAndSet(false, true)) {selector.selectNow();selectCnt 1;break;}//下面select阻塞中别人唤醒也可以可以的int selectedKeys selector.select(timeoutMillis);selectCnt ;if (selectedKeys ! 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {break;}if (Thread.interrupted()) {if (logger.isDebugEnabled()) {logger.debug(Selector.select() returned prematurely because Thread.currentThread().interrupt() was called. Use NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.);}selectCnt 1;break;}long time System.nanoTime();if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) currentTimeNanos) {// timeoutMillis elapsed without anything selected.selectCnt 1;} else if (SELECTOR_AUTO_REBUILD_THRESHOLD 0 selectCnt SELECTOR_AUTO_REBUILD_THRESHOLD) {// The code exists in an extra method to ensure the method is not too big to inline as this// branch is not very likely to get hit very frequently.selector selectRebuildSelector(selectCnt);selectCnt 1;break;}currentTimeNanos time;}if (selectCnt MIN_PREMATURE_SELECTOR_RETURNS) {if (logger.isDebugEnabled()) {logger.debug(Selector.select() returned prematurely {} times in a row for Selector {}.,selectCnt - 1, selector);}}} catch (CancelledKeyException e) {if (logger.isDebugEnabled()) {logger.debug(CancelledKeyException.class.getSimpleName() raised by a Selector {} - JDK bug?,selector, e);}// Harmless exception - log anyway} } 6.空轮询bug解决 NioEventLoop 线程的可靠性至关重要一旦 NioEventLoop 发生阻塞或者陷入空轮询就会导致整个系统不可用。 在 JDK 中 Epoll 的实现是存在漏洞的即使 Selector 轮询的事件列表为空NIO 线程一样可以被唤醒导致 CPU 100% 占用。这就是臭名昭著的 JDK epoll 空轮询的 Bug。 Netty 作为一个高性能、高可靠的网络框架需要保证 I/O 线程的安全性。那么它是如何解决 JDK epoll 空轮询的 Bug 呢 实际上 Netty 并没有从根源上解决该问题而是巧妙地规避了这个问题。 我们抛开其他细枝末节直接定位到事件轮询 select() 方法中的最后一部分代码一起看下 Netty 是如何解决 epoll 空轮询的 Bug。 Netty中的解决思路 对Selector()方法中的阻塞定时 select(timeMIllinois)操作的 次数进行统计每完成一次select操作进行一次计数若在循环周期内 发生N次空轮询如果N值大于BUG阈值(默认为512)就进行空轮询BUG处理。 重建Selector判断是否是其他线程发起的重建请求若不是则将原SocketChannel从旧的Selector上去除注册重新注册到新的 Selector上并将原来的Selector关闭。 https://blog.csdn.net/qq_41884976/article/details/91913820 select方法分三个部分 //第一部分超时处理逻辑 //第二部分定时阻塞select(timeMillins)   //第三部分 解决空轮询 BUG long time System.nanoTime(); //当前时间 - 循环开始时间 定时select的时间timeoutMillis说明已经执行过一次阻塞select() if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) currentTimeNanos) {    //说明发生过一次阻塞式轮询 重置次数    selectCnt 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD 0        selectCnt SELECTOR_AUTO_REBUILD_THRESHOLD) {    // 如果空轮询的次数大于空轮询次数阈值 SELECTOR_AUTO_REBUILD_THRESHOLD(512)    //1.首先创建一个新的Selecor    //2.将旧的Selector上面的键及其一系列的信息放到新的selector上面。    selector selectRebuildSelector(selectCnt);    selectCnt 1;    break; } Netty 提供了一种检测机制判断线程是否可能陷入空轮询具体的实现方式如下 每次执行 Select 操作之前记录当前时间 currentTimeNanos。time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) currentTimeNanos如果事件轮询的持续时间大于等于 timeoutMillis那么说明是正常的否则表明阻塞时间并未达到预期可能触发了空轮询的 Bug。Netty 引入了计数变量 selectCnt。在正常情况下selectCnt 会重置否则会对 selectCnt 自增计数。当 selectCnt 达到 SELECTORAUTOREBUILD_THRESHOLD(默认512) 阈值时会触发重建 Selector 对象。 Netty 采用这种方法巧妙地规避了 JDK Bug。异常的 Selector 中所有的 SelectionKey 会重新注册到新建的 Selector 上重建完成之后异常的 Selector 就可以废弃了。
http://www.hkea.cn/news/14541759/

相关文章:

  • 便宜的手机网站建设一个网站做两种产品
  • 网站的建议wordpress表格插件
  • 做网站重要标签营销技巧分享
  • 郑州建立一个网站需要哪些电子商务网站建设体会与收获
  • 合肥微网站外贸网站是怎么做的
  • 上海备案证查询网站查询网站个人网站 icp
  • 网站建设接私活平台快速做网站公司报价
  • 做家电维修网站能接到单吗衡阳网站建设衡阳千度网络
  • 支付公司网站建设会计分录重庆装修贷款利率是多少
  • 利用微博网站做淘客得物app公司
  • windows做网站服务器青浦营销型网站建设
  • 龙岗网站设计信息如何做谷歌优化
  • 新手怎么做网站内容维护做网站不推广
  • 网站icon图标怎么加用别人的电影网站做公众号
  • 手表网站背景素材网站从域名
  • 做推广需要网站吗佛山seo优化排名
  • 凡科做的免费网站做网站客户怎么找
  • 龙岗企业网站建设做静态网站的参考文献
  • 厚瑜网站建设重庆教育建设有限公司网站首页
  • 温州网上商城网站建设中国出口贸易网官网
  • 企业二级网站怎么做开发公司保障员工安全的措施
  • 做网站听的纯音乐诸城做网站收费
  • 公维金如何上传建设局网站做基础工程分包应上什么网站
  • 免费的网站制作郑州室内设计工作室
  • 制作网站难还是编程难京东网站建设费用
  • wap的网站模板可以加微信的交友软件
  • cms全称厦门最快seo
  • 哪有专业做网站住建局官网报名入口
  • 房产网站定制网络课程教学平台有哪些
  • 网站开发中 倒计时 源码代理龙华网站建设