当前位置: 首页 > news >正文

在百度做网站销售baidu 百度一下

在百度做网站销售,baidu 百度一下,网站要做手机版怎么做的,vscode网页设计教程Celery的任务流 在之前调用任务的时候只是使用delay()和apply_async()方法。但是有时我们并不想简单的执行单个异步任务,比如说需要将某个异步任务的结果作为另一个异步任务的参数或者需要将多个异步任务并行执行,返回一组返回值,为了实现此…

在这里插入图片描述

Celery的任务流

在之前调用任务的时候只是使用delay()和apply_async()方法。但是有时我们并不想简单的执行单个异步任务,比如说需要将某个异步任务的结果作为另一个异步任务的参数或者需要将多个异步任务并行执行,返回一组返回值,为了实现此目标,Celery使用一种叫做signatures的东西

celery的简单使用

signature的引入

signature官方文档

可以简单理解为signature是将之前的异步任务以某种方式包装,包装后的异步任务仍可以使用之前的delay()和apply_async()方法,并且包装后的异步任务就可以以多种方式组合成复杂的工作流程

先创建一个tasks.py

import timeimport celerybroker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('app', backend=backend, broker=broker)# 加
@app.task
def add_num(a, b):print(f'{a}+{b}')time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序c = a + breturn c# 减
@app.task
def subtract_num(a, b):print(f'{a}-{b}')time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序c = a - breturn c# 乘
@app.task
def multiply_num(a, b):print(f'{a}*{b}')time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序c = a * breturn c# 除
@app.task
def divide_num(a, b):print(f'{a}/{b}')time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序c = a / breturn c@app.task
def test(args):print(args)return args

运行celery消费者

# 格式为:celery -A app对象所在的文件 worker -l 日志级别 -Q 队列名称(也可以不指定,默认为celry)
celery -A tasks worker -l info -Q test

在这里插入图片描述

用signature对上面的add_num包装

from celery import signature
from tasks import add_num, subtract_num, multiply_num, divide_num# 方法1
sign = add_num.signature((1, 1), queue='test')
ret = sign.delay()
print(ret.get())# 方法2
sign = signature('tasks.add_num', (1, 1), queue='test')
ret = sign.delay()
print(ret.get())# 方法3
sign = signature(add_num, (1, 1), queue='test')
ret = sign.delay()
print(ret.get())

chain的使用

chain官方链接

chain可以将signature包装的任务函数一个一个执行,一个执行完将执行return结果传递给下一个任务函数

from celery import signature, chain
from tasks import add_num, subtract_num, multiply_num, divide_numadd_sign = signature(add_num, (6, 2), queue='test')
subtract_sign = signature(subtract_num, (2,), queue='test')
multiply_sign = signature(multiply_num, (2,), queue='test')
divide_sign = signature(divide_num, (2,), queue='test')# 对某个数依次做加减乘除处理
chain1 = chain(add_sign, subtract_sign, multiply_sign, divide_sign)
ret = chain1.delay()
print(ret.get())

在这里插入图片描述

可以看到异步任务依次执行,并将上一个异步任务的结果作为参数传递给下一个,形成一个链条

group的使用

group官方链接

group可以将signature包装的任务函数并行执行,返回一组返回值

from celery import signature, chain, group
from tasks import add_num, subtract_num, multiply_num, divide_numadd_sign = signature(add_num, (6, 2), queue='test')
subtract_sign = signature(subtract_num, (6, 2), queue='test')
multiply_sign = signature(multiply_num, (6, 2), queue='test')
divide_sign = signature(divide_num, (6, 2), queue='test')# 对某个数分别做加减乘除处理group1 = group(add_sign, subtract_sign, multiply_sign, divide_sign)
ret = group1.delay()
print(ret.get())
#[8, 4, 12, 3.0]

在这里插入图片描述
可以看到相比于chain,group里的任务是同时执行

chord的使用

chord官方链接
依赖一个group任务,group任务结束后,将所有子任务的返回值作为参数传递给chord的回调函数,即chord由group任务组与回调函数组成

上代码

from celery import signature, chain, group, chord
from tasks import add_num, subtract_num, multiply_num, divide_num, testadd_sign = signature(add_num, (6, 2), queue='test')
subtract_sign = signature(subtract_num, (6, 2), queue='test')
multiply_sign = signature(multiply_num, (6, 2), queue='test')
divide_sign = signature(divide_num, (6, 2), queue='test')group1 = group(add_sign, subtract_sign, multiply_sign, divide_sign)#包装test异步任务函数
test_sign = signature(test, queue='test')c1 = chord(group1, test_sign)
c1.delay()

在这里插入图片描述
可以看出,在执行完加减乘除所有异步任务后,chord会将任务组的结果作为list交给test函数,这里的test有点像回调函数

PS:根据我的观察,chain,group,chord在执行完后都会返回一个任务id,其中chain的任务id为任务链里最后一个任务的id,group的任务id是一个临时的任务id(group任务都结束后就会消失),chord的任务id是回调函数的任务id。因此chain和chord在任务结束后,任务结果还是可以查到的,而group则查询不到,因此group的任务结果可能无法用AsyncResult查询到

最后附上celery关于任务工作流的官方链接
celery工作流

PS

有的时候我们可能需要在celery的task函数中调用其他的celery函数,并且需要同步的获取结果(其实着本质上就是把异步的celery函数变成同步运行),具体如下,先创建一个tasks.py

import timeimport celerybroker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('app', backend=backend, broker=broker)# 加
@app.task
def add_num(a, b):print(f'{a}+{b}')time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序c = a + breturn c@app.task
def test():#在test函数里调用add_num函数,并且同步获取结果,将结果作为test函数的返回值ret = add_num.delay(1,2)ret = ret.get()return ret
#启动消费者
celery -A tasks worker -l info

调用test异步函数

from tasks import test
ret = test.delay()

在这里插入图片描述

结果就是出错了,因为官方不建议在一个异步任务中区等待另一个异步任务的返回结果,所以这个时候就可以通过上面的chain方法实现这个需求。当然还有一种不建议的方法就是在同步获取celery任务结果的get方法中添加参数disable_sync_subtasks=False,具体如下

import timeimport celerybroker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('app', backend=backend, broker=broker)# 加
@app.task
def add_num(a, b):print(f'{a}+{b}')time.sleep(3)  # 做延时处理,方便后面查看任务执行顺序c = a + breturn c@app.task
def test():ret = add_num.delay(1, 2)ret = ret.get(disable_sync_subtasks=False)#在这添加disable_sync_subtasks=Falsereturn ret

再调用一次test方法

在这里插入图片描述
成功调用

详见celery官方链接
链接传送门

结语

写这些,仅记录自己学习使用celery的过程。如果有什么错误的地方,还请大家批评指正。最后,希望小伙伴们都能有所收获。

在这里插入图片描述

http://www.hkea.cn/news/891912/

相关文章:

  • 网站优化qq群山东做网站
  • wordpress icomoon太原seo快速排名
  • 中华建设杂志网站记者数据指数
  • 网站开发测试情况南召seo快速排名价格
  • 上海仓储公司小红书seo优化
  • 南京建设公司网站网络营销整合推广
  • wordpress更改语言沈阳seo优化
  • wordpress免费网站世界大学排名
  • 做网站的属于什么专业?百度爱采购竞价推广
  • 网站建设一年多少恰东莞网站到首页排名
  • 新企业网站应该怎么做SEO优化广告联盟有哪些
  • 手机app开发网站建设软文推广文章案例
  • 网站自然排名百度经验官网登录
  • dz网站模板沧州网站优化公司
  • 桂林论坛天涯社区培训行业seo整站优化
  • 做伊瑞尔竞技场的网站搜索引擎简称seo
  • 46云虚拟主机股票发行ipo和seo是什么意思
  • 新泰做网站菏泽seo
  • 网站建设排名东莞seo收费
  • 做网站前后端的发布流程自己如何制作网站
  • 网站营销与推广策略百度一下官网首页百度
  • 网站建设张世勇100个免费推广b站
  • 网络营销的常用工具百度关键词优化点击 教程
  • 公司网站要怎么做少儿编程培训机构排名前十
  • 一个好的网站是什么样的商家联盟营销方案
  • 网站解除域名绑定网站广告收费标准
  • 郑州的建设网站有哪些手续免费发布推广信息的平台有哪些
  • 手机做网站软件优化服务平台
  • 网站图片装修的热切图怎么做营销技巧培训
  • 可以上传图片的网站怎么做百度关键词点击