江门微信网站建设,郑州新闻发布会最新消息今天,浏览器怎样屏蔽网站,服装定制店名怎么取文章目录 前言一、为什么需要设计应用层缓冲区必须要有 output buffer目的问题output buffer的解决方案#xff1a; 必须要有 input buffer总结 二、设计要点三、buffer设计思路基础函数关于iovec与readv readfd如何实现动态扩容 问题 前言
在上一个博客#xff0c;我们介绍… 文章目录 前言一、为什么需要设计应用层缓冲区必须要有 output buffer目的问题output buffer的解决方案 必须要有 input buffer总结 二、设计要点三、buffer设计思路基础函数关于iovec与readv readfd如何实现动态扩容 问题 前言
在上一个博客我们介绍到什么是缓冲区出发然后也分析了epoll 两个模式使用阻塞与非阻塞缓冲区的区别。 epoll与socket缓冲区的恩恩怨怨 本文介绍如何设计一个合理的内部逻辑稳定的读写缓冲区。基于Muduo库的设计思想。
一、为什么需要设计应用层缓冲区
基于Muduo库的应用缓冲区源码以及陈硕大神的博客进行实现与总结。
大多数的网络模型是非阻塞IO模型即每次send() 不一定全发完没发完的数据要用一个容器进行接收,所以必须要实现应用层缓冲区.
如果是水平触发那么套接字会一直处于可读状态io多路复用函数会一直认为这个套接字被激活也就是说如果第一次触发后没有将tcp缓冲区中的数据全部读出那么下次进行到poll函数时会立即返回因为套接字一直是可读的。这会导致了busy loop问题。
如果是边缘触发那么就只会触发一次即使第一次触发没有将所有数据都读走下次进行到poll也不会再触发套接字的可读状态直到下次又有一批数据送至tcp缓冲区中才会再次触发可读。所以有可能存在漏读数据的问题万一不会再有数据到来呢此时tcp缓冲区中仍然有数据而应用程序却不知道。
这样一来应用层的缓冲是必须的每个 TCP socket 都要有 stateful 的 input buffer 和 output buffer。
必须要有 output buffer
目的
网络库需要为每个TCP连接配置输出缓冲区以便处理数据的发送和缓冲并且需要根据套接字的可写状态进行相应的处理和调度。这样可以实现高效的数据发送和事件处理使程序能够快速返回事件循环提高整体的性能和响应能力。
问题
程序想通过 TCP 连接发送 100k 字节的数据但是在 write() 调用中操作系统只接受了 80k 字节受 TCP advertised window 的控制细节见 TCPv1你肯定不想在原地等待因为不知道会等多久取决于对方什么时候接受数据然后滑动 TCP 窗口。程序应该尽快交出控制权返回 event loop。在这种情况下剩余的 20k 字节数据怎么办
output buffer的解决方案
1、对于应用程序而言它只管生成数据它不应该关心到底数据是一次性发送还是分成几次发送这些应该由网络库来操心程序只要调用 TcpConnection::send() 就行了网络库会负责到底。网络库应该接管这剩余的 20k 字节数据把它保存在该 TCP connection 的 output buffer 里然后注册 POLLOUT 事件一旦 socket 变得可写就立刻发送数据。当然这第二次 write() 也不一定能完全写入 20k 字节如果还有剩余网络库应该继续关注 POLLOUT 事件如果写完了 20k 字节网络库应该停止关注 POLLOUT以免造成 busy loop。 2、如果在发送过程中输出缓冲区仍然有待发送的数据而程序又要写入新的数据网络库应该将新的数据追加到输出缓冲区的末尾等待下次套接字可写时再发送。这样可以避免频繁的写入操作导致的性能下降。 3、如果程序想要关闭连接时但输出缓冲区中仍有待发送的数据网络库不能立即关闭连接。相反它应该等待数据发送完毕后再关闭连接以确保数据不会丢失。
必须要有 input buffer
TcpConnection必须要有input buffer TCP是一个无边界的字节流协议接收方必须要处理“收到的数据尚不构成一条完整的消息”和“一次收到两条消息的数据”等等情况。一个常见的场景是发送方send了两条10k字节的消息共20k接收方收到数据的情况可能是
一次性收到20k数据 分两次收到第一次5k第二次15k 分三次收到第一次6k第二次8k第三次6k 等等任何可能 以上情况俗称“粘包”问题。
网络库在处理“socket可读”事件的时候必须一次性把socket中数据读完从操作系统buffer搬到应用层buffer否则会反复触发POLLIN事件造成busy loop。 如何处理 接收到数据存在input buffer通知上层的应用程序OnMessage(buffer)回调根据应用层协议判定是否是一个完整的包进行codec解码如果不是一条完整的消息不会取走数据也不会进行相应的处理。如果是一条完整的消息将取走这条消息并进行相应的处理。如何处理就是上层应用程序的职责了。
总结
Non-blocking IO 的核心思想是避免阻塞在 read() 或 write() 或其他 IO 系统调用上这样可以最大限度地复用 thread-of-control让一个线程能服务于多个 socket 连接。IO 线程只能阻塞在 IO-multiplexing 函数上如 select()/poll()/epoll_wait()。这样一来应用层的缓冲是必须的每个 TCP socket 都要有 stateful 的 input buffer 和 output buffer。muduo库都是带缓冲的I/O不会自己去read()或write()某个socket只会操作TcpConnection的input buffer和output buffer。更确切的说是在OnMessage()回调里读取input buffer调用TcpConnection::send()来间接操作output buffer一般不会直接操作output buffer。 所以设计应用层自己的缓冲区是很有必要的也就是由应用程序来管理缓冲区问题
二、设计要点
陈硕大神的总结如下 应用缓冲区对外表现为一块连续的内存(char, len)以方便客户代码的编写。其 size() 可以自动增长以适应不同大小的消息。它不是一个 fixed size array (即 char buf[8192])。内部以 vector of char 来保存数据并提供相应的访问函数。* 要点 1、应用层缓冲区通常很大也可以初始很小但可以通过动态调整改变大小vector 2、当用户想要调用write/send写入数据给对端如果数据可以全部写入那么写入就好了。如果写入了部分数据或者根本一点数据都写不进去此时表明内核缓冲区已满为了不阻塞当前线程应用层写缓冲区会接管这些数据等到内核缓冲区可以写入的时候自动帮用户写入。 3、当有数据到达内核缓冲区应用层的读缓冲区会自动将这些数据读到自己那里当用户调用read/recv想要读取数据时应用层读缓冲区将已经从内核缓冲区取出的数据返回给用户实际上就是用户从应用层读缓冲区读取数据 4、应用层缓冲区对用户而言是隐藏的用户可能根本不知道有应用层缓冲区的存在只需读/取数据而且也不会阻塞当前线程
三、buffer设计思路
/*1-----2---3-------4------51是begin2是kCheapPrepend 表示8字节头部3是prependableBytes也就是readerIndex_ 4是writerIndex_5是buffer_.size()1-2是 头部信息大小2-3是 已经读过来的 缓冲区 空闲的prependableBytes() - kCheapPrepend3-4是 readableBytes 要读的空间 也就是writerIndex_ - readerIndex_4-5是 writableBytes 可写的空间 也就是是buffer_.size() - writerIndex_prependableBytes() - kCheapPrepend 就是已经读了的 空闲出来的
加上可以写的就是中共能够写入的如果不够就要resize
如果够那么 就需要挪一下 把已经读的了与可以写的拼在一起
*/muduo应用层缓冲区的设计采用std::vector数据结构一方面内存是连续的方便管理另一方面vector自带的增长模式足以应对动态调整大小的任务 缓冲区Buffer的定义如下只列出了一些重要部分 主要就是利用两个指针readerIndexwriterIndex分别记录着缓冲区中数据的起点和终点写入数据的时候追加到writeIndex后面读出数据时从readerIndex开始读。在readerIndex前面预留了几个字节大小的空间方便日后为数据追加头部信息。缓冲区在使用的过程中会动态调整readerIndex和writerIndex的位置初始缓冲区为空readerIndex writerIndex。
Muduo Buffer 的 size() 是自适应的它一开始的初始值是 1k如果程序里边经常收发 10k 的数据那么用几次之后它的 size() 会自动增长到 10k然后就保持不变。这样一方面避免浪费内存有的程序可能只需要 4k 的缓冲另一方面避免反复分配内存。当然客户代码可以手动 shrink() buffer size()。
以下是别人的总结 1.相比之下采用vector连续内存更容易管理同时利用std::vector自带的内存增长方式可以减少扩充的次数capacity和size一般不同2.记录缓冲区数据起始位置和结束位置写入时写到已有数据的后面读出时从数据起始位置读出3.起始/结束位置如上图的readerIndex/writeIndex其中readerIndex为缓冲区数据的起始索引下标writeIndex为结束位置下标。采用下标而不是迭代器的原因是删除(erase)数据时迭代器可能失效4.开头部分(readerIndex以前)是预留空间通常只有几个字节的大小可以用来写入数据的长度解决粘包问题5.读出和写入数据时会动态调整readerIndex/writeIndex如果没有数据二者相等基础函数
成员变量 static const size_t kCheapPrepend 8; //默认预留8个字节static const size_t kInitialSize 1024; //初始大小
private:std::vectorchar buffer_; //vector用于替代固定数组size_t readerIndex_; //读位置size_t writerIndex_; //写位置
Buffer获取各个长度的方法
//可读大小size_t readableBytes() const{ return writerIndex_ - readerIndex_; }//可写大小size_t writableBytes() const{ return buffer_.size() - writerIndex_; }//预留大小size_t prependableBytes() const{ return readerIndex_; }获取可读下标
//读的下标const char* peek() const{ return begin() readerIndex_; 返回缓冲区中可读数据的起始地址
const char* peek() const{return begin() readerIndex_;}把onMessage函数上报的Buffer数据转成string类型的数据返回
// 把onMessage函数上报的Buffer数据转成string类型的数据返回std::string retrieveAllAsString(){// 应用缓存区可读取长度writerIndex_ - readerIndex_数据的长度return retrieveAsString(readableBytes());}std::string retrieveAsString(size_t len){// 可读数据的 地址以及长度 构造出ret把readable的数据全部读取std::string result(peek(), len);// 上面一句把缓冲区中可读的数据已经读取出来这里肯定要对缓冲区进行复位操作retrieve(len);return result;}关于iovec与readv
引用博客 使用read()将数据读到不连续的内存要经过多次的调用read。如果要从文件中读一片连续的数据至进程的不同区域有两种方案 ①使用read()一次将它们读至一个较大的缓冲区中然后将它们分成若干部分复制到不同的区域 ②调用r©adO若干次分批将它们读至不同区域。同样如果想将程序中不同区域的数据块连续地写至文件也必须进行类似的处理。 缺点执行系统调用必然使得性能降低。
UNIX提供了另外两个函数—readv()它们只需一次系统调用就可以实现多个缓冲区之间传送数据免除了多次系统调用或复制数据的开销。readv()称为散布读即将文件中若干连续的数据块读入内存分散的缓冲区中。
这里为什么要用readv 因为我们预先不知道内核缓冲区的数据大小 在某些情况下应用缓冲区可能无法存储全部的读取数据需要额外的缓冲区进行存储。通过使用栈上的内存空间extrabuf存储额外的读取数据。 这样就带来了另外一个问题可能需要把内核缓冲区的数据保存到这个两个不同的内存区域中。 通过一次 readv 函数调用读入内存分散的缓冲区中。就能大大提高数据读取效率。
主要是为了解决应用缓冲区内存不够的情况下保证只是进行一次系统调用。
readfd
用户自定义缓冲区Buffer是有大小限制的我们一开始不知道TCP接收缓冲区中的数据量有多少如果一次性读出来会不会导致Buffer装不下而溢出。所以在readFd( )函数中会在栈上创建一个临时空间extrabuf然后使用readv的分散读特性将TCP缓冲区中的数据先拷贝到Buffer中如果Buffer容量不够就把剩余的数据都拷贝到extrabuf中然后再调整Buffer的容量(动态扩容)再把extrabuf的数据拷贝到Buffer中。当这个函数结束后extrabuf也会被释放。另外extrabuf是在栈上开辟的空间速度比在堆上开辟还要快。
ssize_t Buffer::readFd(int fd, int* saveErrno)
{/*在某些情况下应用缓冲区可能无法存储全部的读取数据需要额外的缓冲区进行存储。通过使用栈上的内存空间extrabuf存储额外的读取数据。需要将文件套接字接收缓冲中的数据读入不同位置时可以不必多次调用 read 函数而是通过一次 readv 函数调用就能大大提高数据读取效率。*/char extrabuf[65536] {0}; // 栈上的内存空间 64Kstruct iovec vec[2];// 这是Buffer底层缓冲区剩余的可写空间大小const size_t writable writableBytes();vec[0].iov_base begin() writerIndex_;vec[0].iov_len writable;vec[1].iov_base extrabuf;vec[1].iov_len sizeof extrabuf;// 保证缓冲区刚刚好 能够一次性读完const int iovcnt (writable sizeof extrabuf) ? 2 : 1;const ssize_t n ::readv(fd, vec, iovcnt);if (n 0){*saveErrno errno;}else if (n writable) // Buffer的可写缓冲区已经够存储读出来的数据了{writerIndex_ n;}else // extrabuf里面也写入了数据 {// writerIndex_开始写 n - writable大小的数据writerIndex_ buffer_.size();append(extrabuf, n - writable); }return n;
}readFd巧妙的设计可以让用户一次性把所有TCP接收缓冲区的所有数据全部都读出来并放到用户自定义的缓冲区Buffer中。
如何实现动态扩容
上面介绍到了如果用户自定义的缓冲区Buffer内存不够需要把extrabuf中的数据加入到我们的应用缓冲区中去这个时候我们的应用缓冲区就需要动态扩容了。主要是通过两种方式一种是直接扩容一种是内部腾挪的方式
在追加函数中 想要确保有足够的空间ensureWriteableBytes。 // 把[data, datalen]内存上的数据添加到writable缓冲区当中void append(const char *data, size_t len){// 追加到 beginWrite 后面 也就是 3-4是 readableBytes 要读的空间// 然后writerIndex_ 往后面挪ensureWriteableBytes(len);std::copy(data, datalen, beginWrite());writerIndex_ len;}如果writableBytes可写入的空间小雨将要存入数据的带下就需要makeSpace扩容 // 可写部分 是buffer_.size() - writerIndex_ // 要写 len 这么长需要对比一下可写缓存区的 长度// 如果太小要扩容void ensureWriteableBytes(size_t len){if (writableBytes() len){makeSpace(len); // 扩容函数}}prependableBytes() - kCheapPrepend 就是已经读了的 空闲出来的加上可以写的就是总共能够写入的如果不够就要resize 如果够那么 就需要挪一下 把已经读的了与可以写的拼在一起。 void makeSpace(size_t len){if (writableBytes() prependableBytes() - kCheapPrepend len ){// 腾不出这个大小 就要resizebuffer_.resize(writerIndex_ len);}else{size_t readalbe readableBytes();std::copy(begin() readerIndex_, begin() writerIndex_,begin() kCheapPrepend);readerIndex_ kCheapPrepend;writerIndex_ readerIndex_ readalbe;}}问题
为什么不在Buffer构造时就开辟足够大的缓冲区 1.每个tcp连接都有输入/输出缓冲区如果连接过多则内存消耗会很大 2.防止客户端与服务器端数据交互比较少造成缓冲区的浪费 3.当缓冲区大小不足时利用vector内存增长的优势扩充缓冲区
为什么不在读数据之前判断一下应用层缓冲区是否可以容纳内核缓冲区的全部数据 1.采用这种方式就会调用一次recv传入MSG_PEEK,即recv(sockfd, extrabuf, sizeof(extrabuf), MSG_PEEK)可根据返回值判断缓冲区还有多少数据没有接收然后再调用一次recv从内核冲读取数据 2.但是这样会执行两次系统调用得不偿失尽量使用一次系统调用就将所有数据读出这就需要一个很大的空间