当前位置: 首页 > news >正文

中国能源建设股份有限公司新网站园区门户网站建设

中国能源建设股份有限公司新网站,园区门户网站建设,wordpress邮件伪造,在线p图编辑器一、概述 1.1 概念 Netty是一个异步的基于事件驱动(即多路复用技术)的网络应用框架#xff0c;用于快速开发可维护、高性能的网络服务器和客户端。 1.2 地位 Netty在Java网络应用框架中的地位就好比#xff0c;Spring框架在JavaEE开发中的地位。 以下的框架都使用了Nett…一、概述 1.1 概念 Netty是一个异步的基于事件驱动(即多路复用技术)的网络应用框架用于快速开发可维护、高性能的网络服务器和客户端。 1.2 地位 Netty在Java网络应用框架中的地位就好比Spring框架在JavaEE开发中的地位。 以下的框架都使用了Netty因为他们有网络通信需求。 Cassandra非关系型数据库Spark大数据分布式计算框架Hadoop大数据分布式存储框架RocketMQ阿里开源的消息队列ElasticSearch搜索引擎gRPCRPC框架DubboRPC框架Spring 5.xflux api完全抛弃了tomcat使用netty作为服务器端Zookeeper分布式协调框架 1.3 优势 Netty同样是基于java nio开发。如果自己使用nio开发工作量大bug 多这是因为Netty已经做好了基础部分 构建协议解决 TCP 传输问题如粘包、半包Linux多路复用的底层是epoll会存在空轮询导致 CPU 100%对应nio中Linux下不阻塞Netty兼容并解决该问题对 API 进行增强使之更易用如 ThreadLocal FastThreadLocal ByteBuffer ByteBuf Netty vs 其它网络应用框架 Mina 由 apache 维护将来 3.x 版本可能会有较大重构破坏 API 向下兼容性Netty 的开发迭代更迅速API 更简洁、文档更优秀久经考验Netty 版本 2.x 20043.x 20084.x 20135.x 已废弃使用了AIO但是Linux的是伪AIO只有Win真正实现了AIO。实际没有明显的性能提升却导致维护成本高 二、入门 2.1 需求 首先需要引入依赖 dependencygroupIdio.netty/groupIdartifactIdnetty-all/artifactIdversion4.1.39.Final/version /dependency使用Netty开发一个简单的服务器端和客户端 客户端向服务器端发送 hello, world服务器仅接收不返回 2.2 实现 服务端 public class HelloServer {public static void main(String[] args) {new ServerBootstrap()//启动器负责组装netty组件协调工作//一个selector和一个thread就叫EventLoop。EventLoopGroup里面既有Boss处理连接也有Worker处理读写.group(new NioEventLoopGroup())//选择服务器netty的ServerSocketChannel具体实现有4种自行看源码.channel(NioServerSocketChannel.class).childHandler(//child即Worker负责读写。决定了Worker能执行哪些操作(Handler)new ChannelInitializerNioSocketChannel() {//代表和客户端进行数据读写的通道主要负责在initChannel里面添加其他的handlerOverrideprotected void initChannel(NioSocketChannel channel) throws Exception {//添加具体的handlerchannel.pipeline().addLast(new StringDecoder());//将netty的ByteBuf转换为字符串channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {//自定义handlerOverridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//此时这个msg就是上一步StringDecoder解码后的结果System.out.println(msg);}});}})//监听端口.bind(8080);} }客户端 public class HelloClient {public static void main(String[] args) throws Exception {//创建启动器new Bootstrap()//添加EventLoop.group(new NioEventLoopGroup())//选择客户端channel实现.channel(NioSocketChannel.class)//添加处理器,连接建立后该初始化器被调用.handler(new ChannelInitializerNioSocketChannel() {Override//在连接建立后(accept)被调用protected void initChannel(NioSocketChannel channel) throws Exception {channel.pipeline().addLast(new StringEncoder());}})//连接到服务器.connect(localhost, 8080)//阻塞方法直到连接建立.sync()//代表着连接对象.channel()//向服务器发送数据.writeAndFlush(hello world);} }针对上述代码对Netty的流程理解 channel 是数据的通道与 jdk nio 中 channel 作用一致msg 是流动的数据。输入是 ByteBuf 输出也是 ByteBuf 。但是中间会经过 pipeline 加工变成其他的类型对象。handler是数据的处理工序 工序有多道合在一起就是 pipeline 。 pipeline 负责发布事件读、读完成等传播给各个 handler handler 对自己感兴趣的事件进行处理handler分为Inbound数据输入时走入站handler和Outbound数据输出时走出站handler两类 eventLoop底层就是一个线程是处理数据的工人 工人可以管理多个 channel 的 io 操作。并且工人和 channel 针对io操作是绑定的这也是从线程安全的角度考虑如果一个 channel 可以被多个线程管理就会存在多个线程一起读写的情况防止出问题可能还要做串行操作工人既可以执行 io 操作也可以进行任务的处理。每位工人有任务队列队列里可以存储该工人绑定的多个 channel 的待处理任务任务分为普通任务、定时任务工人按照 pipeline 顺序依次按照 handler 的代码处理数据可以为每道工序指定不同的工人只适用非io操作。 三、组件 3.1 EventLoop EventLoop事件循环对象 EventLoop 本质是一个单线程执行器同时维护了一个 Selector里面有 run 方法处理 Channel 上源源不断的 io 事件。 它的继承关系比较复杂 一条线是继承自 java.util.concurrent.ScheduledExecutorService 因此包含了线程池中所有的方法 另一条线是继承自io.netty.util.concurrent.OrderedEventExecutor 提供了 boolean inEventLoop() 方法判断当前执行的线程是否属于此EventLoop 提供了 boolean inEventLoop(Thread thread) 方法判断指定线程是否属于此 EventLoop 提供了 parent 方法来看看自己属于哪个 EventLoopGroup 一般我们不会直接使用EventLoop而是使用EventLoopGroup 3.2 EventLoopGroup EventLoopGroup事件循环组 EventLoopGroup 是一组 EventLoopChannel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop后续这个 Channel 上的 io 事件都由此 EventLoop 来处理保证了 io 事件处理时的线程安全 继承自 io.netty.util.concurrent.EventExecutorGroup 实现了 Iterable 接口提供遍历 EventLoop 的能力另有 next 方法获取集合中下一个 EventLoop, next 底层是轮询 使用示例 Slf4j public class TestEventLoop {public static void main(String[] args) {//1. 创建事件循环组。默认线程数跟下源码, 什么都不配置, 默认是cpu线程*2NioEventLoopGroup group new NioEventLoopGroup(2);//可以执行io事件、普通任务、定时任务//new DefaultEventLoopGroup();//执行普通任务、定时任务//2. 获取下一个事件循环对象这个next底层就是轮询System.out.println(group.next());System.out.println(group.next());System.out.println(group.next());//3. 执行普通任务, 在netty中的意义就是执行一些比较耗时的任务group.next().execute(() - {log.info(普通任务);});//4. 执行定时任务。用于keepalive时连接的保活group.next().scheduleAtFixedRate(() - {log.info(定时任务);}, 0L, 2L, TimeUnit.SECONDS);} }3.2.1 完善2.1 将2.1的需求进一步完善代码如下。 服务端 Slf4j public class EventLoopServer {public static void main(String[] args) {/*** 细分2: 如果处理io的操作比较耗时, 这时候是不应该让nioEventLoop阻塞在那里否则会影响后续其他channel的读写* 所以创建一个独立的EventLoopGroup用来执行那些耗时操作*/DefaultEventLoopGroup defaultEventLoopGroup new DefaultEventLoopGroup(2, new ThreadFactory() {private final AtomicInteger atomicInteger new AtomicInteger(0);Overridepublic Thread newThread(Runnable r) {return new Thread(r, calc- atomicInteger.incrementAndGet());}});new ServerBootstrap()//.group(new NioEventLoopGroup())//细分1: netty建议将group划分的更细, 划分为parent和child。parent负责accept, child负责read and write.group(new NioEventLoopGroup(1, new ThreadFactory() {private final AtomicInteger atomicInteger new AtomicInteger(0);Overridepublic Thread newThread(Runnable r) {return new Thread(r, boss- atomicInteger.incrementAndGet());}}), new NioEventLoopGroup(2, new ThreadFactory() {private final AtomicInteger atomicInteger new AtomicInteger(0);Overridepublic Thread newThread(Runnable r) {return new Thread(r, worker- atomicInteger.incrementAndGet());}})).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializerNioSocketChannel() {Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(handler1, new ChannelInboundHandlerAdapter() {Override//没有编解码那就是ByteBufpublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf (ByteBuf) msg;log.info(buf.toString(Charset.defaultCharset()) _ ctx.channel().remoteAddress());ctx.fireChannelRead(msg);//将消息传递给下一个handler//上行做法通过直接调用父级该方法一样可以实现往下传递//super.channelRead(ctx, msg);}}).addLast(defaultEventLoopGroup, handler2, new ChannelInboundHandlerAdapter() {Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf (ByteBuf) msg;TimeUnit.SECONDS.sleep(2L);log.info(buf.toString(Charset.defaultCharset()) _ ctx.channel().remoteAddress());}});}}).bind(8080);} }客户端 public class EventLoopClient {public static void main(String[] args) throws Exception {ChannelFuture future new Bootstrap()//创建启动器//添加EventLoop.group(new NioEventLoopGroup())//选择客户端channel实现.channel(NioSocketChannel.class)//添加处理器,连接建立后该初始化器被调用.handler(new ChannelInitializerNioSocketChannel() {Override//在连接建立后(accept)被调用protected void initChannel(NioSocketChannel channel) throws Exception {channel.pipeline().addLast(new StringEncoder());}})//连接到服务器.connect(localhost, 8080);Channel channel future//阻塞方法直到连接建立.sync().channel();//代表着连接对象while (true) {System.in.read();channel.writeAndFlush(hello world);//channel.write(hello world);}} }建立三个Client每个Client发送一次消息。运行结果如图 由上图可知channel第一次创建时就与线程绑定了不管是处理读写的worker还是处理耗时的calc都是绑定的。 h1与h2对应服务端的handler1与handler2 至于head与tail后面的会提到 3.2.2 切换线程原理 查看源码io.netty.channel.AbstractChannelHandlerContext中的invokeChannelRead static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m next.pipeline.touch(ObjectUtil.checkNotNull(msg, msg), next);// 返回下一个handler的eventLoop,这里是使用了多态的写法EventExecutor executor next.executor();//判断当前handler的线程是否和下一个handler的eventLoop是同一个线程//如果是直接调用否则, 将要处理的消息提交给下一个eventLoopif (executor.inEventLoop()) {next.invokeChannelRead(m);} else {executor.execute(new Runnable() {Overridepublic void run() {next.invokeChannelRead(m);}});} }3.3 Channel channel 的主要作用 close() 可以用来异步关闭 channelcloseFuture() 用来执行 channel 关闭后的善后操作 sync 方法作用是同步等待 channel 关闭而 addListener 方法是异步等待 channel 关闭 pipeline() 方法添加处理器write() 方法将数据写到缓冲区但并不是立即写出 可以直接将3.2.1的代码修改成write方法尝试即可 writeAndFlush() 方法立即将数据写出 3.3.1 ChannelFuture 保证获取到的channel是成功连接后的两种方式 sync 阻塞本线程阻塞直到channel成功建立连接addListener(回调对象) 添加回调其他线程执行channel监听到连接成功后执行回调对象 服务端代码保持不变客户端代码修改 Slf4j public class EventLoopClient {public static void main(String[] args) throws Exception {//带有Future、Promise的都是和异步方法配套使用目的是提高效率且正确处理结果ChannelFuture future new Bootstrap()//创建启动器//添加EventLoop.group(new NioEventLoopGroup())//选择客户端channel实现.channel(NioSocketChannel.class)//添加处理器,连接建立后该初始化器被调用.handler(new ChannelInitializerNioSocketChannel() {Override//在连接建立后(accept)被调用protected void initChannel(NioSocketChannel channel) throws Exception {channel.pipeline().addLast(new StringEncoder());}})//连接到服务器/*** 异步非阻塞* 异步我(当前线程)只负责发起连接至于连接后的结果让别人(NioEventLoopGroup中线程)取* 非阻塞不等待直接下一步*/.connect(localhost, 8080);//方法一使用sync阻塞方法直到连接建立//future.sync();//直接获取channel其实获取到的是个尚未连接的channel//Channel channel future.channel();//代表着连接对象//for (int i 0; i 2; i) {// if (channel.isActive()) {// log.info(已连接);// channel.writeAndFlush(hello world);// } else {// log.info(未连接);// }// TimeUnit.SECONDS.sleep(1);//}//方法二使用addListener(回调对象)方法异步处理结果future.addListener(new ChannelFutureListener() {Override//在nio线程连接建立好之后会调用operationCompletepublic void operationComplete(ChannelFuture future) throws Exception {Channel channel future.channel();for (int i 0; i 2; i) {if (channel.isActive()) {log.info(已连接);channel.writeAndFlush(hello world);} else {log.info(未连接);}TimeUnit.SECONDS.sleep(1);}}});} }3.3.2 CloseFuture Channel通过 closeFuture() 来进行善后操作 sync 方法作用是同步等待 channel 关闭而 addListener 方法是异步等待 channel 关闭 Slf4j public class CloseFutureClient {public static void main(String[] args) throws Exception {NioEventLoopGroup group new NioEventLoopGroup();ChannelFuture future new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializerNioSocketChannel() {Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)).addLast(new StringEncoder());}}).connect(localhost, 8080);Channel channel future.sync().channel();new Thread(() - {Scanner scanner new Scanner(System.in);while (true) {String line scanner.nextLine();if (q.equals(line)) {//close是个异步方法, 通过添加LoggingHandler可以监视关闭的线程channel.close();log.info(调用close方法);break;}channel.writeAndFlush(line);}}, input).start();//方式一sync//ChannelFuture closeFuture channel.closeFuture();//closeFuture.sync();//log.info(成功关闭);//group.shutdownGracefully();//方式二监听channel.closeFuture().addListener(new ChannelFutureListener() {Overridepublic void operationComplete(ChannelFuture future) throws Exception {log.info(成功关闭后的回调);group.shutdownGracefully();}});} }Netty可以通过LoggingHandler打印日志直观的查看Channel连接、收发、断开的过程 3.3.3 为何异步 疑问为什么不在一个线程中去执行建立连接、去执行关闭 channel那样不是也可以吗非要用这么复杂的异步方式比如一个线程发起建立连接另一个线程去真正建立连接一个线程去关闭连接另一个线程真正去关闭连接。 这个问题也很简单就比如多路复用的做法。我只有4个线程来发起长连接如果一线程一长连接那种撑死只能建立4个长连接。但是使用多路复用技术就能处理更多的长连接了。这也就是Netty异步的核心思想了。 3.4 FuturePromise 3.4.1 比较 在异步处理时经常用到这 Future Promise 两个接口 首先要说明 netty 中的 Future 与 jdk 中的 Future 同名但是是两个接口netty 的 Future 继承自 jdk 的 Future而 Promise 又对 netty Future 进行了扩展 jdk Future 只能同步等待任务结束或成功、或失败才能得到结果。比如get方法就是只能同步等待获取结果。netty Future 可以同步等待任务结束得到结果也可以异步方式得到结果比如CloseFuture的addListener但都是要等任务结束netty Promise 不仅有 netty Future 的功能而且脱离了任务独立存在只作为两个线程间传递结果的容器 功能/名称jdk Futurenetty FuturePromisecancel取消任务--isCanceled任务是否取消--isDone任务是否完成不能区分成功失败--get获取任务结果阻塞等待--getNow-获取任务结果非阻塞还未产生结果时返回 null-await-等待任务结束如果任务失败不会抛异常而是通过 isSuccess 判断-sync-等待任务结束如果任务失败抛出异常-isSuccess-判断任务是否成功-cause-获取失败信息非阻塞如果没有失败返回null-addLinstener-添加回调异步接收结果-setSuccess--设置成功结果setFailure--设置失败结果 3.4.2 示例 jdk Future Slf4j public class TestJDKFuture {public static void main(String[] args) throws Exception {ExecutorService executorService Executors.newFixedThreadPool(2);FutureInteger future executorService.submit(new CallableInteger() {Overridepublic Integer call() throws Exception {log.info(calc..);TimeUnit.SECONDS.sleep(2);return ThreadLocalRandom.current().nextInt(1, 10);}});log.info(waiting..);Integer integer future.get();log.info(received..{}, integer);} }netty Future Slf4j public class TestNettyFuture {public static void main(String[] args) throws Exception {NioEventLoopGroup group new NioEventLoopGroup(1);EventLoop eventLoop group.next();FutureInteger future eventLoop.submit(new CallableInteger() {Overridepublic Integer call() throws Exception {log.info(calc..);TimeUnit.SECONDS.sleep(2);return ThreadLocalRandom.current().nextInt(1, 10);}});//方式一同步方式//log.info(waiting..);//Integer integer future.get();//log.info(received..{}, integer);//方式二异步方式future.addListener(new GenericFutureListenerFutureInteger() {Overridepublic void operationComplete(FutureInteger future) throws Exception {log.info(waiting..);Integer integer future.getNow();log.info(received..{}, integer);}});} }netty Promise Slf4j public class TestNettyPromise {public static void main(String[] args) throws Exception {EventLoop eventLoop new NioEventLoopGroup(1).next();//与future不同的是可以主动创建promise对象。而不用像future一样通过提交任务获取对象。//结果容器PromiseInteger promise new DefaultPromise(eventLoop);new Thread(() - {log.info(calc..);try {TimeUnit.SECONDS.sleep(2L);//int i 2 / 0;//线程执行完毕后向promise填充结果promise.setSuccess(ThreadLocalRandom.current().nextInt(1, 10));} catch (InterruptedException e) {e.printStackTrace();promise.setFailure(e);}}).start();//接收结果log.info(waiting..);Integer integer promise.get();log.info(received..{}, integer);} }3.5 HandlerPipeline 3.5.1 Handler在Pipeline中执行顺序 ChannelHandler 用来处理 Channel 上的各种事件分为入站、出站两种。所有 ChannelHandler 被连成一串就是 Pipeline 入站处理器通常是 ChannelInboundHandlerAdapter 的子类主要用来读取客户端数据、写回结果出站处理器通常是 ChannelOutboundHandlerAdapter 的子类主要对写回结果进行加工 打个比喻每个 Channel 是一个产品的加工车间Pipeline 是车间中的流水线ChannelHandler 就是流水线上的各道工序而后面要讲的 ByteBuf 是原材料经过很多工序的加工先经过一道道入站工序再经过一道道出站工序最终变成产品 Slf4j public class TestPipeline {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializerNioSocketChannel() {Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {//通过channel拿到pipelineChannelPipeline pipeline ch.pipeline();//添加处理器。netty会自动添加两个handler分别为head和tail//addLast并不是加到最后而是加到tail之前//channel的执行流程head-In_1-In_2-In_3-Out_4-Out_5-Out_6-tailpipeline.addLast(In_1, new ChannelInboundHandlerAdapter() {Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info(In_1);super.channelRead(ctx, msg);//唤醒下一个入站处理器}});pipeline.addLast(In_2, new ChannelInboundHandlerAdapter() {Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info(In_2);super.channelRead(ctx, msg);//唤醒下一个入站处理器}});pipeline.addLast(In_3, new ChannelInboundHandlerAdapter() {Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info(In_3);super.channelRead(ctx, msg);//唤醒下一个入站处理器。此处已经结尾了所以无所谓。//ctx.writeAndFlush(hello world);//表示从当前处理器往前找 写出 处理器ctx.channel().writeAndFlush(hello world);//表示从channel的tail往前找 写出 处理器}});pipeline.addLast(Out_4, new ChannelOutboundHandlerAdapter() {Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.info(Out_4);super.write(ctx, msg, promise);}});pipeline.addLast(Out_5, new ChannelOutboundHandlerAdapter() {Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.info(Out_5);super.write(ctx, msg, promise);}});pipeline.addLast(Out_6, new ChannelOutboundHandlerAdapter() {Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.info(Out_6);super.write(ctx, msg, promise);}});}}).bind(8080);} }Slf4j class TestPipelineClient {public static void main(String[] args) throws Exception {Channel channel new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializerNioSocketChannel() {Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new StringEncoder()).addLast(new StringDecoder()).addLast(new ChannelInboundHandlerAdapter() {Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info(received{}, msg);}}).addLast(new ChannelOutboundHandlerAdapter() {Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.info(writed{}, msg);super.write(ctx, msg, promise);}});}}).connect(localhost, 8080).sync().channel();while (true) {System.in.read();channel.writeAndFlush(hello world);}} }可以看到ChannelInboundHandlerAdapter 是按照 addLast 的顺序执行的而 ChannelOutboundHandlerAdapter 是按照 addLast 的逆序执行的。 ChannelPipeline 的实现是一个 ChannelHandlerContext包装了 ChannelHandler 组成的双向链表 3.5.2 更方便地测试Handler执行顺序 Slf4j public class TestEmbeddedChannel {public static void main(String[] args) {ChannelInboundHandlerAdapter h1 new ChannelInboundHandlerAdapter() {Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info(h1);super.channelRead(ctx, msg);}};ChannelInboundHandlerAdapter h2 new ChannelInboundHandlerAdapter() {Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info(h2);super.channelRead(ctx, msg);}};ChannelOutboundHandlerAdapter h3 new ChannelOutboundHandlerAdapter() {Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.info(h3);super.write(ctx, msg, promise);}};ChannelOutboundHandlerAdapter h4 new ChannelOutboundHandlerAdapter() {Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.info(h4);super.write(ctx, msg, promise);}};EmbeddedChannel embeddedChannel new EmbeddedChannel(h1, h2, h3, h4);//测试入站log.info(测试入站);embeddedChannel.writeInbound(inbound);Object o embeddedChannel.readInbound();log.info(测试出站);embeddedChannel.writeOutbound(outbound);Object o1 embeddedChannel.readOutbound();} }3.6 ByteBuf 3.6.1 优势 io.netty.buffer.ByteBuf是对java.nio.ByteBuffer的增强。 支持动态扩容。最大容量不超过Integer最大值。池化思想。对直接内存影响最大保证享受了直接内存的高读写的同时又能有效避免重复开辟内存造成的性能损失。读写指针分离。内部使用两套指针标识读和写。与ByteBuffer相比就能减少不必要的来回切换。零拷贝。比如slice/duplicate/compositeByteBuf方便开发者高效编写。比如链式调用。 3.6.2 组成 ByteBuf 由四部分组成 废弃字节可读字节可写字节可扩容字节 该组成结构使得ByteBuf在使用上比ByteBuffer(如下图所示)方便许多因为节省了人为频繁切换指针位置的操作。 3.6.3 使用 池化 VS 非池化 池化的最大意义在于可以重用 ByteBuf优点有 没有池化则每次都得创建新的 ByteBuf 实例这个操作对直接内存代价昂贵就算是堆内存也会增加 GC 压力有了池化则可以重用池中 ByteBuf 实例并且采用了与 jemalloc 类似的内存分配算法提升分配效率高并发时池化功能更节约内存减少内存溢出的可能 池化功能是否开启可以通过下面的系统环境变量来设置 -Dio.netty.allocator.type{unpooled|pooled}4.1 以后非 Android 平台默认启用池化实现Android 平台启用非池化实现4.1 之前池化功能还不成熟默认是非池化实现 直接内存 VS 堆内存 可以使用下面的代码来创建池化基于堆内存的 ByteBuf ByteBuf buffer ByteBufAllocator.DEFAULT.heapBuffer(10);也可以使用下面的代码来创建池化基于直接内存的 ByteBuf ByteBuf buffer ByteBufAllocator.DEFAULT.directBuffer(10);直接内存 与 堆内存 的比较 直接内存创建和销毁的代价昂贵但读写性能高少一次内存复制适合配合池化功能一起用直接内存对 GC 压力小因为这部分内存不受 JVM 垃圾回收的管理但也要注意及时主动释放 调试工具类 首先创建一个调试工具类 import io.netty.buffer.ByteBuf;import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump; import static io.netty.util.internal.StringUtil.NEWLINE;public class DebugByteBuf {public static void log(ByteBuf buffer) {log(buffer, false);}public static void log(ByteBuf buffer, boolean pretty) {int length buffer.readableBytes();int rows length / 16 (length % 15 0 ? 0 : 1) 4;StringBuilder buf new StringBuilder(rows * 80 * 2).append(read index:).append(buffer.readerIndex()).append( write index:).append(buffer.writerIndex()).append( capacity:).append(buffer.capacity()).append(NEWLINE);if (pretty) {appendPrettyHexDump(buf, buffer);}System.out.println(buf.toString());} }创建 ByteBuf能自动扩容初始值256最大值为Integer最大范围 public class TestByteBuf {public static void main(String[] args) {//创建ByteBuf, 默认创建大小256个字节的最大为Integer最大值ByteBuf buffer ByteBufAllocator.DEFAULT.buffer();System.out.println(buffer.getClass());DebugByteBuf.log(buffer);//初始256StringBuilder sb new StringBuilder();for (int i 0; i 300; i) {sb.append(a);}buffer.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8));DebugByteBuf.log(buffer);//由于超过了容量256自动扩容到512} }写入 大小端存储 Big Endian(大端存储)和Little Endian(小端存储)是两种不同的字节存储方式用于表示一个多字节数据类型在内存中的存储顺序。 不要将字节与位的关系混淆。 计算机中用来表示内存储器容量大小的基本单位是字节Byte此处是讲多字节数据类型的存储顺序。 Big Endian(大端存储)是指内存的低地址存储高位字节。 Little Endian(小端存储)是指内存的低地址存储低位字节。 操作系统都采用小端存储模式 通讯协议则采用大端存储模式 测试大小端存储时顺序 public class DebugEndian {public static void debug(long num) {// 将 long 变量转换为大端顺序的字节数组byte[] bigEndian ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN).putLong(num).array();// 将 long 变量转换为小端顺序的字节数组byte[] littleEndian ByteBuffer.allocate(Long.BYTES).order(ByteOrder.LITTLE_ENDIAN).putLong(num).array();// 打印大端和小端字节数组System.out.println(Big-endian dec: Arrays.toString(bigEndian));System.out.println(Little-endian dec: Arrays.toString(littleEndian));String[] bigEndianHex new String[bigEndian.length];for (int i 0; i bigEndian.length; i) {bigEndianHex[i] Integer.toHexString(bigEndian[i]);}String[] littleEndianHex new String[littleEndian.length];for (int i 0; i bigEndian.length; i) {littleEndianHex[i] Integer.toHexString(littleEndian[i]);}System.out.println(Big-endian hex: Arrays.toString(bigEndianHex));System.out.println(Little-endian hex: Arrays.toString(littleEndianHex));}public static void main(String[] args) {// 声明一个值为 0x112345678 的 long 变量//long num 0x112345678L;long num 0x250L;debug(num);} }方法列表 方法列表省略一些不重要的方法 方法含义备注writeBoolean(boolean value)写入 boolean 值占1字节非零为真writeByte(int value)写入 byte 值占1字节writeShort(int value)写入 short 值占2字节writeInt(int value)写入 int 值占4字节Big Endian(大端写入)如 250写入后16进制表示 00 00 00 fawriteIntLE(int value)写入 int 值占4字节Little Endian(小端写入)如 250写入后16进制表示 fa 00 00 00writeLong(long value)写入 long 值占8字节writeChar(int value)写入 char 值占2字节writeFloat(float value)写入 float 值占4字节writeDouble(double value)写入 double 值占8字节writeBytes(ByteBuf src)写入 netty 的 ByteBufwriteBytes(byte[] src)写入 byte[]writeBytes(ByteBuffer src)写入 nio 的 ByteBufferint writeCharSequence(CharSequence sequence, Charset charset)写入字符串 注意 这些方法的未指明返回值的其返回值都是 ByteBuf意味着可以链式调用网络传输默认习惯是 Big Endian 大端存储CharSequence 是个接口像 String/StringBuilder 都实现该接口 扩容 public class TestByteBufWrite {public static void main(String[] args) {ByteBuf buffer ByteBufAllocator.DEFAULT.buffer(3);buffer.writeBytes(new byte[]{1,2});DebugByteBuf.log(buffer,true);buffer.writeInt(250);DebugByteBuf.log(buffer,true);buffer.writeIntLE(250);DebugByteBuf.log(buffer,true);} }由上图可验证大小端存储相关知识。 不过由上图也发现进行了自动扩容 ByteBuf 的扩容规则是 若容量小于16则扩容后16若容量大于16小于64则扩容后64若容量大于64则每次扩容为当前容量的2倍扩容不能超过 max capacity 会报错 读取 方法列表 方法名含义备注int readByte()读取一个字节会向后移动读指针ByteBuf markReaderIndex()将当前位置定义为读标记。默认是0ByteBuf resetReaderIndex()重置到读标记getXXX读取不会改变读指针 3.6.4 内存释放 释放原理 由于 Netty 中有多种内存的 ByteBuf 实现因此要灵活处理 UnpooledHeapByteBuf 使用的是 JVM 内存只需等 GC 回收内存即可UnpooledDirectByteBuf 使用的就是直接内存了需要特殊的方法来回收内存手动释放PooledByteBuf 和它的子类使用了池化机制需要更复杂的规则来回收内存手动释放还给内存池 不过Netty 为了方便开发者手动释放内存采用了引用计数法来控制回收内存每个 ByteBuf 都实现了 io.netty.util.ReferenceCounted 接口 每个 ByteBuf 对象初始后计数为 1调用 release 方法计数减 1如果计数为 0ByteBuf 内存被回收调用 retain 方法计数加 1表示调用者没用完之前其它 handler 即使调用了 release 也不会造成回收当计数为 0 时底层内存会被回收这时即使 ByteBuf 对象还在其各个方法均无法正常使用 计数为0后的释放逻辑在io.netty.buffer.AbstractReferenceCountedByteBuf#deallocate方法中 谁来负责 release 呢 不是我们想象的一般情况下 ByteBuf buf ... try {... } finally {buf.release(); }请思考因为 pipeline 的存在一般需要将 ByteBuf 传递给下一个 ChannelHandler如果在 finally 中 release 了就失去了传递性当然如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命那么便无须再传递 基本规则是谁是最后使用者谁负责 release。 在pipeline中head与tail两个处理器可以自动做收尾工作 入站msgtail 对 ByteBuf 进行释放出站msghead 对 ByteBuf 进行释放 开发者不能完全依赖head与tail 如果用户在某个handler中并没有将ByteBuf往后面的处理器传这时候收尾的head与tail就失去了作用因为你根本没把资源传递给我我咋释放啊 tail源码 tail只处理入站写出都是通过outHandler执行的所以跟tail也没啥关系。因此代码中也只实现入站处理器。 final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {}查看io.netty.channel.DefaultChannelPipeline.TailContext#channelRead方法往下跟找到如下代码 protected void onUnhandledInboundMessage(Object msg) {try {logger.debug(Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration., msg);} finally {ReferenceCountUtil.release(msg);} } //ReferenceCountUtil public static boolean release(Object msg) {if (msg instanceof ReferenceCounted) {return ((ReferenceCounted) msg).release();}return false; }head源码 head既处理入站也处理出站因此两个处理器都实现 final class HeadContext extends AbstractChannelHandlerContextimplements ChannelOutboundHandler, ChannelInboundHandler{}查看io.netty.channel.DefaultChannelPipeline.HeadContext#write方法往下跟找到如下代码 public final void write(Object msg, ChannelPromise promise) {assertEventLoop();ChannelOutboundBuffer outboundBuffer this.outboundBuffer;if (outboundBuffer null) {// If the outboundBuffer is null we know the channel was closed and so// need to fail the future right away. If it is not null the handling of the rest// will be done in flush0()// See https://github.com/netty/netty/issues/2362safeSetFailure(promise, newClosedChannelException(initialCloseCause));// release message now to prevent resource-leakReferenceCountUtil.release(msg);return;}int size;try {msg filterOutboundMessage(msg);size pipeline.estimatorHandle().size(msg);if (size 0) {size 0;}} catch (Throwable t) {safeSetFailure(promise, t);ReferenceCountUtil.release(msg);return;}outboundBuffer.addMessage(msg, size, promise); }3.6.5 深拷贝 会将底层内存数据进行深拷贝因此无论读写都与原始 ByteBuf 无关 public class TestCopy {public static void main(String[] args) {ByteBuf buf1 ByteBufAllocator.DEFAULT.buffer(5);buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});ByteBuf buf2 buf1.copy();//验证深拷贝buf2.setByte(0, 10);log(buf1, true);log(buf2, true);} }3.6.6 零拷贝 零拷贝不进行内存复制使用原有内存其相关应用分为两大类 分 sliceduplicate 合 composite 切片slice 【零拷贝】的体现之一对原始 ByteBuf 进行切片成多个 ByteBuf切片后的 ByteBuf 并没有发生内存复制还是使用原始 ByteBuf 的内存切片后的 ByteBuf 维护独立的 read 和 write 指针 注意 切片后的ByteBuf底层是SlicedByteBuf再写内容对原始数据有影响因此SlicedByteBuf禁止写入。 同样地原内容发生变化SlicedByteBuf也受到影响 所以实际编写代码时需要添加引用和手动释放 验证并没有发生数据复制 public class TestSlice {public static void main(String[] args) {ByteBuf buf ByteBufAllocator.DEFAULT.buffer(10);buf.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10});log(buf, true);//在切片过程中并未发生数据复制ByteBuf buf1 buf.slice(0, 3);buf1.retain();//引用一次ByteBuf buf2 buf.slice(3, 3);buf2.retain();ByteBuf buf3 buf.slice(6, 4);buf3.retain();log(buf1, true);log(buf2, true);log(buf3, true);//使用如下代码验证并没有进行数据复制//注意toString中的hashCode并非表示地址值只是哈希值而已buf1.setByte(0, 10);log(buf1, true);log(buf, true);System.out.println(引用次数buf.refCnt());//4} }浅拷贝duplicate 【零拷贝】的体现之一就好比截取了原始 ByteBuf 所有内容并且没有 max capacity 的限制也是与原始 ByteBuf 使用同一块底层内存只是读写指针是独立的 实际编写代码时需要添加引用和手动释放 组合compositeBuffer 【零拷贝】的体现之一可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf避免拷贝 public class TestComposite {public static void main(String[] args) {ByteBuf buf1 ByteBufAllocator.DEFAULT.buffer(5);buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});ByteBuf buf2 ByteBufAllocator.DEFAULT.buffer(5);buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});发生数据复制的合并//ByteBuf buf ByteBufAllocator.DEFAULT.buffer(10);//buf.writeBytes(buf1).writeBytes(buf2);//log(buf, true);//不发生数据复制的合并CompositeByteBuf buf ByteBufAllocator.DEFAULT.compositeBuffer(10);buf.addComponents(true, buf1, buf2);buf1.retain();//添加引用防止buf1和buf2被释放buf2.retain();//添加引用防止buf1和buf2被释放log(buf, true);//验证未发生数据合并buf1.setByte(0, 10);log(buf, true);System.out.println(buf1.refCnt());System.out.println(buf2.refCnt());} }实际编写代码时需要添加引用和手动释放 Unpooled 是一个工具类类如其名提供了非池化的 ByteBuf 创建、组合、复制等操作 这里仅介绍其跟【零拷贝】相关的 wrappedBuffer 方法底层即CompositeByteBuf可以用来包装 ByteBuf ByteBuf buf1 ByteBufAllocator.DEFAULT.buffer(5); buf1.writeBytes(new byte[]{1, 2, 3, 4, 5}); ByteBuf buf2 ByteBufAllocator.DEFAULT.buffer(5); buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});// 当包装 ByteBuf 个数超过一个时, 底层使用了 CompositeByteBuf ByteBuf buf3 Unpooled.wrappedBuffer(buf1, buf2); System.out.println(ByteBufUtil.prettyHexDump(buf3));输出 -------------------------------------------------| 0 1 2 3 4 5 6 7 8 9 a b c d e f | ------------------------------------------------------------------------- |00000000| 01 02 03 04 05 06 07 08 09 0a |.......... | -------------------------------------------------------------------------也可以用来包装普通字节数组底层也不会有拷贝操作 ByteBuf buf4 Unpooled.wrappedBuffer(new byte[]{1, 2, 3}, new byte[]{4, 5, 6}); System.out.println(buf4.getClass()); System.out.println(ByteBufUtil.prettyHexDump(buf4));输出 class io.netty.buffer.CompositeByteBuf-------------------------------------------------| 0 1 2 3 4 5 6 7 8 9 a b c d e f | ------------------------------------------------------------------------- |00000000| 01 02 03 04 05 06 |...... | -------------------------------------------------------------------------T.buffer(5); buf1.writeBytes(new byte[]{1, 2, 3, 4, 5}); ByteBuf buf2 ByteBufAllocator.DEFAULT.buffer(5); buf2.writeBytes(new byte[]{6, 7, 8, 9, 10}); // 当包装 ByteBuf 个数超过一个时, 底层使用了 CompositeByteBuf ByteBuf buf3 Unpooled.wrappedBuffer(buf1, buf2); System.out.println(ByteBufUtil.prettyHexDump(buf3)); 输出 -------------------------------------------------| 0 1 2 3 4 5 6 7 8 9 a b c d e f |±-------±------------------------------------------------±--------------- |00000000| 01 02 03 04 05 06 07 08 09 0a |… | ±-------±------------------------------------------------±--------------- 也可以用来包装普通字节数组底层也不会有拷贝操作java ByteBuf buf4 Unpooled.wrappedBuffer(new byte[]{1, 2, 3}, new byte[]{4, 5, 6}); System.out.println(buf4.getClass()); System.out.println(ByteBufUtil.prettyHexDump(buf4));输出 class io.netty.buffer.CompositeByteBuf-------------------------------------------------| 0 1 2 3 4 5 6 7 8 9 a b c d e f | ------------------------------------------------------------------------- |00000000| 01 02 03 04 05 06 |...... | -------------------------------------------------------------------------
http://www.hkea.cn/news/14428298/

相关文章:

  • 用python做 网站论坛三原网站建设
  • 网站建设功能介绍保定网站建设的过程
  • 十堰百度网站建设c 转网站开发
  • 有没有什么排版的网站基于 wordpress
  • 做模型找三视图那些网站如何利用网络广告进行推广
  • 天水建设银行网站青海公路建设信息服务网站
  • 化工网站建站模板下载免费制作音乐的软件app
  • 网站对联模板友汇网网站建设
  • 网站编排物流托运
  • 电子商务网站建设的核心多选wordpress 网盘 插件
  • 佳城建站 网站做会计公司网站的目录
  • 珠宝首饰商城网站建设无网站网络营销
  • 网站维护费进入哪个科目19年做哪个网站致富
  • flash 如何做游戏下载网站wordpress幻灯片链接
  • 网站建设夬金手指排名壹柒wordpress登陆页插件面
  • 钢管网站模板网站信息安全保障制度建设情况
  • 网站前台设计工具wordpress表单样式
  • 成都网站seo推广今天北京感染了多少人
  • 羊肉口报关做网站wordpress一键生成
  • 辽宁建设工程信息网官网新网站如何进入旅游电子商务网站开发项目进度表
  • 服务器搭建网站域名配置运城小程序开发公司
  • 咸宁网站seo怎样做能直接上传微信的视频网站
  • 北京公司网站制作流程做海外网站交税吗
  • 学校网站建设需求文档中建海峡建设发展有限公司网站
  • dw网站制作的源代码网站建设智能优化
  • 大连网页模板建站wordpress pdo mysql扩展
  • 玉环县企业网站建设门户网站建设 总结
  • 央企网站群建设wordpress区块链快讯模板
  • wordpress绑定网站北京房地产开发商排名
  • 做网站写概要设计wordpress -editor