2025年3月25日 星期二 甲辰(龙)年 月廿四 设为首页 加入收藏
rss
您当前的位置:首页 > 计算机 > 编程开发 > Python

Python Watchdog库:文件系统监控

时间:08-03来源:作者:点击数:42
城东书院 www.cdsy.xyz

Python Watchdog库是一款用于监控文件系统事件的强大工具。它可以检测文件和目录的创建、修改、删除等操作,并触发相应的事件处理器。Watchdog广泛应用于自动化脚本、实时数据处理、日志监控等场景。本文将详细介绍Watchdog库的安装、主要功能、基本操作、高级功能及其实践应用,并提供丰富的示例代码。

安装

Watchdog库可以通过pip进行安装。确保Python环境已激活,然后在终端或命令提示符中运行以下命令:

  • pip install watchdog

主要功能

  1. 文件和目录监控:监控文件和目录的创建、修改、删除、移动等操作。
  2. 事件处理器:提供了多种事件处理器,用户可以自定义事件处理逻辑。
  3. 跨平台支持:支持Linux、Windows、MacOS等多种操作系统。
  4. 高效可靠:采用高效的文件系统监控机制,性能优越,可靠性高。

基本操作

监控文件和目录

以下示例展示了如何使用Watchdog监控文件和目录:

  • import time
  • from watchdog.observers import Observer
  • from watchdog.events import FileSystemEventHandler
  • class MyHandler(FileSystemEventHandler):
  •     def on_modified(self, event):
  •         print(f'文件被修改: {event.src_path}')
  •     def on_created(self, event):
  •         print(f'文件被创建: {event.src_path}')
  •     def on_deleted(self, event):
  •         print(f'文件被删除: {event.src_path}')
  • if __name__ == "__main__":
  •     event_handler = MyHandler()
  •     observer = Observer()
  •     observer.schedule(event_handler, path='.', recursive=True)
  •     observer.start()
  •     try:
  •         while True:
  •             time.sleep(1)
  •     except KeyboardInterrupt:
  •         observer.stop()
  •     observer.join()

在这个示例中,定义了一个自定义事件处理器MyHandler,并在其中实现了文件修改、创建和删除的事件处理函数。然后,创建一个Observer对象,使用schedule方法将事件处理器绑定到当前目录(包括子目录),并启动监控。

自定义事件处理器

用户可以自定义事件处理器,以实现更复杂的事件处理逻辑。

以下示例展示了如何自定义事件处理器:

  • from watchdog.events import FileSystemEventHandler
  • class CustomHandler(FileSystemEventHandler):
  •     def __init__(self, callback):
  •         self.callback = callback
  •     def on_modified(self, event):
  •         self.callback(f'文件被修改: {event.src_path}')
  •     def on_created(self, event):
  •         self.callback(f'文件被创建: {event.src_path}')
  •     def on_deleted(self, event):
  •         self.callback(f'文件被删除: {event.src_path}')

在这个示例中,创建了一个CustomHandler类,它接受一个回调函数作为参数,并在文件事件发生时调用这个回调函数。

监控特定文件类型

以下示例展示了如何使用Watchdog监控特定类型的文件:

  • import time
  • from watchdog.observers import Observer
  • from watchdog.events import FileSystemEventHandler
  • class MyHandler(FileSystemEventHandler):
  •     def on_modified(self, event):
  •         if event.src_path.endswith('.txt'):
  •             print(f'TXT文件被修改: {event.src_path}')
  •     def on_created(self, event):
  •         if event.src_path.endswith('.txt'):
  •             print(f'TXT文件被创建: {event.src_path}')
  •     def on_deleted(self, event):
  •         if event.src_path.endswith('.txt'):
  •             print(f'TXT文件被删除: {event.src_path}')
  • if __name__ == "__main__":
  •     event_handler = MyHandler()
  •     observer = Observer()
  •     observer.schedule(event_handler, path='.', recursive=True)
  •     observer.start()
  •     try:
  •         while True:
  •             time.sleep(1)
  •     except KeyboardInterrupt:
  •         observer.stop()
  •     observer.join()

在这个示例中,在事件处理器中添加了文件类型判断逻辑,只监控以.txt为后缀的文件。

高级功能

监控子目录

Watchdog可以递归监控指定目录下的所有子目录。

以下示例展示了如何实现递归监控:

  • import time
  • from watchdog.observers import Observer
  • from watchdog.events import FileSystemEventHandler
  • class MyHandler(FileSystemEventHandler):
  •     def on_modified(self, event):
  •         print(f'文件被修改: {event.src_path}')
  •     def on_created(self, event):
  •         print(f'文件被创建: {event.src_path}')
  •     def on_deleted(self, event):
  •         print(f'文件被删除: {event.src_path}')
  • if __name__ == "__main__":
  •     event_handler = MyHandler()
  •     observer = Observer()
  •     observer.schedule(event_handler, path='.', recursive=True)
  •     observer.start()
  •     try:
  •         while True:
  •             time.sleep(1)
  •     except KeyboardInterrupt:
  •         observer.stop()
  •     observer.join()

在这个示例中,将recursive参数设置为True,以实现递归监控。

多路径监控

Watchdog可以同时监控多个目录。

  • import time
  • from watchdog.observers import Observer
  • from watchdog.events import FileSystemEventHandler
  • class MyHandler(FileSystemEventHandler):
  •     def on_modified(self, event):
  •         print(f'文件被修改: {event.src_path}')
  •     def on_created(self, event):
  •         print(f'文件被创建: {event.src_path}')
  •     def on_deleted(self, event):
  •         print(f'文件被删除: {event.src_path}')
  • if __name__ == "__main__":
  •     event_handler = MyHandler()
  •     observer = Observer()
  •     observer.schedule(event_handler, path='path1', recursive=True)
  •     observer.schedule(event_handler, path='path2', recursive=True)
  •     observer.start()
  •     try:
  •         while True:
  •             time.sleep(1)
  •     except KeyboardInterrupt:
  •         observer.stop()
  •     observer.join()

在这个示例中,使用schedule方法多次调用,以实现对多个目录的监控。

日志记录

Watchdog可以与Python的logging模块结合使用,以实现日志记录。

以下示例展示了如何记录文件系统事件日志:

  • import time
  • import logging
  • from watchdog.observers import Observer
  • from watchdog.events import FileSystemEventHandler
  • logging.basicConfig(filename='file_system.log', level=logging.INFO)
  • class MyHandler(FileSystemEventHandler):
  •     def on_modified(self, event):
  •         logging.info(f'文件被修改: {event.src_path}')
  •     def on_created(self, event):
  •         logging.info(f'文件被创建: {event.src_path}')
  •     def on_deleted(self, event):
  •         logging.info(f'文件被删除: {event.src_path}')
  • if __name__ == "__main__":
  •     event_handler = MyHandler()
  •     observer = Observer()
  •     observer.schedule(event_handler, path='.', recursive=True)
  •     observer.start()
  •     try:
  •         while True:
  •             time.sleep(1)
  •     except KeyboardInterrupt:
  •         observer.stop()
  •     observer.join()

在这个示例中,配置了logging模块,将文件系统事件记录到file_system.log文件中。

实践应用

自动化备份

以下示例展示了如何使用Watchdog实现自动化备份,当目录中的文件发生变化时,自动将其备份到指定目录:

  • import time
  • import shutil
  • from watchdog.observers import Observer
  • from watchdog.events import FileSystemEventHandler
  • class BackupHandler(FileSystemEventHandler):
  •     def on_modified(self, event):
  •         if not event.is_directory:
  •             shutil.copy(event.src_path, 'backup/')
  •     def on_created(self, event):
  •         if not event.is_directory:
  •             shutil.copy(event.src_path, 'backup/')
  • if __name__ == "__main__":
  •     event_handler = BackupHandler()
  •     observer = Observer()
  •     observer.schedule(event_handler, path='to_backup', recursive=True)
  •     observer.start()
  •     try:
  •         while True:
  •             time.sleep(1)
  •     except KeyboardInterrupt:
  •         observer.stop()
  •     observer.join()
实时日志监控

以下示例展示了如何使用Watchdog实现实时日志监控,当日志文件发生变化时,自动输出最新内容:

  • import time
  • from watchdog.observers import Observer
  • from watchdog.events import FileSystemEventHandler
  • class LogHandler(FileSystemEventHandler):
  •     def on_modified(self, event):
  •         if event.src_path.endswith('.log'):
  •             with open(event.src_path, 'r'as f:
  •                 print(f.read())
  • if __name__ == "__main__":
  •     event_handler = LogHandler()
  •     observer = Observer()
  •     observer.schedule(event_handler, path='logs', recursive=True)
  •     observer.start()
  •     try:
  •         while True:
  •             time.sleep(1)
  •     except KeyboardInterrupt:
  •         observer.stop()
  •     observer.join()
文件同步

以下示例展示了如何使用Watchdog实现文件同步,当源目录中的文件发生变化时,自动同步到目标目录:

  • import time
  • import shutil
  • from watchdog.observers import Observer
  • from watchdog.events import FileSystemEventHandler
  • class SyncHandler(FileSystemEventHandler):
  •     def on_modified(self, event):
  •         if not event.is_directory:
  •             shutil.copy(event.src_path, 'destination/')
  •     def on_created(self, event):
  •         if not event.is_directory:
  •             shutil.copy(event.src_path, 'destination/')
  •     def on_deleted(self, event):
  •         if not event.is_directory:
  •             dest_path = event.src_path.replace('source''destination')
  •             if os.path.exists(dest_path):
  •                 os.remove(dest_path)
  • if __name__ == "__main__":
  •     event_handler = SyncHandler()
  •     observer = Observer()
  •     observer.schedule(event_handler, path='source', recursive=True)
  •     observer.start()
  •     try:
  •         while True:
  •             time.sleep(1)
  •     except KeyboardInterrupt:
  •         observer.stop()
  •     observer.join()

总结

Python Watchdog库是一个功能强大且易于使用的文件系统监控工具。本文详细介绍了Watchdog的安装、主要功能、基本操作、高级功能及其实践应用,并提供了丰富的示例代码。通过使用Watchdog,用户可以轻松实现文件和目录的实时监控,从而更高效地进行自动化脚本、实时数据处理和日志监控等任务。

城东书院 www.cdsy.xyz
方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
栏目更新
栏目热门
本栏推荐