成都网站制作推来客网站系统好吗,一般网站开发的硬件要求,实体行业做分销网站有什么好处,农业银行官网C 网络编程学习二 asio异步写操作asio异步读操作asio 异步echo服务端asio异步服务器中存在的隐患 asio异步写操作
async_write_some是异步写的函数#xff1a;传入buffer和回调函数以及参数以后#xff0c;发送后会调用回调函数。
void Session::WriteToSocketErr(const st… C 网络编程学习二 asio异步写操作asio异步读操作asio 异步echo服务端asio异步服务器中存在的隐患 asio异步写操作
async_write_some是异步写的函数传入buffer和回调函数以及参数以后发送后会调用回调函数。
void Session::WriteToSocketErr(const std::string buf) {// make_shared 延长_send_node 的生命周期。_send_node make_sharedMsgNode(buf.c_str(), buf.length());//异步发送数据因为异步所以不会一下发送完//async_write_some的回调函数要求是两个参数的发送多少以及可能返回的错误码都作为参数传递给回调函数。/*但是自己定义的函数参数为3个通过bind将三个参数转换为两个参数的普通函数。*/this-_socket-async_write_some(asio::buffer(_send_node-_msg, _send_node-_total_len),std::bind(Session::WriteCallBackErr,this, std::placeholders::_1, std::placeholders::_2, _send_node));
}回调函数需要判断是否所有数据都发送完成了如果没有继续调用回调函数进行发送。
void Session::WriteCallBackErr(const boost::system::error_code ec, std::size_t bytes_transferred,std::shared_ptrMsgNode msg_node)
{if (bytes_transferred msg_node-_cur_len msg_node-_total_len) {_send_node-_cur_len bytes_transferred;this-_socket-async_write_some(asio::buffer(_send_node-_msg_send_node-_cur_len,_send_node-_total_len-_send_node-_cur_len),std::bind(Session::WriteCallBackErr,this, std::placeholders::_1, std::placeholders::_2, _send_node));}
}上面的代码中存在的问题异步发送的过程是无序的用户想发送数据的时候就调用WriteToSocketErr,或者循环调用WriteToSocketErr很可能在一次没发送完数据还未调用回调函数时再次调用WriteToSocketErr。那么有可能出现发送数据的顺序和想要的顺序不同。为了确保发送数据的顺序正确性在应用层使用一个队列保证发送顺序。并用一个标志位用来判断当前是否还有数据正在发送过程中。
class Session{
public:// 函数参数和上面的参数并无差别void WriteCallBack(const boost::system::error_code ec, std::size_t bytes_transferred);void WriteToSocket(const std::string buf);
private:std::queuestd::shared_ptrMsgNode _send_queue; // 发送数据顺序队列std::shared_ptrasio::ip::tcp::socket _socket;bool _send_pending;// 判断当前是否有数据正在发送,为true表示一个节点还未发送完。
};发送函数发送的时候先把数据插到队列中回调后将正在发送标志位置为true。
void Session::WriteToSocket(const std::string buf) {// 将要发送的数据插入队列_send_queue.emplace(new MsgNode(buf.c_str(), buf.length()));if (_send_pending) { //还有数据在发送的话就先returnreturn;}//异步发送数据因为异步所以不会一下发送完this-_socket-async_write_some(asio::buffer(buf), std::bind(Session::WriteCallBack,this, std::placeholders::_1, std::placeholders::_2));_send_pending true; // 调用一次async_write_some肯定会触发WriteCallBack这个时候将标志位置为true
}回调函数回调函数在执行时会首先判断队首元素是否全部发送出去了如果没有就继续发送队首元素。队首元素发送完毕后继续取出队列中的元素。
void Session::WriteCallBack(const boost::system::error_code ec, std::size_t bytes_transferred, std::shared_ptrMsgNode) {if (ec.value() ! 0) {//出错了std::cout Error , code is ec.value() . Message is ec.message();return;}// 取出队首元素auto send_data _send_queue.front();send_data-_cur_len bytes_transferred;//数据未发送完 则继续发送if (send_data-_cur_len send_data-_total_len) {this-_socket-async_write_some(asio::buffer(send_data-_msg send_data-_cur_len, send_data-_total_len - send_data-_cur_len),std::bind(Session::WriteCallBack,this, std::placeholders::_1, std::placeholders::_2));return;}_send_queue.pop();if (_send_queue.empty()) {_send_pending false;}else {// 队列不为空继续发送。auto send_data _send_queue.front();this-_socket-async_write_some(asio::buffer(send_data-_msg send_data-_cur_len, send_data-_total_len - send_data-_cur_len),std::bind(Session::WriteCallBack,this, std::placeholders::_1, std::placeholders::_2));}}async_write_some函数不能保证每次回调函数触发时发送的长度为要总长度每次都要在回调函数判断发送数据是否完成asio提供了一个更简单的发送函数async_send这个函数在发送的长度未达到我们要求的长度时就不会触发回调所以触发回调函数时要么时发送出错了要么是发送完成了,其内部的实现原理就是帮我们不断的调用async_write_some直到完成发送所以async_send不能和async_write_some混合使用。
void Session::WriteAllToSocket(const std::string buf) {// 将要发送的数据插入队列_send_queue.emplace(new MsgNode(buf.c_str(), buf.length()));if (_send_pending) return;// 异步发送数据数据不会一次发送完但是只会触发一次回调this-_socket-async_send(asio::buffer(buf), std::bind(Session::WriteAllCallBack, this, std::placeholders::_1, std::placeholders::_2));_send_pending true;
}
void Session::WriteAllCallBack(const boost::system::error_code ec, std::size_t bytes_transferred) {if (ec.value() ! 0) {std::cout Error occured! Error code ec.value() . Message: ec.message();return;}//如果发送完则pop出队首元素_send_queue.pop();//如果队列为空则说明所有数据都发送完,将pending设置为falseif (_send_queue.empty()) {_send_pending false;}//如果队列不是空则继续将队首元素发送if (!_send_queue.empty()) {auto send_data _send_queue.front();this-_socket-async_send(asio::buffer(send_data-_msg send_data-_cur_len, send_data-_total_len - send_data-_cur_len),std::bind(Session::WriteAllCallBack,this, std::placeholders::_1, std::placeholders::_2));}
}asio异步读操作
异步读操作和异步的写操作类似同样有async_read_some和async_receive函数前者触发的回调函数获取的读数据的长度可能会小于要求读取的总长度后者触发的回调函数读取的数据长度等于读取的总长度。同样的async_read_some和async_receive不能混用因为async_receive的底层就是循环调用async_read_some。
void Session::ReadFromSocket() {if (_recv_pending) return;_recv_node std::make_sharedMsgNode(RECVSIZE);// 一块一块接收_socket-async_read_some(asio::buffer(_recv_node-_msg, _recv_node-_total_len), std::bind(Session::ReadCallBack, this,std::placeholders::_1, std::placeholders::_2));_recv_pending true;
}
void Session::ReadCallBack(const boost::system::error_code ec, std::size_t bytes_transferred) {_recv_node-_cur_len bytes_transferred;// 如果没有接受完继续接收if (_recv_node-_cur_len _recv_node-_total_len) {_socket-async_read_some(asio::buffer(_recv_node-_msg _recv_node-_cur_len,_recv_node-_total_len - _recv_node-_cur_len), std::bind(Session::ReadCallBack, this,std::placeholders::_1, std::placeholders::_2));return;}_recv_pending false;//指针置空_recv_node nullptr;
}async_receive接收完数据后才会调用一次回调。
void Session::ReadAllFromSocket() {if (_recv_pending) {return;}_recv_node std::make_sharedMsgNode(RECVSIZE);_socket-async_receive(asio::buffer(_recv_node-_msg, _recv_node-_total_len), std::bind(Session::ReadAllCallBack, this,std::placeholders::_1, std::placeholders::_2));_recv_pending true;
}
void Session::ReadAllCallBack(const boost::system::error_code ec, std::size_t bytes_transferred) {if (ec.value() ! 0) {std::cout Error occured! Error code ec.value() . Message: ec.message();return;}_recv_node-_cur_len bytes_transferred;//将数据投递到队列里交给逻辑线程处理此处略去//如果读完了则将标记置为false_recv_pending false;//指针置空_recv_node nullptr;
}asio 异步echo服务端
#pragma once
#includememory
#includeboost/asio.hpp
#includeiostream
#includequeue
using namespace std;
using namespace boost;
using boost::asio::ip::tcp;// Session类主要是处理客户端消息收发的会话类简单起见不考虑粘包问题也不考虑支持手动调用发送的接口只以应答的方式发送和接收固定长度(1024字节长度)的数据。
class Session {
public:// 上下文初始化Sessionsocket绑定上下文Session(boost::asio::io_context ioc) :_socket(ioc) {}tcp::socket Socket() {return _socket;}void Start();// 在start中监听客户端的读写
private:// handle_read和handle_write分别为服务器的读回调函数和写回调函数。// 当服务器读数据时会调用handle_read在handle_read过程中要把数据回传给客户端要写时会调用handle_write。void handle_read(const boost::system::error_code error, size_t bytes_transfered);void handle_write(const boost::system::error_code error);tcp::socket _socket; //_socket为单独处理客户端读写的socket。enum { max_length 1024 };char _data[max_length]; //_data用来接收客户端传递的数据};//最大报文接收大小
const int RECVSIZE 1024;// Server类是为服务器接收连接的管理类。
class Server {
public:Server(boost::asio::io_context ioc, short port);
private:void start_accept();//启动连接描述符初始化一个acceptor,将要接收连接的acceptor绑定到服务上// 内部就是将accpeptor对应的socket描述符绑定到epoll或iocp模型上实现事件驱动。void handle_accept(Session* new_session, const boost::system::error_code error); // 有连接过来的时候触发回调函数回调session的数据。boost::asio::io_context _ioc;// 上下文不允许被复制被构造。tcp::acceptor _acceptor;
};
void Session::Start() {memset(_data, 0, max_length);// async_read_some 在boost asio底层用的是epoll把_socket的读事件添加到epoll表里。// 当_socket有读事件就绪的时候就会触发handle_read读对端发送数据_socket底层的读tcp缓冲区由空变成有数据。_socket.async_read_some(boost::asio::buffer(_data, max_length),std::bind(Session::handle_read, this, placeholders::_1, placeholders::_2));}// 读的回调函数客户端发送数据过来就会调用这个函数
void Session::handle_read(const boost::system::error_code error, size_t bytes_transfered) {if (!error) {cout server receive data is _data endl; // 收到数据// 将收到的数据发送回客户端就行了。boost::asio::async_write(_socket, boost::asio::buffer(_data, bytes_transfered),std::bind(Session::handle_write, this, placeholders::_1)); // 当发送完成后触发handle_write回调函数。}else {cout read error endl;delete this;}
}// 写回调函数写回调之后就要开始读了。
// 触发写回调是因为tcp有空闲空间能够把用户态的数据拷贝到tcp缓冲区。
void Session::handle_write(const boost::system::error_code error) {if (!error) {memset(_data, 0, max_length);// 继续读去吧。_socket.async_read_some(boost::asio::buffer(_data, max_length), std::bind(Session::handle_read,this, placeholders::_1, placeholders::_2));}else {cout write errorerror.value() endl;delete this;}
}// 构造函数:_ioc是引用变量要用初始化列表的方式进行赋值,_acceptor专门捕获连接
Server::Server(boost::asio::io_context ioc, short port):_ioc(ioc),_acceptor(ioc,tcp::endpoint(tcp::v4(),port)) {cout Server start success, on port: port endl;start_accept();
}void Server::start_accept() {Session* new_session new Session(_ioc); // 定义新的session// 新的连接到来以后调用handle_accept// placeholders::_1 占位符是asio api要求的错误码。// asio就会通过placeholders::_1 这个占位符把错误码发送给handle_accept函数。_acceptor.async_accept(new_session-Socket(),std::bind(Server::handle_accept, this, new_session, placeholders::_1));
}void Server::handle_accept(Session* new_session, const boost::system::error_code error) {if (!error) {new_session-Start();}else {delete new_session;}start_accept();//再次调用start_accept(),继续接收不能接收完新的连接之后就什么都不干了。
}客户端不用写成异步的因为客户端并不是以并发为主。
asio异步服务器中存在的隐患
当服务器即将发送数据前(调用async_write前)此刻客户端中断服务器此时调用async_write会触发发送回调函数判断ec为非0进而执行delete this逻辑回收session。但要注意的是客户端关闭后在tcp层面会触发读就绪事件服务器会触发读事件回调函数。在读事件回调函数中判断错误码ec为非0进而再次执行delete操作从而造成二次析构这是极度危险的。
参考列表 https://www.bilibili.com/video/BV15P411S7fp/?p7spm_id_frompageDriver