您当前的位置:首页 > 计算机 > 编程开发 > Python

Python Watchdog库:文件系统监控

时间:08-03来源:作者:点击数:
城东书院 www.cdsy.xyz

Python Watchdog库是一款用于监控文件系统事件的强大工具。它可以检测文件和目录的创建、修改、删除等操作,并触发相应的事件处理器。Watchdog广泛应用于自动化脚本、实时数据处理、日志监控等场景。本文将详细介绍Watchdog库的安装、主要功能、基本操作、高级功能及其实践应用,并提供丰富的示例代码。

安装

Watchdog库可以通过pip进行安装。确保Python环境已激活,然后在终端或命令提示符中运行以下命令:

pip install watchdog

主要功能

  1. 文件和目录监控:监控文件和目录的创建、修改、删除、移动等操作。
  2. 事件处理器:提供了多种事件处理器,用户可以自定义事件处理逻辑。
  3. 跨平台支持:支持Linux、Windows、MacOS等多种操作系统。
  4. 高效可靠:采用高效的文件系统监控机制,性能优越,可靠性高。

基本操作

监控文件和目录

以下示例展示了如何使用Watchdog监控文件和目录:

import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class MyHandler(FileSystemEventHandler):
    def on_modified(self, event):
        print(f'文件被修改: {event.src_path}')

    def on_created(self, event):
        print(f'文件被创建: {event.src_path}')

    def on_deleted(self, event):
        print(f'文件被删除: {event.src_path}')

if __name__ == "__main__":
    event_handler = MyHandler()
    observer = Observer()
    observer.schedule(event_handler, path='.', recursive=True)
    observer.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

在这个示例中,定义了一个自定义事件处理器MyHandler,并在其中实现了文件修改、创建和删除的事件处理函数。然后,创建一个Observer对象,使用schedule方法将事件处理器绑定到当前目录(包括子目录),并启动监控。

自定义事件处理器

用户可以自定义事件处理器,以实现更复杂的事件处理逻辑。

以下示例展示了如何自定义事件处理器:

from watchdog.events import FileSystemEventHandler

class CustomHandler(FileSystemEventHandler):
    def __init__(self, callback):
        self.callback = callback

    def on_modified(self, event):
        self.callback(f'文件被修改: {event.src_path}')

    def on_created(self, event):
        self.callback(f'文件被创建: {event.src_path}')

    def on_deleted(self, event):
        self.callback(f'文件被删除: {event.src_path}')

在这个示例中,创建了一个CustomHandler类,它接受一个回调函数作为参数,并在文件事件发生时调用这个回调函数。

监控特定文件类型

以下示例展示了如何使用Watchdog监控特定类型的文件:

import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class MyHandler(FileSystemEventHandler):
    def on_modified(self, event):
        if event.src_path.endswith('.txt'):
            print(f'TXT文件被修改: {event.src_path}')

    def on_created(self, event):
        if event.src_path.endswith('.txt'):
            print(f'TXT文件被创建: {event.src_path}')

    def on_deleted(self, event):
        if event.src_path.endswith('.txt'):
            print(f'TXT文件被删除: {event.src_path}')

if __name__ == "__main__":
    event_handler = MyHandler()
    observer = Observer()
    observer.schedule(event_handler, path='.', recursive=True)
    observer.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

在这个示例中,在事件处理器中添加了文件类型判断逻辑,只监控以.txt为后缀的文件。

高级功能

监控子目录

Watchdog可以递归监控指定目录下的所有子目录。

以下示例展示了如何实现递归监控:

import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class MyHandler(FileSystemEventHandler):
    def on_modified(self, event):
        print(f'文件被修改: {event.src_path}')

    def on_created(self, event):
        print(f'文件被创建: {event.src_path}')

    def on_deleted(self, event):
        print(f'文件被删除: {event.src_path}')

if __name__ == "__main__":
    event_handler = MyHandler()
    observer = Observer()
    observer.schedule(event_handler, path='.', recursive=True)
    observer.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

在这个示例中,将recursive参数设置为True,以实现递归监控。

多路径监控

Watchdog可以同时监控多个目录。

import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class MyHandler(FileSystemEventHandler):
    def on_modified(self, event):
        print(f'文件被修改: {event.src_path}')

    def on_created(self, event):
        print(f'文件被创建: {event.src_path}')

    def on_deleted(self, event):
        print(f'文件被删除: {event.src_path}')

if __name__ == "__main__":
    event_handler = MyHandler()
    observer = Observer()
    observer.schedule(event_handler, path='path1', recursive=True)
    observer.schedule(event_handler, path='path2', recursive=True)
    observer.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

在这个示例中,使用schedule方法多次调用,以实现对多个目录的监控。

日志记录

Watchdog可以与Python的logging模块结合使用,以实现日志记录。

以下示例展示了如何记录文件系统事件日志:

import time
import logging
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

logging.basicConfig(filename='file_system.log', level=logging.INFO)

class MyHandler(FileSystemEventHandler):
    def on_modified(self, event):
        logging.info(f'文件被修改: {event.src_path}')

    def on_created(self, event):
        logging.info(f'文件被创建: {event.src_path}')

    def on_deleted(self, event):
        logging.info(f'文件被删除: {event.src_path}')

if __name__ == "__main__":
    event_handler = MyHandler()
    observer = Observer()
    observer.schedule(event_handler, path='.', recursive=True)
    observer.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

在这个示例中,配置了logging模块,将文件系统事件记录到file_system.log文件中。

实践应用

自动化备份

以下示例展示了如何使用Watchdog实现自动化备份,当目录中的文件发生变化时,自动将其备份到指定目录:

import time
import shutil
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class BackupHandler(FileSystemEventHandler):
    def on_modified(self, event):
        if not event.is_directory:
            shutil.copy(event.src_path, 'backup/')

    def on_created(self, event):
        if not event.is_directory:
            shutil.copy(event.src_path, 'backup/')

if __name__ == "__main__":
    event_handler = BackupHandler()
    observer = Observer()
    observer.schedule(event_handler, path='to_backup', recursive=True)
    observer.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
实时日志监控

以下示例展示了如何使用Watchdog实现实时日志监控,当日志文件发生变化时,自动输出最新内容:

import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class LogHandler(FileSystemEventHandler):
    def on_modified(self, event):
        if event.src_path.endswith('.log'):
            with open(event.src_path, 'r') as f:
                print(f.read())

if __name__ == "__main__":
    event_handler = LogHandler()
    observer = Observer()
    observer.schedule(event_handler, path='logs', recursive=True)
    observer.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
文件同步

以下示例展示了如何使用Watchdog实现文件同步,当源目录中的文件发生变化时,自动同步到目标目录:

import time
import shutil
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class SyncHandler(FileSystemEventHandler):
    def on_modified(self, event):
        if not event.is_directory:
            shutil.copy(event.src_path, 'destination/')

    def on_created(self, event):
        if not event.is_directory:
            shutil.copy(event.src_path, 'destination/')

    def on_deleted(self, event):
        if not event.is_directory:
            dest_path = event.src_path.replace('source', 'destination')
            if os.path.exists(dest_path):
                os.remove(dest_path)

if __name__ == "__main__":
    event_handler = SyncHandler()
    observer = Observer()
    observer.schedule(event_handler, path='source', recursive=True)
    observer.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

总结

Python Watchdog库是一个功能强大且易于使用的文件系统监控工具。本文详细介绍了Watchdog的安装、主要功能、基本操作、高级功能及其实践应用,并提供了丰富的示例代码。通过使用Watchdog,用户可以轻松实现文件和目录的实时监控,从而更高效地进行自动化脚本、实时数据处理和日志监控等任务。

城东书院 www.cdsy.xyz
方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
栏目更新
栏目热门
本栏推荐