2025年3月14日 星期五 甲辰(龙)年 月十三 设为首页 加入收藏
rss
您当前的位置:首页 > 计算机 > 编程开发 > Python

Python定时任务调度框架APScheduler详解

时间:02-10来源:作者:点击数:27

APScheduler 是 Python 中一个功能强大且灵活的定时任务调度库,允许你以多种方式(如日期、时间间隔等)调度执行 Python 函数或代码块。它支持多种调度器(Schedulers),包括基于日期、间隔、CRON 表达式等多种方式。

APScheduler 官网地址:https://apscheduler.readthedocs.io/en/3.x/

APScheduler 官网

APScheduler 使用方法

安装 APScheduler:

你可以使用 pip 安装 APScheduler:

  • pip install apscheduler

APScheduler 代码示例:

以下示例演示了如何使用 APScheduler 创建一个简单的定时任务:

  • from apscheduler.schedulers.background import BackgroundScheduler
  • def my_job():
  • print("Executing my job!")
  • # 创建一个后台调度器
  • scheduler = BackgroundScheduler()
  • # 添加一个定时任务,每隔5秒钟执行一次 my_job 函数
  • scheduler.add_job(my_job, 'interval', seconds=5)
  • # 开始调度器
  • scheduler.start()
  • # 让程序保持运行,触发定时任务
  • try:
  • # 这里用一个循环来保持主线程的运行
  • while True:
  • pass
  • except (KeyboardInterrupt, SystemExit):
  • # 捕获 Ctrl+C 或者其他退出信号来优雅地关闭调度器
  • scheduler.shutdown()

这段代码创建了一个后台调度器(BackgroundScheduler),然后使用 add_job() 方法添加了一个定时任务,该任务每隔5秒钟执行一次 my_job() 函数。最后,使用 scheduler.start() 启动调度器,并用一个无限循环来保持主线程的运行,以便触发定时任务。

APScheduler 还支持更多高级功能,比如持久化存储、多种不同类型的触发器、多种调度器选择等等,可以根据实际需求进一步扩展和定制。

持久化存储

APScheduler 支持持久化存储来存储任务和调度程序的状态,以便在应用重启后恢复任务。

  • from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
  • from apscheduler.executors.pool import ThreadPoolExecutor
  • from apscheduler.schedulers.background import BackgroundScheduler
  • # 使用 SQLAlchemyJobStore 进行持久化存储
  • jobstores = {
  • 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
  • }
  • # 创建一个线程池执行器
  • executors = {
  • 'default': ThreadPoolExecutor(10)
  • }
  • # 创建后台调度器,并指定 jobstores 和 executors
  • scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors)
  • # ... 添加和启动定时任务的代码

自定义触发器

使用自定义触发器,你可以创建基于自己需求的定时策略。

  • from apscheduler.triggers.base import BaseTrigger
  • from datetime import datetime, timedelta
  • class MyTrigger(BaseTrigger):
  • def get_next_fire_time(self, previous_fire_time, now):
  • return now + timedelta(seconds=5) # 5秒后执行任务
  • # 创建一个后台调度器
  • scheduler = BackgroundScheduler()
  • # 使用自定义触发器创建定时任务
  • scheduler.add_job(my_job, MyTrigger())
  • # ... 启动调度器的代码

监听器

你可以添加监听器来监控任务的执行、异常和其他事件。

  • from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
  • from apscheduler.schedulers.background import BackgroundScheduler
  • def my_listener(event):
  • if event.exception:
  • print('The job crashed :(')
  • else:
  • print('The job worked :)')
  • # 创建一个后台调度器
  • scheduler = BackgroundScheduler()
  • # 添加监听器
  • scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
  • # ... 添加和启动定时任务的代码

这些APScheduler的高级功能可以帮助你更好地管理定时任务、监控任务的状态以及根据需求进行灵活的定制化。

方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
栏目更新
栏目热门