当前位置: 首页 > news >正文

怎么找人做动漫视频网站重庆快速网站建设

怎么找人做动漫视频网站,重庆快速网站建设,跨境电商怎么注册开店,做网站你给推广说明 每个创新都会伴随着一系列的改变。 在使用celery进行异步任务后#xff0c;产生的一个问题恰好也是因为异步产生的。 内容 1 问题描述 我有一个队列 stream1, 对应的worker1需要周期性的获取数据#xff0c;对输入的数据进行模式识别后分流。worker1我设施为10秒运行…说明 每个创新都会伴随着一系列的改变。 在使用celery进行异步任务后产生的一个问题恰好也是因为异步产生的。 内容 1 问题描述 我有一个队列 stream1, 对应的worker1需要周期性的获取数据对输入的数据进行模式识别后分流。worker1我设施为10秒运行一次。然后我就发现输出队列的数据大约是6~7倍于原始数据。 2 分析 在同步执行的状态下前面 一个任务没有结束后面的任务即使到了执行时间也会错过。这个在APS任务里是非常明确的。但由于Celery执行的Worker是异步的这意味着即使前一个任务没有完成后一个任务还是会如期启动另开一个线头。 Worker1之前的模式是采用xrange方式获取数据在处理完成后才将消息删除。 由于模式识别的过程比较复杂层层过滤所以单个worker执行的时间超过了60秒。这样在这批消息删除之前每次启动的worker都取到了相同的数据处理后也会输出到结果队列。 3 解决办法 理论上每次worker的取数应该是采用xfetch比较合理但是对应的xfetch会因为worker的中断导致消息残留。所以就要有另一些worker来进行残余消息的检测和处理。结果就是 xfetch worker residual worker配合显得麻烦。 过去在同步状态下我就偷懒只用一个worker进行xrange这样只有消息被真实消费才会删除。 xfetch是支持多个worker并行的而xrange则智能支持单个worker。 所以本次要做的事就是把xfetch residual 模式搞一下以后该用什么模式就什么模式。 4 实践 为每个worker提供一种获取残余消息(residual)的办法每个小时执行一次即可。普通的worker(fetch)一般是秒级或者分钟级执行的。 当前的QManager是架在RedisAgent服务上封装的对象这个对象极大简化了平时的操作。不过之前并没有完全将QManager与RedisAgent的参数对接采用了较为简单的方式。 本次需要做的是先使用RedisAgent完成对应的任务然后将QManager进行升级。 构造测试队列 test_list [{doc_id:1, content:first}, {doc_id:2, content:ss}] qm.ensure_group(test.test.test) qm.parrallel_write_msg(test.test.test, test_list){status: True, msg: ok,add 2 of 2 messages}获取消息 qm.xfetch(test.test.test, count1) {data: [{_msg_id: 1718984345178-0, doc_id: 1, content: first}],status: True,msg: ok}1 判断是否有延误消息 两个关键参数一个是队列名称一个是延误时间。如果不写延误时间就是看所有的延误。 resp req.post(http://172.17.0.1:24118/get_pending_msg/,json {stream_name:test.test.test , idle_seconds:20}).json()resp req.post(http://172.17.0.1:24118/get_pending_msg/,json {stream_name:test.test.test,idle_seconds:None }).json(){status: True,msg: ok,data: [[1718984345178-0, consumer1, 36675032, 1]]}延误时间的最大作用是避免获取短时间内超时的任务如果任务本身就需要很长时间 如果data字段长度不为0那么就会有延误消息获取最小和最大的id即可。 2 根据起止id获取数据 delay_data resp[data] start_id delay_data[0][0] end_id delay_data[-1][0]resp req.post(http://172.17.0.1:24118/xrange/,json {stream_name:test.test.test , start_id: start_id,end_id:end_id}).json() {status: True,msg: ok,data: [{_msg_id: 1718984345178-0, doc_id: 1, content: first},{_msg_id: 1718984345178-1, doc_id: 2, content: ss}]}所以相应低修改QMananger(version1.3)的xrange方法并增加xpending方法 xrange ...# 批量获取数据def xrange(self, stream_name, count None, start_id - , end_id ):cur_count count or self.batch_size recs_resp req.post(self.redis_agent_host xrange/,json {connection_hash:self.redis_connection_hash, stream_name:stream_name,count:cur_count,start_id:start_id,end_id:end_id}).json()return recs_respxpending。原来的接口似乎有点小bug:如果队列没有延误接口查询会失败 ...def xpending(self, stream_name,count None, idle_seconds 3600):cur_count count or self.batch_size # 1 确认是否有延误消息:没有延误消息的情况接口会报错try:resp req.post(self.redis_agent_host get_pending_msg/,json {stream_name: stream_name, idle_seconds: idle_seconds}).json()# 如果没有数据直接返回(标准格式)if len(resp[data]) 0:print(No Pending)return resp except:return {status:True, msg:query pending fail, data:[]}# 2 获取被延误的消息min_id resp[data][0][0]max_id resp[data][-1][0]return self.xrange(stream_name, count cur_count, start_id min_id, end_id max_id)Note: 我们对正常执行的任务感知/容忍的周期为分钟对延误执行(补漏)的任务感知/容忍的周期为小时。 来看改造后的QM # xfetch但是此时已经无数据可取 qm.xfetch(test.test.test ) {status: True, msg: ok, data: []} # xpending 此时有两条延误较长时间的消息 qm.xpending(test.test.test , idle_seconds3600) {status: True,msg: ok,data: [{_msg_id: 1718984345178-0, doc_id: 1, content: first},{_msg_id: 1718984345178-1, doc_id: 2, content: ss}]} # 用xrange取出处理 data_list qm.xpending(test.test.test , idle_seconds3600)[data] [{_msg_id: 1718984345178-0, doc_id: 1, content: first},{_msg_id: 1718984345178-1, doc_id: 2, content: ss}]# 假设处理完准备删除消息 data_msg_list qm.extract_msg_id(data_list) [1718984345178-0, 1718984345178-1] qm.xdel(test.test.test, data_msg_list) {data: 2, status: True, msg: ok}# 再次使用xpending qm.xpending(test.test.test , idle_seconds3600) {status: True, msg: no source data, data: []} 另外xpending中即使是把pending的消息处理掉了仍然可以读到pending信息所以每次会调用一下xrange查询一个不存在的区间稍微有点浪费。不过考虑到这是补救型的操作一个小时才运行一次就没有关系了。
http://www.hkea.cn/news/14392423/

相关文章:

  • 网站开发工程师绩效考核表网站外包费用怎么做分录
  • cdr 做网站页面统一手机网站
  • 厚街镇网站建设贵州建设厅网站
  • 模块化网站建设 局域网wordpress改dz
  • 刚做的网站 为啥搜不到织梦网站备份几种方法
  • 网站建设得步骤互联网服务行业有哪些工作
  • 网站模板整站资源建设三轮摩托车官方网站
  • 论吉林省网站职能建设天眼查企业入口免费
  • 阿里云虚拟主机网站wordpress 博客 主题
  • 企业建站的作用是什么seo与网站优化 pdf
  • 安徽省建设工程质量安全监督总站网站seo规范培训
  • 手机软件下载网站高大上的公司网站
  • 网站都是用什么编写的搜索引擎排名的三大指标
  • 咨询公司网站设计效果图网站哪个好
  • 电子工程网官网wordpress 4 优化
  • 网站改版合同wordpress 4.9.7 中文
  • 做网站项目团队口号太仓手机网站建设
  • 傻瓜式网站建设软件有哪些网页设计报价表
  • 黄金网站app视频域名 网址 网站名称
  • 营销型网站方案书数字营销工具
  • 只做女性的网站威海市网站建设
  • 辽宁网站建设电话百度企业网站建设
  • 建设互联网站的目的热门的网页设计工具有哪些
  • 上海网站推广找哪家成都展示型网站开发
  • 网站建设是前端的吗谁做的四虎网站是多少
  • 建站公司新闻资讯成都网络推广公司
  • 客户网站留言有用unity做网站的吗
  • 贵港有网站建设的公司吗北滘大良网站制作
  • wordpress 网站重置电商网站建设的步骤
  • 做首图的网站免费制作自己的网站