当前位置: 首页 > news >正文

分站城市网站如何做seo环保网站主题

分站城市网站如何做seo,环保网站主题,如皋官方网站建设什么地铁,网站建设在哪些方面背景描述 有一个需求#xff0c;大概可以描述为#xff1a;有多个websocket连接#xff0c;因此消息会并发地发送过来#xff0c;这些消息中有一个标志可以表明是哪个连接发来的消息#xff0c;但只有收到消息后才能建立channel或写入已有channel#xff0c;在收消息前无…背景描述 有一个需求大概可以描述为有多个websocket连接因此消息会并发地发送过来这些消息中有一个标志可以表明是哪个连接发来的消息但只有收到消息后才能建立channel或写入已有channel在收消息前无法预先创建channel 解决过程可直接阅读最终版 初版直接写入 因为对数据量错误预估(以为数据量不大)一开始我是用的mysql直接写入每次收到ws消息立即处理可测试中发现因数据量过多且都会操作同一行数据出现了资源竞争导致死锁。 第二版增加锁 在发现出现数据竞争后我第一反应是增加读写锁。读写锁的代码类似以下示例 package mainimport (database/sqlfmtsync_ github.com/go-sql-driver/mysql )var (db *sql.DBmu sync.RWMutex )func init() {var err errordb, err sql.Open(mysql, username:passwordtcp(localhost:3306)/dbname)if err ! nil {panic(err)} }func main() {defer db.Close()// 读取数据go readData()// 写入数据go writeData()// 保持主线程运行select {} }func readData() {for {mu.RLock()rows, err : db.Query(SELECT * FROM table_name)mu.RUnlock()if err ! nil {fmt.Println(Error reading data:, err)continue}defer rows.Close()// 处理查询结果// ...// 睡眠一段时间模拟读操作的持续性// 请注意这是一个简单示例实际应用中可能需要更复杂的逻辑// 或使用定时器进行控制} }func writeData() {for {mu.Lock()_, err : db.Exec(INSERT INTO table_name (column1, column2) VALUES (?, ?), value1, value2)mu.Unlock()if err ! nil {fmt.Println(Error writing data:, err)continue}// 睡眠一段时间模拟写操作的持续性// 请注意这是一个简单示例实际应用中可能需要更复杂的逻辑// 或使用定时器进行控制} }但是代码里对数据库的操作非常频繁且混乱加了读写锁后经常出现请求很慢的情况考虑其他方案 第三版 使用事务 使用事务代码忽略最终发现因为事务过长导致出现了重复写的问题考虑其他方案 第四版 map 通过一个二维的map来存储数据每当数据存满10条就处理当然毫不意外的出现了map的竞争。map也是可以用锁的但是这里是二维的map加上两层锁之后使得效率极低而且依旧有概率出现map竞争导致报错 此外还可以考虑使用redis设置锁直接set就行了但是因为环境不支持redis此方案弃用 最终版 动态channel 出现以上问题的根本原因是消费太快其实完全可以把每个ws连接的数据都写到各自的channel里同时设置每个channel都累积10条再消费当然还需要一个处理机制如果超过10s也消费一次。 启动生产者、“消费者” 在当前环境中生产者就是每次从ws中读到数据往动态channel中写入消费者就是不断获取有哪些channel以及从channel中读数据在ws写入时的处理逻辑大概可以简化为如下demo: package testimport (contextencoding/jsongithub.com/gin-gonic/gingithub.com/gorilla/websocketlog github.com/sirupsen/logrusnet/httpsync )// RequestTemplate 请求模板 type RequestTemplate struct {Op string json:op // 操作Id int json:id // 唯一id标识Time string json:time // 时间用秒级时间戳字符串包裹Data *RequestTemplateData json:data // 请求数据Code int json:code // 状态码 }// RequestTemplateData 请求中data包含的部分实际这里是很复杂的结构之前超时/死锁也是因为这里处理逻辑比较复杂但是这篇博客的演示重点不是这个因此简略为id和请求ip type RequestTemplateData struct {ConnIp string json:conn_ip // 请求ipId int json:id // 唯一id标识 }// ConnInfo 具体的连接信息 type ConnInfo struct {Conn *websocket.Conn json:conn // websocket连接Ctx context.Context json:ctx // 连接上下文CtxCancel context.CancelFunc json:cancel // 连接上下文cancel functionIp string json:ip // 连接的手机端ipId int json:id // 唯一id标识 }var AllConns make(map[string]*ConnInfo) //创建字典集合存储连接信息// Start 启动 func Start() {//处理ws的连接http.HandleFunc(/ws, HandleMsg)// //监听7001端口号作为websocket连接的服务log.Info(Server started on :7001)log.Fatal(http.ListenAndServe(:7001, nil)) }// ChannelStorage channel数据 type ChannelStorage struct {sync.RWMutexchannels map[string]chan *RequestTemplateData }var ConnRequestData map[int]*RequestTemplateDatavar upgrader websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {return true}, }// HandleMsg 处理ws连接每来一个新客户端请求就建立一个新连接 func HandleMsg(w http.ResponseWriter, r *http.Request) {conn, err : upgrader.Upgrade(w, r, nil) // 协议升级这里也可以直连if err ! nil {log.Error(err)return}//获取连接ip这里是为了区分每个连接connIp : conn.RemoteAddr().String()// 这里是为了后续关闭channelrootCtx : context.Background()ctx, cancel : context.WithCancel(rootCtx)//加入连接AllConns[connIp] ConnInfo{Conn: conn, // 客户端ws链接对象Ctx: ctx, // 连接上下文CtxCancel: cancel, // 取消连接上下文}defer func() {// 如果断开连接删除数据if AllConns[connIp] ! nil {AllConns[connIp].CtxCancel()delete(ConnRequestData, AllConns[connIp].Id)go SetDoneData(AllConns[connIp].Id, conn) // 这里对结束做处理}delete(AllConns, conn.RemoteAddr().String())err conn.Close()if err ! nil {return}log.Error(HandleMsg异常开始defer处理:, err)if err : recover(); err ! nil {log.Error(websocket连接异常已断开:, err)}}()log.WithFields(log.Fields{connIp: connIp,}).Info(沙箱已连接)reqCh : ChannelStorage{}go reqCh.ResultConsumer(ctx) // 这里是消费者//循环读取ws客户端的消息for {// 读取消息_, msg, err : conn.ReadMessage()if err ! nil {log.WithFields(log.Fields{connIp: connIp,}).WithError(err).Error(读取websocket的消息失败)if AllConns[connIp] ! nil {delete(ConnRequestData, AllConns[connIp].Id)go SetDoneData(AllConns[connIp].Id, conn) // 连接断开设置状态为结束}// 断开ws连接conn.Close()delete(AllConns, conn.RemoteAddr().String())return}//msg []byte转stringmsgStr : string(msg)log.Info(收到消息为:, msgStr)//反序列化消息为结构体requestData : RequestTemplate{}if err : json.Unmarshal(msg, requestData); err ! nil {conn.WriteJSON(gin.H{id: 未知, op: 未知, error: cmd通信的请求参数有误无法json decode})log.Error(json_decode cmd命令的请求参数时出错:, err)continue}dataInfo : requestData.Data// 这里实际上有很多操作简写为两种if requestData.Op ! {switch requestData.Op {// 收到报告case report:go reqCh.Produce(dataInfo) // 生产者发送一条消息// 已完成case done:go CheckDone(dataInfo, conn) // 做完成的处理default:log.Error(未识别的命令:, msgStr)}}} } 有一个for循环在持续监听ws消息消费者只启动一次这里重点就是生产和消费如何实现 “生产者” “生产者”要做的事就是 1 每当收到ws消息后解析拿到唯一id这个唯一是指这个连接下的所有上报消息的id都是相同的 2 判断这个“唯一id”是否已经创建了channel,若创建了则不需要创建直接写入channel若未创建则新建channel 以下是生产者的demo: // GetChannel 获取通道 func (cs *ChannelStorage) GetChannel(key string) chan *RequestTemplateData {cs.RLock()defer cs.RUnlock()return cs.channels[key] }// CreateChannel 创建通道并存储到 map 中 func (cs *ChannelStorage) CreateChannel(key string) chan *RequestTemplateData {cs.Lock()defer cs.Unlock()if cs.channels nil {cs.channels make(map[string]chan *RequestTemplateData, 800)}ch : make(chan *RequestTemplateData, 10)cs.channels[key] chreturn ch }// Produce 往上报channel中写数据 func (cs *ChannelStorage) Produce(requestData *RequestTemplateData) {defer func() {if err : recover(); err ! nil {log.Info(_____________recover CaseResultAdd error________: , err)}}()// 创建存储通道的结构体实例chanelKey : strconv.Itoa(requestData.Id)channel : cs.GetChannel(chanelKey)if channel nil {channel cs.CreateChannel(chanelKey)}// 直接往channel里面塞if channel ! nil {channel - requestData} }消费者 消费者由于只启动一次但后续可能会有新的channel因此需要增加一个获取所有连接的方法 消费者demo: func (cs *ChannelStorage) ResultConsumer(ctx context.Context) {defer func() {if err : recover(); err ! nil {log.Info(_____________recover CaseResultConsumer error________: , err)}}()for {select {case -ctx.Done():log.Info(websocket断开连接消费者协程退出...)returndefault:cs.processAllChannels(ctx) // 传入 context.Contexttime.Sleep(2 * time.Second) // 控制处理频率}} }// processAllChannels 获取所有channel func (cs *ChannelStorage) processAllChannels(ctx context.Context) {cs.RLock()defer cs.RUnlock()var wg sync.WaitGroup // 用于等待所有通道处理完毕for chName, channel : range cs.channels {wg.Add(1)go func(chName string, channel chan *RequestTemplateData) {defer wg.Done()cs.processChannel(chName, channel, ctx)}(chName, channel)}wg.Wait() // 等待所有通道处理完毕 } func (cs *ChannelStorage) processChannel(chName string, channel chan *RequestTemplateData, ctx context.Context) {const batchSize 10 // 每次处理的数据量var messages []*RequestTemplateDatatargetMsgOverTime : 10 * time.Second // 超时时间for {select {case caseMsg : -channel:messages append(messages, caseMsg) // 将接收到的消息放入 messages 切片中if len(messages) batchSize {tmpMessages : messagesmessages nilprocessMessages(tmpMessages)}case -time.After(targetMsgOverTime):log.Info(Timeout reached. Processing...)if len(messages) 0 {tmpMessages : messagesmessages nillog.Info(Processing remaining messages for channel:, chName)processMessages(tmpMessages)}case -ctx.Done(): // 如果收到上下文取消信号退出函数log.Info(______________________error__________cancel______)return}} }func processMessages(messages []*RequestTemplateData) {// 在这里处理消息就是批量的了 }
http://www.hkea.cn/news/14313243/

相关文章:

  • 做陌陌网站什么做wordpress防止频繁搜索
  • 网站建设不挣钱wordpress手机端发布软件
  • 电商网站建设价格低工商网站查询企业信息武威
  • 南宁百度推广seoseo厂家电话
  • 天津城市建设管理职业学院网站网站开发工程师符号代码
  • 郑州seo网站排名优化公司网站建设资质
  • x网站免费模板湖南省建设信息网站
  • 网站的建设主机费用如何自做自己的网站
  • 电商设计网站模板徐汇集团网站建设
  • 郑州汉狮做网站的大公司网站便民服务平台怎么做
  • 企业网站维护费用网店制作
  • dede静态网站微信网站怎么做
  • 临沂网站建设微信wordpress不能添加用户
  • 新桥做网站wordpress 自动 图片
  • 榆树网站建设wordpress 模板 知乎
  • 外贸网站推广技巧政务网站建设工作方案
  • 定安网站制作广州番禺区邮编
  • 中国电信网站备案建设中网站
  • 网站子站建设wordpress 执行sql update
  • 网站的建设与预算教学网站系统流程图
  • 潮安区建设局网站做网站上传的图片大小
  • 建设网站存在的问题邢台seo外包
  • 个性个人网站2024房地产最新消息
  • 微信云网站用什么做万维网站注册
  • 西宁网站建设多少钱c2c网站建设的需求分析
  • 建设项目环评在什么网站公示游戏推广好做吗
  • 备案号怎么放置到网站网站设计的技能要求
  • 中山网站建设最好的公司只有做推广才能搜索到网站吗
  • 潍坊360做网站怎么样一搜个人网站制作
  • 网站建设服务合同 律师免费建电子商务网站