零售网站建设方案,教人做家务的网站,网站开发本科论文,wordpress 手机 登陆不了目录 1.完善rpcprovider.cc的OnConnection
2.完善rpcprovider.cc的OnMessage
3.完整rpcprovider.h
4.完整rpcprovider.cc 这篇文章主要完成#xff0c;protobuf实现的数据序列化和反序列化。 1.完善rpcprovider.cc的OnConnection
rpc的请求是短连接的#xff0c;请求一次…目录 1.完善rpcprovider.cc的OnConnection
2.完善rpcprovider.cc的OnMessage
3.完整rpcprovider.h
4.完整rpcprovider.cc 这篇文章主要完成protobuf实现的数据序列化和反序列化。 1.完善rpcprovider.cc的OnConnection
rpc的请求是短连接的请求一次完了服务端返回rpc的方法的响应就主动关闭连接了。
//新的socket连接回调
void RpcProvider::OnConnection(const muduo::net::TcpConnectionPtr conn)
{if(!conn-connected()){//和rpc client的连接断开了conn-shutdown();}
}
2.完善rpcprovider.cc的OnMessage
在框架内部RpcProvider和RpcConsumer协商好之间通信用的protobuf数据类型怎么商量呢包含service_name method_name args
对应16UserService Login zhang san123456 我们在框架中定义proto的message类型进行数据头的序列化和反序列化 service_name method_name args_size(防止粘包的问题)
怎么去区分哪个是service_name, method_name, args 我们把消息头表示出来 header_size(4个字节) header_str args_str 前面几个字节是服务名和方法名。 为了防止粘包我们还要记录参数的字符串的长度 我们统一一开始读4个字节数据头的长度也就是除了方法参数之外的所有数据服务名字和方法名字
10 “10” 10000 “1000000” 我们要用到std::string insert和copy方法 把header_size按照内存的方式二进制的形式直接存4个字节。 所以我们从字符流解析是按数据头(4字节大小表示service_name method_name args_size的长度service_name method_name args_size(防止粘包的问题)args(参数 我们在src里面创建rpcheader.proto文件
syntax proto3;package mprpc;message RpcHeader
{bytes service_name1;bytes method_name2;uint32 args_size3;
}
我们打开终端进入到src下执行命令。 新增mprpcprovider.cc的OnMessage内容
/*
在框架内部RpcProvider和RpcConsumer协商好之间通信用的protobuf数据类型
service_name method_name args 定义proto的message类型进行数据头的序列化和反序列化service_name method_name args_size
16UserServiceLoginzhang san123456header_size(4个字节)header_strargs_str
10 10
10000 10000
std::string insert和copy方法
*/
// 已建立连接用户的读写事件回调 如果远程有一个rpc服务的调用请求那么OnMessage方法就会响应
void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr conn,muduo::net::Buffer* buffer,muduo::Timestamp)
{//网络上接收的远程rpc调用请求的字符流 Login argsstd::string recv_bufbuffer-retrieveAllAsString();//从字符流中读取前4个字节的内容uint32_t header_size 0;recv_buf.copy((char*)header_size,4,0);//根据header_size读取数据头的原始字符流反序列化数据得到rpc请求的详细消息std::string rpc_header_strrecv_buf.substr(4,header_size);mprpc::RpcHeader rpcHeader;std::string service_name;std::string method_name;uint32_t args_size;if(rpcHeader.ParseFromString(rpc_header_str)){//数据头反序列化成功service_namerpcHeader.service_name();method_namerpcHeader.method_name();args_sizerpcHeader.args_size();}else{//数据头反序列化失败std::coutrpc_header_str:rpc_header_str parse error!std::endl;return;}//获取rpc方法参数的字符流数据std::string args_strrecv_buf.substr(4header_size,args_size);//打印调试信息std::coutstd::endl;std::coutheader_size:header_sizestd::endl;std::coutrpc_header_str:rpc_header_strstd::endl;std::coutservice_name:service_namestd::endl;std::coutmethod_name:method_namestd::endl;std::coutargs_str:args_strstd::endl;std::coutstd::endl;
}
目前mprpcprovider.cc的完整代码如下
#include rpcprovider.h
#include mprpcapplication.h
#include rpcheader.pb.h/*
service_name service描述一个服务由一个服务名字对应》 service* 记录服务对象method_name method方法对象
json:存储键值对基于文本存储数据有对应的键值
protobuf基于二进制存储存储效率更高紧密存储不携带除数据外的任何信息整体来说protobuf存储效率更高占用的带宽更少同样带宽传输的数据量更大不仅可以提供类型的序列化和反序列化还提供了service rpc方法的描述
*/
//这里是框架提供给外部使用的可以发布rpc方法的函数接口
void RpcProvider::NotifyService(google::protobuf::Service *service)
{ServiceInfo service_info;//获取了服务对象的描述信息const google::protobuf::ServiceDescriptor* pserviceDescservice-GetDescriptor();//获取服务的名字std::string service_namepserviceDesc-name();//获取服务对象service的方法的数量int methodCntpserviceDesc-method_count();std::coutservice_name:service_namestd::endl;for(int i0;imethodCnt;i){//获取了服务对象指定下标的服务方法的描述抽象描述 UserService Loginconst google::protobuf::MethodDescriptor* pmethodDescpserviceDesc-method(i);std::string method_namepmethodDesc-name();service_info.m_methodMap.insert({method_name,pmethodDesc});std::coutmethod_name:method_namestd::endl;}service_info.m_serviceservice;m_serviceMap.insert({service_name,service_info});}// 启动rpc服务节点开始提供rpc远程网络调用服务
void RpcProvider::Run()
{std::string ipMprpcApplication::GetInstance().GetConfig().Load(rpcserverip);uint16_t portatoi(MprpcApplication::GetInstance().GetConfig().Load(rpcserverport).c_str());muduo::net::InetAddress address(ip,port);//创建TcpServer对象muduo::net::TcpServer server(m_eventLoop,address,RpcProvider);//绑定连接回调和消息读写回调方法 muduo库的好处是分离了网络代码和业务代码server.setConnectionCallback(std::bind(RpcProvider::OnConnection, this, std::placeholders::_1));//预留1个参数std::placeholders::_1server.setMessageCallback(std::bind(RpcProvider::OnMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));//预留3个参数std::placeholders::_1,2,3//设置muduo库的线程数量server.setThreadNum(4);std::coutRpcProvider start service at ip:ipport:portstd::endl;//启动网络服务server.start();m_eventLoop.loop();
}//新的socket连接回调
void RpcProvider::OnConnection(const muduo::net::TcpConnectionPtr conn)
{if(!conn-connected()){//和rpc client的连接断开了conn-shutdown();}
}
/*
在框架内部RpcProvider和RpcConsumer协商好之间通信用的protobuf数据类型
service_name method_name args 定义proto的message类型进行数据头的序列化和反序列化service_name method_name args_size
16UserServiceLoginzhang san123456header_size(4个字节)header_strargs_str
10 10
10000 10000
std::string insert和copy方法
*/
// 已建立连接用户的读写事件回调 如果远程有一个rpc服务的调用请求那么OnMessage方法就会响应
void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr conn,muduo::net::Buffer* buffer,muduo::Timestamp)
{//网络上接收的远程rpc调用请求的字符流 Login argsstd::string recv_bufbuffer-retrieveAllAsString();//从字符流中读取前4个字节的内容uint32_t header_size 0;recv_buf.copy((char*)header_size,4,0);//根据header_size读取数据头的原始字符流反序列化数据得到rpc请求的详细消息std::string rpc_header_strrecv_buf.substr(4,header_size);mprpc::RpcHeader rpcHeader;std::string service_name;std::string method_name;uint32_t args_size;if(rpcHeader.ParseFromString(rpc_header_str)){//数据头反序列化成功service_namerpcHeader.service_name();method_namerpcHeader.method_name();args_sizerpcHeader.args_size();}else{//数据头反序列化失败std::coutrpc_header_str:rpc_header_str parse error!std::endl;return;}//获取rpc方法参数的字符流数据std::string args_strrecv_buf.substr(4header_size,args_size);//打印调试信息std::coutstd::endl;std::coutheader_size:header_sizestd::endl;std::coutrpc_header_str:rpc_header_strstd::endl;std::coutservice_name:service_namestd::endl;std::coutmethod_name:method_namestd::endl;std::coutargs_str:args_strstd::endl;std::coutstd::endl;
} 编译
3.完整rpcprovider.h
#pragma once
#include google/protobuf/service.h
#include muduo/net/TcpServer.h
#include muduo/net/EventLoop.h
#include muduo/net/InetAddress.h
#include muduo/net/TcpConnection.h
#include string
#include functional
#include google/protobuf/descriptor.h
#include unordered_map//框架提供的专门发布rpc服务的网络对象类
class RpcProvider
{
public://这里是框架提供给外部使用的可以发布rpc方法的函数接口void NotifyService(google::protobuf::Service* service);//具体的服务对象类是从Service类继承而来//框架是可以接收各种RPC服务的不能依赖具体的某一个业务。 //基类指针指向子对象 //启动rpc服务节点开始提供rpc远程网络调用服务void Run();private://组合EventLoopmuduo::net::EventLoop m_eventLoop;//service服务类型信息struct ServiceInfo{google::protobuf::Service* m_service;//保存服务对象std::unordered_mapstd::string,const google::protobuf::MethodDescriptor* m_methodMap;//保存服务方法};//存储注册成功的服务对象和其服务方法的所有信息std::unordered_mapstd::string,ServiceInfo m_serviceMap;//新的socket连接回调void OnConnection(const muduo::net::TcpConnectionPtr);//已建立连接用户的读写事件回调void OnMessage(const muduo::net::TcpConnectionPtr,muduo::net::Buffer*,muduo::Timestamp);//Closure的回调操作用于序列化rpc的响应和网络发送void SendRpcResponse(const muduo::net::TcpConnectionPtr,google::protobuf::Message*);
};
4.完整rpcprovider.cc
#include rpcprovider.h
#include mprpcapplication.h
#include rpcheader.pb.h/*
service_name service描述一个服务由一个服务名字对应》 service* 记录服务对象method_name method方法对象
json:存储键值对基于文本存储数据有对应的键值
protobuf基于二进制存储存储效率更高紧密存储不携带除数据外的任何信息整体来说protobuf存储效率更高占用的带宽更少同样带宽传输的数据量更大不仅可以提供类型的序列化和反序列化还提供了service rpc方法的描述
*/
//这里是框架提供给外部使用的可以发布rpc方法的函数接口
void RpcProvider::NotifyService(google::protobuf::Service *service)
{ServiceInfo service_info;//获取了服务对象的描述信息const google::protobuf::ServiceDescriptor* pserviceDescservice-GetDescriptor();//获取服务的名字std::string service_namepserviceDesc-name();//获取服务对象service的方法的数量int methodCntpserviceDesc-method_count();std::coutservice_name:service_namestd::endl;for(int i0;imethodCnt;i){//获取了服务对象指定下标的服务方法的描述抽象描述 UserService Loginconst google::protobuf::MethodDescriptor* pmethodDescpserviceDesc-method(i);std::string method_namepmethodDesc-name();service_info.m_methodMap.insert({method_name,pmethodDesc});std::coutmethod_name:method_namestd::endl;}service_info.m_serviceservice;m_serviceMap.insert({service_name,service_info});}// 启动rpc服务节点开始提供rpc远程网络调用服务
void RpcProvider::Run()
{std::string ipMprpcApplication::GetInstance().GetConfig().Load(rpcserverip);uint16_t portatoi(MprpcApplication::GetInstance().GetConfig().Load(rpcserverport).c_str());muduo::net::InetAddress address(ip,port);//创建TcpServer对象muduo::net::TcpServer server(m_eventLoop,address,RpcProvider);//绑定连接回调和消息读写回调方法 muduo库的好处是分离了网络代码和业务代码server.setConnectionCallback(std::bind(RpcProvider::OnConnection, this, std::placeholders::_1));//预留1个参数std::placeholders::_1server.setMessageCallback(std::bind(RpcProvider::OnMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));//预留3个参数std::placeholders::_1,2,3//设置muduo库的线程数量server.setThreadNum(4);std::coutRpcProvider start service at ip:ipport:portstd::endl;//启动网络服务server.start();m_eventLoop.loop();
}//新的socket连接回调
void RpcProvider::OnConnection(const muduo::net::TcpConnectionPtr conn)
{if(!conn-connected()){//和rpc client的连接断开了conn-shutdown();}
}
/*
在框架内部RpcProvider和RpcConsumer协商好之间通信用的protobuf数据类型
service_name method_name args 定义proto的message类型进行数据头的序列化和反序列化service_name method_name args_size
16UserServiceLoginzhang san123456header_size(4个字节)header_strargs_str
10 10
10000 10000
std::string insert和copy方法
*/
// 已建立连接用户的读写事件回调 如果远程有一个rpc服务的调用请求那么OnMessage方法就会响应
void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr conn,muduo::net::Buffer* buffer,muduo::Timestamp)
{//网络上接收的远程rpc调用请求的字符流 Login argsstd::string recv_bufbuffer-retrieveAllAsString();//从字符流中读取前4个字节的内容uint32_t header_size 0;recv_buf.copy((char*)header_size,4,0);//根据header_size读取数据头的原始字符流反序列化数据得到rpc请求的详细消息std::string rpc_header_strrecv_buf.substr(4,header_size);mprpc::RpcHeader rpcHeader;std::string service_name;std::string method_name;uint32_t args_size;if(rpcHeader.ParseFromString(rpc_header_str)){//数据头反序列化成功service_namerpcHeader.service_name();method_namerpcHeader.method_name();args_sizerpcHeader.args_size();}else{//数据头反序列化失败std::coutrpc_header_str:rpc_header_str parse error!std::endl;return;}//获取rpc方法参数的字符流数据std::string args_strrecv_buf.substr(4header_size,args_size);//打印调试信息std::coutstd::endl;std::coutheader_size:header_sizestd::endl;std::coutrpc_header_str:rpc_header_strstd::endl;std::coutservice_name:service_namestd::endl;std::coutmethod_name:method_namestd::endl;std::coutargs_str:args_strstd::endl;std::coutstd::endl;//获取service对象和method对象auto itm_serviceMap.find(service_name);if(itm_serviceMap.end()){std::coutservice_name is not exist!std::endl;return;}auto mitit-second.m_methodMap.find(method_name);if(mitit-second.m_methodMap.end()){std::coutservice_name:method_nameis not exist!std::endl;return;}google::protobuf::Service* serviceit-second.m_service;//获取service对象 new UserServiceconst google::protobuf::MethodDescriptor* methodmit-second;//获取method对象 Login//生成rpc方法调用的请求request和响应response参数google::protobuf::Message* requestservice-GetRequestPrototype(method).New();if(!request-ParseFromString(args_str)){std::coutrequest parse error,content:args_strstd::endl;return;}google::protobuf::Message* responseservice-GetResponsePrototype(method).New();//给下面的method方法的调用绑定一个Closure的回调函数google::protobuf::Closure* donegoogle::protobuf::NewCallbackRpcProvider,const muduo::net::TcpConnectionPtr,google::protobuf::Message*(this,RpcProvider::SendRpcResponse,conn,response);//在框架上根据远端rpc请求调用当前rpc节点上发布的方法//new UserService().Login(controller,request,response,done)service-CallMethod(method,nullptr,request,response,done);
}// Closure的回调操作用于序列化rpc的响应和网络发送
void RpcProvider::SendRpcResponse(const muduo::net::TcpConnectionPtr conn, google::protobuf::Message *response)
{std::string response_str;if(response-SerializeToString(response_str))//response进行序列化{//序列化成功后通过网络把rpc方法执行的结果发送给rpc的调用方conn-send(response_str);}else{std::coutserialize response_str error!std::endl;}conn-shutdown();//模拟http的短链接服务由rpcprovider主动断开连接
} 通过onmessage,muduo库接受过来远程的字符流以后通过参数的解析拿到响应的service和method然后再绑定一个回调动态的创建这个方法对应的request和response然后由这个框架调用这个方法把响应的参数传到业务层去。
业务层做的事情就是从由框架进行反序列化好的请求中request拿数据做本地业务填响应值再调用回调最后done执行run调用的是绑定的回调SendRpcResponse 响应对象的序列化序列化为字符流后再由网络发送到rpc的调用由rpcprovider主动断开连接。