无为县建设局网站,建设个电商平台网站需要多少钱,杭州网站建设找思创网络,上海网站建设高端Netty学习笔记#xff08;一#xff09;
在的互联网环境下#xff0c;分布式系统大行其道#xff0c;而分布式系统的根基在于网络编程#xff0c;而 Netty 恰恰是 Java 领域网络编程的王者。如果要致力于开发高性能的服务器程序、高性能的客户端程序#xff0c;必须掌握…Netty学习笔记一
在的互联网环境下分布式系统大行其道而分布式系统的根基在于网络编程而 Netty 恰恰是 Java 领域网络编程的王者。如果要致力于开发高性能的服务器程序、高性能的客户端程序必须掌握 Netty。
视频链接黑马程序员Netty全套教程全网最全Netty深入浅出教程Java网络编程的王者
NIO基础
non-blocking io 非阻塞IO
1、三大组件
Channel、Buffer、Selector
Channel 有一点类似于 stream它就是读写数据的双向通道
常见的Channel有
FileChannelDatagramChannelSocketChannelServerSocketChannel
Buffer则用来缓冲读写数据常见的Buffer有
ByteBufferShortBufferIntBufferLongBufferFloatBufferDoubleBufferCharBuffer
服务器设计——多线程版
❗ 缺点
内存占用高线程上下文切换成本高只适合连接少的场景
服务器设计——线程池版
❗ 缺点
阻塞模式下线程仅能处理一个socket连接仅适合短连接场景
服务器设计——Selector版
Selector的作用就是配合一个线程来管理多个channel获取这些channel上发生的事件这些channel工作在非阻塞模式下不会让线程吊死在一个channel上适合连接数特别多但流量低的场景low traffic
调用selector的select()会阻塞直到channel发生了读写就绪事件这些事件发生select方法就会返回这些事件交给thread来处理
2、ByteBuffer
正确使用姿势
向buffer写入数据例如调用channel.read(buffer)调用flip()切换至读模式从buffer读取数据例如调用buffer.get()调用clear() 或 compact()切换至写模式重复1-4步骤
Slf4j
public class TestByteBuffer {public static void main(String[] args) {// FileChannel// 1、输入输出流 2、RandomAccessFiletry (FileChannel channel new FileInputStream(data.txt).getChannel()) {// 准备缓存区ByteBuffer buffer ByteBuffer.allocate(10);while (true) {// 从channel读取数据向buffer写入int len channel.read(buffer);log.debug(读取到的字节数 {}, len);if (len -1) { // 没有内容break;}// 打印buffer的内容buffer.flip(); // 切换读模式while (buffer.hasRemaining()) { // 是否还有剩余未读数据byte b buffer.get();log.debug(读取到的字节 {}, (char) b);}// 切换写模式buffer.clear();}} catch (IOException e) {e.printStackTrace();}}
}ByteBuffer有以下重要属性
capacitypositionlimit
写模式下position是写入位置limit等于容量
flip动作发生后position切换为读取位置limit切换为读取限制
常见方法
分配空间
可以使用allocate方法为ByteBuffer分配空间 /** class java.nio.HeapByteBuffer java堆内存读写效率较低受到gc影响* class java.nio.DirectByteBuffer 直接内存读写效率高少一次拷贝不受gc影响分配效率低*/
System.out.println(ByteBuffer.allocate(16).getClass());
System.out.println(ByteBuffer.allocateDirect(16).getClass());写入数据
调用channel的read方法调用buffer自己的put方法
读取数据
调用channel的write方法调用buffer自己的get方法
get方法会让position读指针向后走如果想重复读取数据
可以调用rewind方法将position重新置为0或者调用get(int i)方法获取索引i的内容它不会移动读指针
public class TestByteBufferRead {public static void main(String[] args) {ByteBuffer buffer ByteBuffer.allocate(16);buffer.put(new byte[]{a, b, c, d});buffer.flip();// 从头开始读buffer.get(new byte[4]);ByteBufferUtil.debugAll(buffer);buffer.rewind();System.out.println((char) buffer.get());buffer.rewind();// mark reset// mark 做一个标记记录position位置reset是将position重置到mark的位置System.out.println((char) buffer.get());System.out.println((char) buffer.get());buffer.mark(); // 加标记索引为2的位置System.out.println((char) buffer.get());System.out.println((char) buffer.get());buffer.reset(); // 将position重置到上次标记的位置System.out.println((char) buffer.get());System.out.println((char) buffer.get());// get(i) 不会改变读索引的位置System.out.println((char) buffer.get(1));ByteBufferUtil.debugAll(buffer);}
}字符串与ByteBuffer互转
public class TestByteBufferString {public static void main(String[] args) {// 1. 字符串转为ByteBufferByteBuffer buffer ByteBuffer.allocate(16);buffer.put(hello.getBytes());ByteBufferUtil.debugAll(buffer);// 2. CharsetByteBuffer buffer1 StandardCharsets.UTF_8.encode(hello);ByteBufferUtil.debugAll(buffer1);// 3. wrapByteBuffer buffer2 ByteBuffer.wrap(hello.getBytes(StandardCharsets.UTF_8));ByteBufferUtil.debugAll(buffer2);// 转为字符串String s StandardCharsets.UTF_8.decode(buffer2).toString();System.out.println(s);buffer.flip();String s1 StandardCharsets.UTF_8.decode(buffer).toString();System.out.println(s1);}
}Scattering Reads
分散读取有一个文本文件3parts.txt
public class TestScatteringReads {public static void main(String[] args) {try (FileChannel channel new RandomAccessFile(3parts.txt, r).getChannel()) {ByteBuffer b1 ByteBuffer.allocate(3);ByteBuffer b2 ByteBuffer.allocate(3);ByteBuffer b3 ByteBuffer.allocate(5);channel.read(new ByteBuffer[]{b1, b2, b3});b1.flip();b2.flip();b3.flip();ByteBufferUtil.debugAll(b1);ByteBufferUtil.debugAll(b2);ByteBufferUtil.debugAll(b3);} catch (IOException e) {e.printStackTrace();}}
}Gathering Writes
public class TestGatheringWrites {public static void main(String[] args) {ByteBuffer b1 StandardCharsets.UTF_8.encode(hello);ByteBuffer b2 StandardCharsets.UTF_8.encode(world);ByteBuffer b3 StandardCharsets.UTF_8.encode(您好);try (FileChannel channel new RandomAccessFile(words.txt, rw).getChannel()) {channel.write(new ByteBuffer[]{b1, b2, b3});} catch (IOException e) {e.printStackTrace();}}
}ByteBuffer 黏包 半包
需求
/*
网络上有多条数据发送给服务端数据之间使用 \n 进行分隔
但由于某种原因这些数据在接收时被进行了重新组合例如原始数据有3条为
Hello,world\n
Im zhangsan\n
How are you?\n
变成了下面的两个ByteBuffer黏包半包
Hello,world\nIm zhangsan\nHo
w are you?\n
现在要求你编写程序将错乱的数据恢复成原始的按 \n 分隔的数据
*/具体实现
package com.example.netty.ch1;import com.example.netty.utils.ByteBufferUtil;import java.nio.ByteBuffer;public class TestByteBufferCase {public static void main(String[] args) {ByteBuffer source ByteBuffer.allocate(32);source.put(Hello,world\nIm zhangsan\nHo.getBytes());split(source);source.put(w are you?\n.getBytes());split(source);}private static void split(ByteBuffer source) {source.flip();for (int i 0; i source.limit(); i) {// 找到一条完整的消息if (source.get(i) \n) {int length i 1 - source.position();// 把完整消息存入新的ByteBufferByteBuffer target ByteBuffer.allocate(length);// 从source读向target写for (int j 0; j length; j) {byte b source.get();target.put(b);}ByteBufferUtil.debugAll(target);}}source.compact();}
}3、文件编程
FileChannel
⚠注意 FileChannel 只能工作在阻塞模式下 获取FileChannel
通过FileInputStream获取的channel只能读通过FileOuputStream获取的channel只能写通过RandomAccessFile是否能读写根据构造RandomAccessFile时的读写模式决定
读取
会从channel读取数据填充ByteBuffer返回值表示读到了多少字节-1表示到达了文件的末尾
int len channel.read(buffer);写入
ByteBuffer buffer ...;
buffer.put(...); // 写入数据
buffer.flip(); // 切换读模式while (buffer.hasRemaining()) {channel.write(buffer);
}关闭
channel必须关闭
channel.close();两个Channel传输数据
public class TestFileChannelTransferTo {public static void main(String[] args) {try (FileChannel from new FileInputStream(data.txt).getChannel();FileChannel to new FileOutputStream(to.txt).getChannel()) {// 效率高底层使用操作系统的零拷贝进行优化2g 数据long size from.size();// i 代表还剩余多少个字节for (long i size; i 0;) {i - from.transferTo((size - i), i, to);}} catch (IOException exception) {exception.printStackTrace();}}
}Path
JDK7 引入了Path和Paths类
Path 用来表示文件路径Paths 是工具类用来获取Path实例
Path source Paths.get(test.txt);
Path source Paths.get(d:\\test.txt);
Path source Paths.get(d:/test.txt);
Path source Paths.get(d:\\data, test.txt);正常化路径path.normalize()
Files
检查文件是否存在Files.exists(path)
创建一级目录Files.createDirectory(path)
创建多级目录Files.createDirectories(path)
拷贝文件Files.copy(source, target)
移动文件Files.move(source, target, StandardCopyOption.ATOMIC_MOVE)
删除文件Files.delete(target)
遍历目录
public class TestFilesWalkFileTree {public static void main(String[] args) throws IOException {AtomicInteger dirCount new AtomicInteger();AtomicInteger fileCount new AtomicInteger();Files.walkFileTree(Paths.get(d:/env/jdk11), new SimpleFileVisitorPath() {Overridepublic FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {System.out.println( dir);dirCount.incrementAndGet();return super.preVisitDirectory(dir, attrs);}Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {System.out.println(file);fileCount.incrementAndGet();return super.visitFile(file, attrs);}});System.out.println(dir count: dirCount);System.out.println(file count: fileCount);}
}4、网络编程
阻塞
非阻塞
多路复用
单线程可以配合Selector完成对多个Channel可读写事件的监控这称之为多路复用
多路复用仅针对网络IO、普通文件IO没法利用多路复用如果不用Selector的非阻塞模式线程大部分时间都在做无用功而Selector能够保证 有连接事件时才去连接有可读事件才去读取有可写事件才去写入限于网络传输能力Channel未必时时可写一旦Channel可写会触发Selector的可写事件
监听Channel事件
可以通过下面三种方法来监听是否有事件发送方法的返回值代表有多少channel发生了事件
方法一阻塞直到绑定事件发生
int count selector.select();方法二阻塞直到绑定事件发生或是超时时间单位为ms
int count selector.select(long timeout);方法三不会阻塞也就是不管有没有事件立刻返回自己根据返回值检查是否有事件
int count selector.selectNow();select何时不阻塞 事件发生时 客户端发起连接请求会触发accept事件客户端发送数据过来客户端正常、异常关闭时都会触发read事件另外如果发送地 数据大于buffer缓冲区会触发多次读取事件channel可写会触发write事件在Linux下 nio bug 发生时 调用selector.wakeup() 调用selector.close() selector 所在线程interrupt 利用多线程优化 现在都是多核CPU设计时要充分考虑如何发挥多核CPU的优势 package com.example.netty.ch4;import com.example.netty.utils.ByteBufferUtil;
import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;Slf4j
public class MultiThreadServer {public static void main(String[] args) throws IOException {Thread.currentThread().setName(boss);ServerSocketChannel ssc ServerSocketChannel.open();ssc.configureBlocking(false);Selector boss Selector.open();SelectionKey bossKey ssc.register(boss, 0, null);bossKey.interestOps(SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8080));Worker[] workers new Worker[Runtime.getRuntime().availableProcessors()];for (int i 0; i workers.length; i) {workers[i] new Worker(worker- i);}AtomicInteger index new AtomicInteger();while (true) {boss.select();IteratorSelectionKey iter boss.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key iter.next();iter.remove();if (key.isAcceptable()) {SocketChannel sc ssc.accept();sc.configureBlocking(false);log.debug(connected...{}, sc.getRemoteAddress());log.debug(before register...{}, sc.getRemoteAddress());// 负载均衡轮询workers[index.getAndIncrement() % workers.length].register(sc);log.debug(after register...{}, sc.getRemoteAddress());}}}}static class Worker implements Runnable {private Thread thread;private Selector selector;private String name;private volatile boolean start false;private ConcurrentLinkedQueueRunnable queue new ConcurrentLinkedQueue();public Worker(String name) {this.name name;}/*** 初始化线程和selector*/public void register(SocketChannel sc) throws IOException {if (!start) {selector Selector.open();thread new Thread(this, name);thread.start();start true;}// 向队列添加了任务但这个任务并没有立刻执行queue.add(() - {try {sc.register(selector, SelectionKey.OP_READ, null);} catch (ClosedChannelException e) {e.printStackTrace();}});selector.wakeup(); // 唤醒 select 方法}Overridepublic void run() {while (true) {try {selector.select();Runnable task queue.poll();if (task ! null) {// 执行了 sc.register(selector, SelectionKey.OP_READ, null);task.run();}IteratorSelectionKey iter selector.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key iter.next();iter.remove();if (key.isReadable()) {ByteBuffer buffer ByteBuffer.allocate(16);SocketChannel channel (SocketChannel) key.channel();channel.read(buffer);buffer.flip();ByteBufferUtil.debugAll(buffer);}}} catch (IOException e) {e.printStackTrace();}}}}
}5、NIO vs BIO
stream vs channel
stream 不会自动缓冲数据channel会利用系统提供的发送缓冲区接收缓冲区更为底层stream仅支持阻塞APIchannel同时支持阻塞、非阻塞API网络channel可配合selector实现多路复用两者均为全双工即读写可以同时进行
IO模型
同步阻塞、同步非阻塞、多路复用、异步阻塞没有此情况、异步非阻塞
同步线程自己去获取结果一个线程
异步线程自己不去获取结果而是由其它线程送结果至少两个线程
当调用一个channel.read 或 stream.read 后会切换至操作系统内核态来完成真正数据读取而读取又分为两个阶段分别为
等待数据阶段复制数据阶段
IO模型
阻塞IO非阻塞IO多路复用信号驱动异步IO
零拷贝
仅只发生一次用户态到内核态的切换数据拷贝了2次。所谓的【零拷贝】并不是真正的无拷贝而是在不会拷贝重复数据到JVM内存中零拷贝的优点有
更少的用户态与内核态的切换不利用CPU计算减少CPU缓存伪共享零拷贝适合小文件传输
AIO
AIO 用来解决数据复制阶段的阻塞问题
同步意味着在进行读写操作时线程需要等待结果还是相当于闲置异步意味着在进行读写操作时线程不必等待结果而是将来由操作系统通过回调方式由另外的线程来获得结果 异步模型需要底层操作系统Kernel提供支持 Windows 系统通过IOCP实现了真正得异步IO Linux 系统异步IO在2.6版本引入但其底层实现还是用多路复用模拟了异步IO性能没有优势 参考资料
https://www.bilibili.com/video/BV1py4y1E7oA