APScheduler 是 Python 中一个功能强大且灵活的定时任务调度库,允许你以多种方式(如日期、时间间隔等)调度执行 Python 函数或代码块。它支持多种调度器(Schedulers),包括基于日期、间隔、CRON 表达式等多种方式。
APScheduler 官网地址:https://apscheduler.readthedocs.io/en/3.x/
APScheduler 使用方法
安装 APScheduler:
你可以使用 pip 安装 APScheduler:
pip install apscheduler
APScheduler 代码示例:
以下示例演示了如何使用 APScheduler 创建一个简单的定时任务:
from apscheduler.schedulers.background import BackgroundScheduler
def my_job():
print("Executing my job!")
# 创建一个后台调度器
scheduler = BackgroundScheduler()
# 添加一个定时任务,每隔5秒钟执行一次 my_job 函数
scheduler.add_job(my_job, 'interval', seconds=5)
# 开始调度器
scheduler.start()
# 让程序保持运行,触发定时任务
try:
# 这里用一个循环来保持主线程的运行
while True:
pass
except (KeyboardInterrupt, SystemExit):
# 捕获 Ctrl+C 或者其他退出信号来优雅地关闭调度器
scheduler.shutdown()
这段代码创建了一个后台调度器(BackgroundScheduler),然后使用 add_job() 方法添加了一个定时任务,该任务每隔5秒钟执行一次 my_job() 函数。最后,使用 scheduler.start() 启动调度器,并用一个无限循环来保持主线程的运行,以便触发定时任务。
APScheduler 还支持更多高级功能,比如持久化存储、多种不同类型的触发器、多种调度器选择等等,可以根据实际需求进一步扩展和定制。
持久化存储
APScheduler 支持持久化存储来存储任务和调度程序的状态,以便在应用重启后恢复任务。
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
# 使用 SQLAlchemyJobStore 进行持久化存储
jobstores = {
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
# 创建一个线程池执行器
executors = {
'default': ThreadPoolExecutor(10)
}
# 创建后台调度器,并指定 jobstores 和 executors
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors)
# ... 添加和启动定时任务的代码
自定义触发器
使用自定义触发器,你可以创建基于自己需求的定时策略。
from apscheduler.triggers.base import BaseTrigger
from datetime import datetime, timedelta
class MyTrigger(BaseTrigger):
def get_next_fire_time(self, previous_fire_time, now):
return now + timedelta(seconds=5) # 5秒后执行任务
# 创建一个后台调度器
scheduler = BackgroundScheduler()
# 使用自定义触发器创建定时任务
scheduler.add_job(my_job, MyTrigger())
# ... 启动调度器的代码
监听器
你可以添加监听器来监控任务的执行、异常和其他事件。
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
from apscheduler.schedulers.background import BackgroundScheduler
def my_listener(event):
if event.exception:
print('The job crashed :(')
else:
print('The job worked :)')
# 创建一个后台调度器
scheduler = BackgroundScheduler()
# 添加监听器
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
# ... 添加和启动定时任务的代码
这些APScheduler的高级功能可以帮助你更好地管理定时任务、监控任务的状态以及根据需求进行灵活的定制化。