Python Watchdog库是一款用于监控文件系统事件的强大工具。它可以检测文件和目录的创建、修改、删除等操作,并触发相应的事件处理器。Watchdog广泛应用于自动化脚本、实时数据处理、日志监控等场景。本文将详细介绍Watchdog库的安装、主要功能、基本操作、高级功能及其实践应用,并提供丰富的示例代码。
Watchdog库可以通过pip进行安装。确保Python环境已激活,然后在终端或命令提示符中运行以下命令:
- pip install watchdog
以下示例展示了如何使用Watchdog监控文件和目录:
- import time
- from watchdog.observers import Observer
- from watchdog.events import FileSystemEventHandler
-
- class MyHandler(FileSystemEventHandler):
- def on_modified(self, event):
- print(f'文件被修改: {event.src_path}')
-
- def on_created(self, event):
- print(f'文件被创建: {event.src_path}')
-
- def on_deleted(self, event):
- print(f'文件被删除: {event.src_path}')
-
- if __name__ == "__main__":
- event_handler = MyHandler()
- observer = Observer()
- observer.schedule(event_handler, path='.', recursive=True)
- observer.start()
-
- try:
- while True:
- time.sleep(1)
- except KeyboardInterrupt:
- observer.stop()
- observer.join()
在这个示例中,定义了一个自定义事件处理器MyHandler,并在其中实现了文件修改、创建和删除的事件处理函数。然后,创建一个Observer对象,使用schedule方法将事件处理器绑定到当前目录(包括子目录),并启动监控。
用户可以自定义事件处理器,以实现更复杂的事件处理逻辑。
以下示例展示了如何自定义事件处理器:
- from watchdog.events import FileSystemEventHandler
-
- class CustomHandler(FileSystemEventHandler):
- def __init__(self, callback):
- self.callback = callback
-
- def on_modified(self, event):
- self.callback(f'文件被修改: {event.src_path}')
-
- def on_created(self, event):
- self.callback(f'文件被创建: {event.src_path}')
-
- def on_deleted(self, event):
- self.callback(f'文件被删除: {event.src_path}')
在这个示例中,创建了一个CustomHandler类,它接受一个回调函数作为参数,并在文件事件发生时调用这个回调函数。
以下示例展示了如何使用Watchdog监控特定类型的文件:
- import time
- from watchdog.observers import Observer
- from watchdog.events import FileSystemEventHandler
-
- class MyHandler(FileSystemEventHandler):
- def on_modified(self, event):
- if event.src_path.endswith('.txt'):
- print(f'TXT文件被修改: {event.src_path}')
-
- def on_created(self, event):
- if event.src_path.endswith('.txt'):
- print(f'TXT文件被创建: {event.src_path}')
-
- def on_deleted(self, event):
- if event.src_path.endswith('.txt'):
- print(f'TXT文件被删除: {event.src_path}')
-
- if __name__ == "__main__":
- event_handler = MyHandler()
- observer = Observer()
- observer.schedule(event_handler, path='.', recursive=True)
- observer.start()
-
- try:
- while True:
- time.sleep(1)
- except KeyboardInterrupt:
- observer.stop()
- observer.join()
在这个示例中,在事件处理器中添加了文件类型判断逻辑,只监控以.txt为后缀的文件。
Watchdog可以递归监控指定目录下的所有子目录。
以下示例展示了如何实现递归监控:
- import time
- from watchdog.observers import Observer
- from watchdog.events import FileSystemEventHandler
-
- class MyHandler(FileSystemEventHandler):
- def on_modified(self, event):
- print(f'文件被修改: {event.src_path}')
-
- def on_created(self, event):
- print(f'文件被创建: {event.src_path}')
-
- def on_deleted(self, event):
- print(f'文件被删除: {event.src_path}')
-
- if __name__ == "__main__":
- event_handler = MyHandler()
- observer = Observer()
- observer.schedule(event_handler, path='.', recursive=True)
- observer.start()
-
- try:
- while True:
- time.sleep(1)
- except KeyboardInterrupt:
- observer.stop()
- observer.join()
在这个示例中,将recursive参数设置为True,以实现递归监控。
Watchdog可以同时监控多个目录。
- import time
- from watchdog.observers import Observer
- from watchdog.events import FileSystemEventHandler
-
- class MyHandler(FileSystemEventHandler):
- def on_modified(self, event):
- print(f'文件被修改: {event.src_path}')
-
- def on_created(self, event):
- print(f'文件被创建: {event.src_path}')
-
- def on_deleted(self, event):
- print(f'文件被删除: {event.src_path}')
-
- if __name__ == "__main__":
- event_handler = MyHandler()
- observer = Observer()
- observer.schedule(event_handler, path='path1', recursive=True)
- observer.schedule(event_handler, path='path2', recursive=True)
- observer.start()
-
- try:
- while True:
- time.sleep(1)
- except KeyboardInterrupt:
- observer.stop()
- observer.join()
在这个示例中,使用schedule方法多次调用,以实现对多个目录的监控。
Watchdog可以与Python的logging模块结合使用,以实现日志记录。
以下示例展示了如何记录文件系统事件日志:
- import time
- import logging
- from watchdog.observers import Observer
- from watchdog.events import FileSystemEventHandler
-
- logging.basicConfig(filename='file_system.log', level=logging.INFO)
-
- class MyHandler(FileSystemEventHandler):
- def on_modified(self, event):
- logging.info(f'文件被修改: {event.src_path}')
-
- def on_created(self, event):
- logging.info(f'文件被创建: {event.src_path}')
-
- def on_deleted(self, event):
- logging.info(f'文件被删除: {event.src_path}')
-
- if __name__ == "__main__":
- event_handler = MyHandler()
- observer = Observer()
- observer.schedule(event_handler, path='.', recursive=True)
- observer.start()
-
- try:
- while True:
- time.sleep(1)
- except KeyboardInterrupt:
- observer.stop()
- observer.join()
在这个示例中,配置了logging模块,将文件系统事件记录到file_system.log文件中。
以下示例展示了如何使用Watchdog实现自动化备份,当目录中的文件发生变化时,自动将其备份到指定目录:
- import time
- import shutil
- from watchdog.observers import Observer
- from watchdog.events import FileSystemEventHandler
-
- class BackupHandler(FileSystemEventHandler):
- def on_modified(self, event):
- if not event.is_directory:
- shutil.copy(event.src_path, 'backup/')
-
- def on_created(self, event):
- if not event.is_directory:
- shutil.copy(event.src_path, 'backup/')
-
- if __name__ == "__main__":
- event_handler = BackupHandler()
- observer = Observer()
- observer.schedule(event_handler, path='to_backup', recursive=True)
- observer.start()
-
- try:
- while True:
- time.sleep(1)
- except KeyboardInterrupt:
- observer.stop()
- observer.join()
以下示例展示了如何使用Watchdog实现实时日志监控,当日志文件发生变化时,自动输出最新内容:
- import time
- from watchdog.observers import Observer
- from watchdog.events import FileSystemEventHandler
-
- class LogHandler(FileSystemEventHandler):
- def on_modified(self, event):
- if event.src_path.endswith('.log'):
- with open(event.src_path, 'r') as f:
- print(f.read())
-
- if __name__ == "__main__":
- event_handler = LogHandler()
- observer = Observer()
- observer.schedule(event_handler, path='logs', recursive=True)
- observer.start()
-
- try:
- while True:
- time.sleep(1)
- except KeyboardInterrupt:
- observer.stop()
- observer.join()
以下示例展示了如何使用Watchdog实现文件同步,当源目录中的文件发生变化时,自动同步到目标目录:
- import time
- import shutil
- from watchdog.observers import Observer
- from watchdog.events import FileSystemEventHandler
-
- class SyncHandler(FileSystemEventHandler):
- def on_modified(self, event):
- if not event.is_directory:
- shutil.copy(event.src_path, 'destination/')
-
- def on_created(self, event):
- if not event.is_directory:
- shutil.copy(event.src_path, 'destination/')
-
- def on_deleted(self, event):
- if not event.is_directory:
- dest_path = event.src_path.replace('source', 'destination')
- if os.path.exists(dest_path):
- os.remove(dest_path)
-
- if __name__ == "__main__":
- event_handler = SyncHandler()
- observer = Observer()
- observer.schedule(event_handler, path='source', recursive=True)
- observer.start()
-
- try:
- while True:
- time.sleep(1)
- except KeyboardInterrupt:
- observer.stop()
- observer.join()
Python Watchdog库是一个功能强大且易于使用的文件系统监控工具。本文详细介绍了Watchdog的安装、主要功能、基本操作、高级功能及其实践应用,并提供了丰富的示例代码。通过使用Watchdog,用户可以轻松实现文件和目录的实时监控,从而更高效地进行自动化脚本、实时数据处理和日志监控等任务。