wordpress 加ico,贵阳seo网站建设,厦门人才网唯一官网,河北邯郸网站建设公司环境#xff1a;Windows 11、python 3.12.3、Django 4.2.11、 APScheduler 3.10.4
背景#xff1a;工作需要使用且用法较为复杂#xff0c;各种功能基本都使用了
事件#xff1a;20240920
说明#xff1a;记录#xff0c;方便后期自己查找 1、搭建基础环境
文件结构图…环境Windows 11、python 3.12.3、Django 4.2.11、 APScheduler 3.10.4
背景工作需要使用且用法较为复杂各种功能基本都使用了
事件20240920
说明记录方便后期自己查找 1、搭建基础环境
文件结构图
蓝色代表文件黑色代表目录主要是django自动生成的文件以及apscheduler需要的文件 包括Django、APScheduler两个代码如下
新建scheduler文件
创建调度器并配置启动函数
# scheduleJob\scheduler.pyfrom apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from django_apscheduler.jobstores import DjangoJobStore
from pytz import timezone, utc# 第二种方式内嵌
jobstores {default: DjangoJobStore()}
executors {default: ThreadPoolExecutor(20), processpool: ProcessPoolExecutor(5)}
job_defaults { # 该参数既可以在创建scheduler对象时使用也可以用在add_job中对象范围广、优先级低coalesce: True, # 是否合并积压的任务。如果设置为 True当任务运行时间落后时会只运行一次而不是运行多次。默认值为 False。max_instances: 2, # 允许的最大作业实例数。确保同一任务在同一时间不会有多个实例运行。默认值为 1。misfire_grace_time: 30, # 设置任务错过其执行时间的容忍时间以秒为单位。如果任务在这个时间内错过了执行时间将立即执行。如果设置为 None则没有时间限制。replace_existing: True # 如果添加的任务ID已存在是否替换现有任务。默认值为 False。
}
scheduler BackgroundScheduler(jobstoresjobstores, executorsexecutors, job_defaultsjob_defaults, timezonetimezone(Asia/Shanghai))def start():scheduler.start()在settings文件中添加应用并配置数据库
# scheduleJob\settings.pyINSTALLED_APPS [# ...django_apscheduler,testapscheduler,
]
ROOT_URLCONF scheduleJob.urls
DATABASES {default: {ENGINE: django.db.backends.mysql,NAME: scheduler,USER: root,PASSWORD: 123456,HOST: localhost,PORT: 3306,}
}
LANGUAGE_CODE en-us
TIME_ZONE Asia/Shanghai
USE_I18N True
USE_TZ False 在apps中实现启动调度器
# testapscheduler\apps.pyfrom django.apps import AppConfigclass TestapschedulerConfig(AppConfig):default_auto_field django.db.models.BigAutoFieldname testapschedulerprint(启动)def ready(self):from scheduleJob import schedulerscheduler.start() 在url中配置路由
# scheduleJob\urls.pyfrom django.contrib import admin
from django.urls import path
from testapscheduler.views import operate_taskurlpatterns [path(admin/, admin.site.urls),path(operate_task/, operate_task),
] 在views文件中实现真正的逻辑处理
# testapscheduler\views.pyfrom scheduleJob.scheduler import scheduler
from django.http import HttpResponse
from datetime import datetime
from django.views.decorators.csrf import csrf_exemptimport time, json# Create your views here.
csrf_exemptdef operate_task(request):# 动态任务id, 利用时间戳获取整数如一秒内添加两个则会出现bugp request.POST.get(id)print(p, )my_task_id int(datetime.timestamp(datetime.now()))# job参数job_kwargs {func:excute_task, # job的函数id:str(my_task_id), # 添加的jobid, 必须是字符串name:ftask_{my_task_id}, # 添加的job名称kwargs:{info:test}, # job的函数参数本例中excute_task需要的参数next_run_time: datetime.now(), # 添加任务成功后立即执行replace_existing: True, misfire_grace_time: 10, coalesce: True, max_instances: 10, trigger: interval, # 任务执行类型还有date、cronseconds: 10, # 任务执行间隔时间代表每10s执行一次}scheduler.add_job(**job_kwargs)# scheduler.remove_all_jobs()return HttpResponse(add task success)def excute_task(info):time.sleep(3)print(info, --------------------------------, datetime.now())
数据库迁移
运行前先执行数据库迁移
python manage.py makemigrationspython manage.py migrate# 前者是将model层转为迁移文件migration
# 后者将新版本的迁移文件执行更新数据库。 会在数据库生成两个表引用方法如下
from django_apscheduler.models import DjangoJob, DjangoJobExecution
django_apscheduler_djangojob对应Django中的DjangoJob共计三个字段分别为id、next_run_time、job_state默认排序字段为next_run_time
django_apscheduler_djangojobexecution对应Django中的DjangoJobExecution共计八个字段分别是id、status、run_time、duration、finished、exception、traceback、job_id。
postman请求测试
在postman中请求路由代码如下
# postman生成的请求代码import requests
import jsonurl localhost:8000/operate_task/payload json.dumps({action: start,id: 1
})
headers {Content-Type: application/json
}response requests.request(POST, url, headersheaders, datapayload)print(response.text)实现效果
test -------------------------------- 2024-09-20 15:13:58.165250
test -------------------------------- 2024-09-20 15:13:58.165250
test -------------------------------- 2024-09-20 15:14:04.469140
test -------------------------------- 2024-09-20 15:14:04.469140
任务状态
#: constant indicating a schedulers stopped state
STATE_STOPPED 0
#: constant indicating a schedulers running state (started and processing jobs)
STATE_RUNNING 1
#: constant indicating a schedulers paused state (started but not processing jobs)
STATE_PAUSED 2 2、调度器动态操作
1、查询所有任务
# testapscheduler\views.py# ......csrf_exempt
def query_all_task(request):# 查看所有任务job_list scheduler.get_jobs()return JsonResponse([{x.name:x.id} for x in job_list], safeFalse)
响应
[{task_1726812545: 1726812545},{task_1726816011: 1726816011}
]
2、查询某个任务 csrf_exempt
def get_job(request):# 查询任务是否存在job_id loads(request.body).get(id)msg scheduler.get_job(job_idjob_id)print(msg)return JsonResponse({msg:success}) 3、移除所有任务
csrf_exempt
def remove_all_jobs(request):# 移除所有任务, 事件代码是256scheduler.remove_all_jobs()return JsonResponse({msg:success})
4、移除某个任务
csrf_exempt
def remove_job(request):# 移除某个任务, 事件代码是1024job_id loads(request.body).get(id)scheduler.remove_job(job_idjob_id)return JsonResponse({msg:success})
5、暂停某个任务
csrf_exempt
def pause_job(request):# 暂停某个任务, 事件代码是2048job_id loads(request.body).get(id)scheduler.pause_job(job_idjob_id)return JsonResponse({msg:success})
6、恢复某个任务
csrf_exempt
def resume_job(request):# 恢复某个任务仅能恢复已暂停的任务, 事件代码是2048job_id loads(request.body).get(id)scheduler.resume_job(job_idjob_id)return JsonResponse({msg:success})
7、添加某个任务
csrf_exempt
def add_job(request):# 添加任务, 事件代码是512kwargs loads(request.body)scheduler.add_job(**kwargs)return JsonResponse({msg:success})
8、修改某个任务
csrf_exempt
def modify_job(request):# 恢复某个任务仅能恢复已暂停的任务, 事件代码是2048job_id loads(request.body).get(id)changes loads(request.body).get(changes)scheduler.modify_job(job_idjob_id, changeschanges)return JsonResponse({msg:success})
9、打印所有任务信息
csrf_exempt
def print_jobs(request):# 打印所有任务信息scheduler.print_jobs()return JsonResponse({msg:success})
10、启动调度器
csrf_exempt
def start(request):# 调度程序启动, 事件代码是1scheduler.start()return JsonResponse({msg:success})
11、关闭调度器
csrf_exempt
def shutdown(request):# 调度程序关闭, 事件代码是2scheduler.shutdown()return JsonResponse({msg:success})
12、暂停调度器
csrf_exempt
def pause(request):# 调度程序暂停, 事件代码是4scheduler.pause()return JsonResponse({msg:success})
13、恢复调度器
csrf_exempt
def resume(request):# 调度程序恢复, 事件代码是8scheduler.resume()return JsonResponse({msg:success})
14、添加执行器
csrf_exempt
def add_executor(request):# 调度程序添加执行器, 事件代码是16executors {default: ThreadPoolExecutor(20), processpool: ProcessPoolExecutor(5)}scheduler.add_executor(executorexecutors)
15、删除执行器
csrf_exempt
def remove_executor(request):# 调度程序删除执行器, 事件代码是32alias loads(request.body).get(alias)scheduler.remove_executor(aliasalias)return JsonResponse({msg:success})
16、添加作业存储器
csrf_exempt
def add_jobstore(request):# 调度程序添加作业存储器, 事件代码是64jobstores {default: DjangoJobStore()}scheduler.add_jobstore(jobstoresjobstores)
17、删除作业存储器
csrf_exempt
def remove_jobstore(request):# 调度程序删除作业存储器, 事件代码是128alias loads(request.body).get(alias)scheduler.remove_jobstore(aliasalias)return JsonResponse({msg:success})
18、修改触发器参数
csrf_exempt
def reschedule_job(request):# 调度程序修改触发器参数job_id loads(request.body).get(job_id)trigger_args loads(request.body).get(trigger_args)scheduler.reschedule_job(trigger_argstrigger_args, job_idjob_id)return JsonResponse({msg:success}) 未完待续 ······