当前位置: 首页 > news >正文

高质量的南京网站建设广东省水利工程建设信息网站

高质量的南京网站建设,广东省水利工程建设信息网站,网站建设资料收集,wordpress 邮件通知 密码本文实现Redis的协议层#xff0c;协议层负责解析指令#xff0c;然后将指令交给核心database执行echo database用来测试协议层的代码https://github.com/csgopher/go-redis RESP协议 RESP是客户端与服务端通信的协议#xff0c;格式有五种#xff1a;正常回复#xff1… 本文实现Redis的协议层协议层负责解析指令然后将指令交给核心database执行echo database用来测试协议层的代码https://github.com/csgopher/go-redis RESP协议 RESP是客户端与服务端通信的协议格式有五种正常回复以“”开头以“\r\n”结尾的字符串形式 错误回复以“-”开头以“\r\n”结尾的字符串形式整数以“:”开头以“\r\n”结尾的字符串形式多行字符串以“$”开头后跟实际发送字节数再以“\r\n”开头和结尾 $3\r\nabc\r\n 数组以“*”开头后跟成员个数 SET key value *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n 客户端和服务器发送的命令或数据一律以 \r\n CRLF作为换行符。 当我们输入*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n这样一串命令服务端接收到的是如下的命令 *3\r\n $3\r\n SET\r\n $3\r\n key\r\n $5\r\n value\r\n interface/resp/conn.go type Connection interface {Write([]byte) errorGetDBIndex() intSelectDB(int) }interface/resp/reply.go type Reply interface {ToBytes() []byte }Connection接口Redis客户端的一个连接Write给客户端回复消息GetDBIndexRedis有16个DBReply接口响应接口 resp/reply/consts.go type PongReply struct{}var pongBytes []byte(PONG\r\n)func (r *PongReply) ToBytes() []byte {return pongBytes }var thePongReply new(PongReply)func MakePongReply() *PongReply {return thePongReply }type OkReply struct{}var okBytes []byte(OK\r\n)func (r *OkReply) ToBytes() []byte {return okBytes }var theOkReply new(OkReply)func MakeOkReply() *OkReply {return theOkReply }var nullBulkBytes []byte($-1\r\n)type NullBulkReply struct{}func (r *NullBulkReply) ToBytes() []byte {return nullBulkBytes }func MakeNullBulkReply() *NullBulkReply {return NullBulkReply{} }var emptyMultiBulkBytes []byte(*0\r\n)type EmptyMultiBulkReply struct{}func (r *EmptyMultiBulkReply) ToBytes() []byte {return emptyMultiBulkBytes }type NoReply struct{}var noBytes []byte()func (r *NoReply) ToBytes() []byte {return noBytes }定义五种回复回复pongoknull空数组空 resp/reply/reply.go type ErrorReply interface {Error() stringToBytes() []byte }ErrorReply定义错误接口 resp/reply/errors.go type UnknownErrReply struct{}var unknownErrBytes []byte(-Err unknown\r\n)func (r *UnknownErrReply) ToBytes() []byte {return unknownErrBytes }func (r *UnknownErrReply) Error() string {return Err unknown }type ArgNumErrReply struct {Cmd string }func (r *ArgNumErrReply) ToBytes() []byte {return []byte(-ERR wrong number of arguments for r.Cmd command\r\n) }func (r *ArgNumErrReply) Error() string {return ERR wrong number of arguments for r.Cmd command }func MakeArgNumErrReply(cmd string) *ArgNumErrReply {return ArgNumErrReply{Cmd: cmd,} }type SyntaxErrReply struct{}var syntaxErrBytes []byte(-Err syntax error\r\n) var theSyntaxErrReply SyntaxErrReply{}func MakeSyntaxErrReply() *SyntaxErrReply {return theSyntaxErrReply }func (r *SyntaxErrReply) ToBytes() []byte {return syntaxErrBytes }func (r *SyntaxErrReply) Error() string {return Err syntax error }type WrongTypeErrReply struct{}var wrongTypeErrBytes []byte(-WRONGTYPE Operation against a key holding the wrong kind of value\r\n)func (r *WrongTypeErrReply) ToBytes() []byte {return wrongTypeErrBytes }func (r *WrongTypeErrReply) Error() string {return WRONGTYPE Operation against a key holding the wrong kind of value }type ProtocolErrReply struct {Msg string }func (r *ProtocolErrReply) ToBytes() []byte {return []byte(-ERR Protocol error: r.Msg \r\n) }func (r *ProtocolErrReply) Error() string {return ERR Protocol error: r.Msg }errors定义5种错误UnknownErrReply 未知错误ArgNumErrReply 参数个数错误SyntaxErrReply 语法错误WrongTypeErrReply 数据类型错误ProtocolErrReply 协议错误 resp/reply/reply.go var (nullBulkReplyBytes []byte($-1)// 协议的结尾CRLF \r\n )type BulkReply struct {Arg []byte }func MakeBulkReply(arg []byte) *BulkReply {return BulkReply{Arg: arg,} }func (r *BulkReply) ToBytes() []byte {if len(r.Arg) 0 {return nullBulkReplyBytes}return []byte($ strconv.Itoa(len(r.Arg)) CRLF string(r.Arg) CRLF) }type MultiBulkReply struct {Args [][]byte }func (r *MultiBulkReply) ToBytes() []byte {argLen : len(r.Args)var buf bytes.Bufferbuf.WriteString(* strconv.Itoa(argLen) CRLF)for _, arg : range r.Args {if arg nil {buf.WriteString($-1 CRLF)} else {buf.WriteString($ strconv.Itoa(len(arg)) CRLF string(arg) CRLF)}}return buf.Bytes() }func MakeMultiBulkReply(args [][]byte) *MultiBulkReply {return MultiBulkReply{Args: args,} }type StatusReply struct {Status string }func MakeStatusReply(status string) *StatusReply {return StatusReply{Status: status,} }func (r *StatusReply) ToBytes() []byte {return []byte( r.Status CRLF) }type IntReply struct {Code int64 }func MakeIntReply(code int64) *IntReply {return IntReply{Code: code,} }func (r *IntReply) ToBytes() []byte {return []byte(: strconv.FormatInt(r.Code, 10) CRLF) }type StandardErrReply struct {Status string }func (r *StandardErrReply) ToBytes() []byte {return []byte(- r.Status CRLF) }func (r *StandardErrReply) Error() string {return r.Status }func MakeErrReply(status string) *StandardErrReply {return StandardErrReply{Status: status,} }func IsErrorReply(reply resp.Reply) bool {return reply.ToBytes()[0] - }BulkReply回复一个字符串MultiBulkReply回复字符串数组StatusReply状态回复IntReply数字回复StandardErrReply标准错误回复IsErrorReply判断是否为错误回复ToBytes将字符串转成RESP协议规定的格式 resp/parser/parser.go type Payload struct {Data resp.ReplyErr error }type readState struct {readingMultiLine bool expectedArgsCount int msgType byte args [][]byte bulkLen int64 }func (s *readState) finished() bool {return s.expectedArgsCount 0 len(s.args) s.expectedArgsCount }func ParseStream(reader io.Reader) -chan *Payload {ch : make(chan *Payload)go parse0(reader, ch)return ch }func parse0(reader io.Reader, ch chan- *Payload) {...... }Payload结构体客服端给我们发的数据 Reply客户端与服务端互相发的数据都称为Reply readState结构体 readingMultiLine解析单行还是多行数据expectedArgsCount应该读取的参数个数msgType消息类型args消息内容bulkLen数据长度 finished方法判断解析是否完成ParseStream方法异步解析数据后放入管道返回管道数据 func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) {var msg []bytevar err errorif state.bulkLen 0 {msg, err bufReader.ReadBytes(\n)if err ! nil {return nil, true, err}if len(msg) 0 || msg[len(msg)-2] ! \r {return nil, false, errors.New(protocol error: string(msg))}} else {msg make([]byte, state.bulkLen2)_, err io.ReadFull(bufReader, msg)if err ! nil {return nil, true, err}if len(msg) 0 || msg[len(msg)-2] ! \r || msg[len(msg)-1] ! \n {return nil, false, errors.New(protocol error: string(msg))}state.bulkLen 0}return msg, false, nil }readLine一行一行的读取。读正常的行以\n分隔。读正文中包含\r\n字符的行时state.bulkLen加上换行符\r\nstate.bulkLen2 func parseMultiBulkHeader(msg []byte, state *readState) error {var err errorvar expectedLine uint64expectedLine, err strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)if err ! nil {return errors.New(protocol error: string(msg))}if expectedLine 0 {state.expectedArgsCount 0return nil} else if expectedLine 0 {state.msgType msg[0]state.readingMultiLine truestate.expectedArgsCount int(expectedLine)state.args make([][]byte, 0, expectedLine)return nil} else {return errors.New(protocol error: string(msg))} }func parseBulkHeader(msg []byte, state *readState) error {var err errorstate.bulkLen, err strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)if err ! nil {return errors.New(protocol error: string(msg))}if state.bulkLen -1 { // null bulkreturn nil} else if state.bulkLen 0 {state.msgType msg[0]state.readingMultiLine truestate.expectedArgsCount 1state.args make([][]byte, 0, 1)return nil} else {return errors.New(protocol error: string(msg))} }parseMultiBulkHeader解析数组的头部设置期望的行数和相关参数。parseBulkHeader解析多行字符串的头部。 func parseSingleLineReply(msg []byte) (resp.Reply, error) {str : strings.TrimSuffix(string(msg), \r\n)var result resp.Replyswitch msg[0] {case : // status replyresult reply.MakeStatusReply(str[1:])case -: // err replyresult reply.MakeErrReply(str[1:])case :: // int replyval, err : strconv.ParseInt(str[1:], 10, 64)if err ! nil {return nil, errors.New(protocol error: string(msg))}result reply.MakeIntReply(val)}return result, nil }func readBody(msg []byte, state *readState) error {line : msg[0 : len(msg)-2]var err errorif line[0] $ {// bulk replystate.bulkLen, err strconv.ParseInt(string(line[1:]), 10, 64)if err ! nil {return errors.New(protocol error: string(msg))}if state.bulkLen 0 { // null bulk in multi bulksstate.args append(state.args, []byte{})state.bulkLen 0}} else {state.args append(state.args, line)}return nil }parseSingleLineReply解析单行命令readBody读取多行的命令如果是开头设置bulkLen读取下一行时根据这个2不是开头设置bulkLen读取下一行时根据这个2不是开头设置bulkLen读取下一行时根据这个2不是开头则直接添加到args func parse0(reader io.Reader, ch chan- *Payload) {defer func() {if err : recover(); err ! nil {logger.Error(string(debug.Stack()))}}()bufReader : bufio.NewReader(reader)var state readStatevar err errorvar msg []bytefor {var ioErr boolmsg, ioErr, err readLine(bufReader, state)if err ! nil {if ioErr {ch - Payload{Err: err,}close(ch)return}ch - Payload{Err: err,}state readState{}continue}if !state.readingMultiLine {if msg[0] * {// multi bulk replyerr parseMultiBulkHeader(msg, state)if err ! nil {ch - Payload{Err: errors.New(protocol error: string(msg)),}state readState{}continue}if state.expectedArgsCount 0 {ch - Payload{Data: reply.EmptyMultiBulkReply{},}state readState{}continue}} else if msg[0] $ { // bulk replyerr parseBulkHeader(msg, state)if err ! nil {ch - Payload{Err: errors.New(protocol error: string(msg)),}state readState{} // reset statecontinue}if state.bulkLen -1 { // null bulk replych - Payload{Data: reply.NullBulkReply{},}state readState{} // reset statecontinue}} else {// single line replyresult, err : parseSingleLineReply(msg)ch - Payload{Data: result,Err: err,}state readState{} // reset statecontinue}} else {// read bulk replyerr readBody(msg, state)if err ! nil {ch - Payload{Err: errors.New(protocol error: string(msg)),}state readState{} // reset statecontinue}// if sending finishedif state.finished() {var result resp.Replyif state.msgType * {result reply.MakeMultiBulkReply(state.args)} else if state.msgType $ {result reply.MakeBulkReply(state.args[0])}ch - Payload{Data: result,Err: err,}state readState{}}}} }parse0解析指令解析完成后通过channel发出去 resp/connection/conn.go type Connection struct {conn net.ConnwaitingReply wait.Waitmu sync.Mutex // 避免多个协程往客户端中写selectedDB int }func NewConn(conn net.Conn) *Connection {return Connection{conn: conn,} }func (c *Connection) RemoteAddr() net.Addr {return c.conn.RemoteAddr() }func (c *Connection) Close() error {c.waitingReply.WaitWithTimeout(10 * time.Second)_ c.conn.Close()return nil }func (c *Connection) Write(b []byte) error {if len(b) 0 {return nil}c.mu.Lock()c.waitingReply.Add(1)defer func() {c.waitingReply.Done()c.mu.Unlock()}()_, err : c.conn.Write(b)return err }func (c *Connection) GetDBIndex() int {return c.selectedDB }func (c *Connection) SelectDB(dbNum int) {c.selectedDB dbNum }之前写的EchoHandler是用户传过来什么我们传回去什么。现在要写一个RespHandler来代替EchoHandler让解析器来解析。RespHandler中要有一个管理客户端连接的结构体Connection。Connection客户端连接在协议层的handler中会用到 resp/handler/handler.go var (unknownErrReplyBytes []byte(-ERR unknown\r\n) )type RespHandler struct {activeConn sync.Mapdb databaseface.Databaseclosing atomic.Boolean }func MakeHandler() *RespHandler {var db databaseface.Databasedb database.NewEchoDatabase()return RespHandler{db: db,} }func (h *RespHandler) closeClient(client *connection.Connection) {_ client.Close()h.db.AfterClientClose(client)h.activeConn.Delete(client) }func (h *RespHandler) Handle(ctx context.Context, conn net.Conn) {if h.closing.Get() {// closing handler refuse new connection_ conn.Close()}client : connection.NewConn(conn)h.activeConn.Store(client, 1)ch : parser.ParseStream(conn)for payload : range ch {if payload.Err ! nil {if payload.Err io.EOF ||payload.Err io.ErrUnexpectedEOF ||strings.Contains(payload.Err.Error(), use of closed network connection) {// connection closedh.closeClient(client)logger.Info(connection closed: client.RemoteAddr().String())return}// protocol errerrReply : reply.MakeErrReply(payload.Err.Error())err : client.Write(errReply.ToBytes())if err ! nil {h.closeClient(client)logger.Info(connection closed: client.RemoteAddr().String())return}continue}if payload.Data nil {logger.Error(empty payload)continue}r, ok : payload.Data.(*reply.MultiBulkReply)if !ok {logger.Error(require multi bulk reply)continue}result : h.db.Exec(client, r.Args)if result ! nil {_ client.Write(result.ToBytes())} else {_ client.Write(unknownErrReplyBytes)}} }func (h *RespHandler) Close() error {logger.Info(handler shutting down...)h.closing.Set(true)// TODO: concurrent waith.activeConn.Range(func(key interface{}, val interface{}) bool {client : key.(*connection.Connection)_ client.Close()return true})h.db.Close()return nil }RespHandler和之前的echo类似加了核心层的db.exec执行解析的指令 interface/database/database.go type CmdLine [][]bytetype Database interface {Exec(client resp.Connection, args [][]byte) resp.ReplyAfterClientClose(c resp.Connection)Close() }type DataEntity struct {Data interface{} }Exec核心层的执行AfterClientClose关闭之后的善后方法CmdLine二维字节数组的指令别名DataEntity表示Redis的数据包括string, list, set等等 database/echo_database.go type EchoDatabase struct { }func NewEchoDatabase() *EchoDatabase {return EchoDatabase{} }func (e EchoDatabase) Exec(client resp.Connection, args [][]byte) resp.Reply {return reply.MakeMultiBulkReply(args) }func (e EchoDatabase) AfterClientClose(c resp.Connection) {logger.Info(EchoDatabase AfterClientClose) }func (e EchoDatabase) Close() {logger.Info(EchoDatabase Close) }echo_database测试协议层Exec指令解析后再使用MakeMultiBulkReply包装一下返回去 main.go err : tcp.ListenAndServeWithSignal(tcp.Config{Address: fmt.Sprintf(%s:%d,config.Properties.Bind,config.Properties.Port),},handler.MakeHandler()) if err ! nil {logger.Error(err) }main改成刚才写的handler.MakeHandler()
http://www.hkea.cn/news/14296234/

相关文章:

  • 前端工程师是做网站网站建设玖金手指谷哥二八
  • 专业的画册设计网站wordpress 本地安装插件
  • 南昌企业建站手机做网站用什么软件
  • 微商手机网站模板做个企业网网站怎么做
  • 企业网站建设空间wordpress选项卡插件
  • 做网站是什么样的工作中国建筑官网测评
  • 如何在文本上做网站链接符号使用的电脑做网站的服务器
  • 网站开发与网页设计漯河网站建设服务公司
  • 国外设计素材网站设计页面导航
  • 织梦网站联系我们的地图怎么做医院网站建设报价
  • 做防水怎么注册网站上海解封最新消息
  • 网站首页建设在线制图免费版
  • 评网网站建设wordpress多語言插件
  • 容桂免费网站建设公司晋江 网站建设 推广
  • 如何做漫画赚钱的网站响应式网页设计用什么软件
  • php网站建设入门教程建设一个公司网站要具备什么
  • 做网站首页文字排版技巧wordpress表单编辑插件下载
  • 做个什么类型网站贵州住房和城乡建设厅官网
  • 承德市外贸网站建设网站让女友做网站模特
  • 个人博客网站哈尔滨seo推广
  • 做推送用什么网站网站建设岗位内容
  • 高端品牌网站建设图片一个主机域名可以做多少个网站
  • 建设搜索引擎网站长沙黄页
  • 和田地网站seohtml网页制作大作业范例
  • 网站建设尾款放在什么科目里国内做视频课程的网站有哪些
  • 品牌包装设计公司宁波seo推广开发
  • 四川城乡住房城乡建设厅网站首页一键优化图片
  • 刷赞业务推广网站多用户商城 源码
  • 做网站的高手5 网站建设进度表
  • 网站注册怎么注销网站开发会遇到的问题