APScheduler 是 Python 中一个功能强大且灵活的定时任务调度库,允许你以多种方式(如日期、时间间隔等)调度执行 Python 函数或代码块。它支持多种调度器(Schedulers),包括基于日期、间隔、CRON 表达式等多种方式。
APScheduler 官网地址:https://apscheduler.readthedocs.io/en/3.x/
APScheduler 使用方法
安装 APScheduler:
你可以使用 pip 安装 APScheduler:
- pip install apscheduler
-
APScheduler 代码示例:
以下示例演示了如何使用 APScheduler 创建一个简单的定时任务:
- from apscheduler.schedulers.background import BackgroundScheduler
-
- def my_job():
- print("Executing my job!")
-
- # 创建一个后台调度器
- scheduler = BackgroundScheduler()
-
- # 添加一个定时任务,每隔5秒钟执行一次 my_job 函数
- scheduler.add_job(my_job, 'interval', seconds=5)
-
- # 开始调度器
- scheduler.start()
-
- # 让程序保持运行,触发定时任务
- try:
- # 这里用一个循环来保持主线程的运行
- while True:
- pass
- except (KeyboardInterrupt, SystemExit):
- # 捕获 Ctrl+C 或者其他退出信号来优雅地关闭调度器
- scheduler.shutdown()
-
这段代码创建了一个后台调度器(BackgroundScheduler),然后使用 add_job() 方法添加了一个定时任务,该任务每隔5秒钟执行一次 my_job() 函数。最后,使用 scheduler.start() 启动调度器,并用一个无限循环来保持主线程的运行,以便触发定时任务。
APScheduler 还支持更多高级功能,比如持久化存储、多种不同类型的触发器、多种调度器选择等等,可以根据实际需求进一步扩展和定制。
持久化存储
APScheduler 支持持久化存储来存储任务和调度程序的状态,以便在应用重启后恢复任务。
- from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
- from apscheduler.executors.pool import ThreadPoolExecutor
- from apscheduler.schedulers.background import BackgroundScheduler
-
- # 使用 SQLAlchemyJobStore 进行持久化存储
- jobstores = {
- 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
- }
-
- # 创建一个线程池执行器
- executors = {
- 'default': ThreadPoolExecutor(10)
- }
-
- # 创建后台调度器,并指定 jobstores 和 executors
- scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors)
-
- # ... 添加和启动定时任务的代码
-
自定义触发器
使用自定义触发器,你可以创建基于自己需求的定时策略。
- from apscheduler.triggers.base import BaseTrigger
- from datetime import datetime, timedelta
-
- class MyTrigger(BaseTrigger):
- def get_next_fire_time(self, previous_fire_time, now):
- return now + timedelta(seconds=5) # 5秒后执行任务
-
- # 创建一个后台调度器
- scheduler = BackgroundScheduler()
-
- # 使用自定义触发器创建定时任务
- scheduler.add_job(my_job, MyTrigger())
-
- # ... 启动调度器的代码
-
监听器
你可以添加监听器来监控任务的执行、异常和其他事件。
- from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
- from apscheduler.schedulers.background import BackgroundScheduler
-
- def my_listener(event):
- if event.exception:
- print('The job crashed :(')
- else:
- print('The job worked :)')
-
- # 创建一个后台调度器
- scheduler = BackgroundScheduler()
-
- # 添加监听器
- scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
-
- # ... 添加和启动定时任务的代码
-
这些APScheduler的高级功能可以帮助你更好地管理定时任务、监控任务的状态以及根据需求进行灵活的定制化。