怎么找人做动漫视频网站,重庆快速网站建设,跨境电商怎么注册开店,做网站你给推广说明 每个创新都会伴随着一系列的改变。 在使用celery进行异步任务后#xff0c;产生的一个问题恰好也是因为异步产生的。
内容
1 问题描述
我有一个队列 stream1, 对应的worker1需要周期性的获取数据#xff0c;对输入的数据进行模式识别后分流。worker1我设施为10秒运行…说明 每个创新都会伴随着一系列的改变。 在使用celery进行异步任务后产生的一个问题恰好也是因为异步产生的。
内容
1 问题描述
我有一个队列 stream1, 对应的worker1需要周期性的获取数据对输入的数据进行模式识别后分流。worker1我设施为10秒运行一次。然后我就发现输出队列的数据大约是6~7倍于原始数据。
2 分析
在同步执行的状态下前面 一个任务没有结束后面的任务即使到了执行时间也会错过。这个在APS任务里是非常明确的。但由于Celery执行的Worker是异步的这意味着即使前一个任务没有完成后一个任务还是会如期启动另开一个线头。
Worker1之前的模式是采用xrange方式获取数据在处理完成后才将消息删除。
由于模式识别的过程比较复杂层层过滤所以单个worker执行的时间超过了60秒。这样在这批消息删除之前每次启动的worker都取到了相同的数据处理后也会输出到结果队列。
3 解决办法
理论上每次worker的取数应该是采用xfetch比较合理但是对应的xfetch会因为worker的中断导致消息残留。所以就要有另一些worker来进行残余消息的检测和处理。结果就是 xfetch worker residual worker配合显得麻烦。
过去在同步状态下我就偷懒只用一个worker进行xrange这样只有消息被真实消费才会删除。 xfetch是支持多个worker并行的而xrange则智能支持单个worker。 所以本次要做的事就是把xfetch residual 模式搞一下以后该用什么模式就什么模式。
4 实践 为每个worker提供一种获取残余消息(residual)的办法每个小时执行一次即可。普通的worker(fetch)一般是秒级或者分钟级执行的。 当前的QManager是架在RedisAgent服务上封装的对象这个对象极大简化了平时的操作。不过之前并没有完全将QManager与RedisAgent的参数对接采用了较为简单的方式。
本次需要做的是先使用RedisAgent完成对应的任务然后将QManager进行升级。
构造测试队列
test_list [{doc_id:1, content:first}, {doc_id:2, content:ss}]
qm.ensure_group(test.test.test)
qm.parrallel_write_msg(test.test.test, test_list){status: True, msg: ok,add 2 of 2 messages}获取消息
qm.xfetch(test.test.test, count1)
{data: [{_msg_id: 1718984345178-0, doc_id: 1, content: first}],status: True,msg: ok}1 判断是否有延误消息
两个关键参数一个是队列名称一个是延误时间。如果不写延误时间就是看所有的延误。
resp req.post(http://172.17.0.1:24118/get_pending_msg/,json {stream_name:test.test.test , idle_seconds:20}).json()resp req.post(http://172.17.0.1:24118/get_pending_msg/,json {stream_name:test.test.test,idle_seconds:None }).json(){status: True,msg: ok,data: [[1718984345178-0, consumer1, 36675032, 1]]}延误时间的最大作用是避免获取短时间内超时的任务如果任务本身就需要很长时间
如果data字段长度不为0那么就会有延误消息获取最小和最大的id即可。
2 根据起止id获取数据
delay_data resp[data]
start_id delay_data[0][0]
end_id delay_data[-1][0]resp req.post(http://172.17.0.1:24118/xrange/,json {stream_name:test.test.test , start_id: start_id,end_id:end_id}).json()
{status: True,msg: ok,data: [{_msg_id: 1718984345178-0, doc_id: 1, content: first},{_msg_id: 1718984345178-1, doc_id: 2, content: ss}]}所以相应低修改QMananger(version1.3)的xrange方法并增加xpending方法
xrange
...# 批量获取数据def xrange(self, stream_name, count None, start_id - , end_id ):cur_count count or self.batch_size recs_resp req.post(self.redis_agent_host xrange/,json {connection_hash:self.redis_connection_hash, stream_name:stream_name,count:cur_count,start_id:start_id,end_id:end_id}).json()return recs_respxpending。原来的接口似乎有点小bug:如果队列没有延误接口查询会失败
...def xpending(self, stream_name,count None, idle_seconds 3600):cur_count count or self.batch_size # 1 确认是否有延误消息:没有延误消息的情况接口会报错try:resp req.post(self.redis_agent_host get_pending_msg/,json {stream_name: stream_name, idle_seconds: idle_seconds}).json()# 如果没有数据直接返回(标准格式)if len(resp[data]) 0:print(No Pending)return resp except:return {status:True, msg:query pending fail, data:[]}# 2 获取被延误的消息min_id resp[data][0][0]max_id resp[data][-1][0]return self.xrange(stream_name, count cur_count, start_id min_id, end_id max_id)Note: 我们对正常执行的任务感知/容忍的周期为分钟对延误执行(补漏)的任务感知/容忍的周期为小时。
来看改造后的QM
# xfetch但是此时已经无数据可取
qm.xfetch(test.test.test )
{status: True, msg: ok, data: []}
# xpending 此时有两条延误较长时间的消息
qm.xpending(test.test.test , idle_seconds3600)
{status: True,msg: ok,data: [{_msg_id: 1718984345178-0, doc_id: 1, content: first},{_msg_id: 1718984345178-1, doc_id: 2, content: ss}]}
# 用xrange取出处理
data_list qm.xpending(test.test.test , idle_seconds3600)[data]
[{_msg_id: 1718984345178-0, doc_id: 1, content: first},{_msg_id: 1718984345178-1, doc_id: 2, content: ss}]# 假设处理完准备删除消息
data_msg_list qm.extract_msg_id(data_list)
[1718984345178-0, 1718984345178-1]
qm.xdel(test.test.test, data_msg_list)
{data: 2, status: True, msg: ok}# 再次使用xpending
qm.xpending(test.test.test , idle_seconds3600)
{status: True, msg: no source data, data: []}
另外xpending中即使是把pending的消息处理掉了仍然可以读到pending信息所以每次会调用一下xrange查询一个不存在的区间稍微有点浪费。不过考虑到这是补救型的操作一个小时才运行一次就没有关系了。