2025年4月3日 星期四 乙巳(蛇)年 正月初四 设为首页 加入收藏
rss
您当前的位置:首页 > 计算机 > 系统应用 > Linux

python批量拷贝文件

时间:09-15来源:作者:点击数:46

普通批量拷贝文件

  • import os
  • import shutil
  • import logging
  • from logging import handlers
  • from colorama import Fore, Style, init
  • import sys
  • BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  • sys.path.append(BASE_DIR) # 加入环境变量
  • from utils.time_utils import run_time
  • from conf import settings
  • class Colorlog(object):
  • """
  • 记录日志,添加颜色
  • """
  • init(autoreset=True) # 初始化,并且设置颜色设置自动恢复
  • # 根据信息不同设置不同的颜色格式
  • info_color = Fore.GREEN + Style.BRIGHT
  • warn_color = Fore.YELLOW + Style.BRIGHT
  • debug_color = Fore.MAGENTA + Style.BRIGHT
  • error_color = Fore.RED + Style.BRIGHT
  • def __init__(self, name):
  • # 日志格式
  • log_format = '[%(asctime)s - %(levelname)s - %(name)s ] %(message)s '
  • self.logger = logging.getLogger(name)
  • self.logger.setLevel(settings.LOG_LEVEL)
  • console_handler = logging.StreamHandler()
  • # 文件绝对路径
  • logfile_path = os.path.join(settings.LOG_DIR, "log", settings.LOG_FILE)
  • if not os.path.exists(logfile_path):
  • # 创建log目录
  • os.mkdir(os.path.join(settings.LOG_DIR, "log"))
  • # 每天创建一个日志文件,文件数不超过20个
  • file_handler = handlers.TimedRotatingFileHandler(
  • logfile_path, when="D", interval=1, backupCount=20)
  • self.logger.addHandler(console_handler)
  • self.logger.addHandler(file_handler)
  • file_format = logging.Formatter(fmt=log_format)
  • console_format = logging.Formatter(
  • fmt=log_format, datefmt='%Y-%m-%d %H:%M:%S ')
  • console_handler.setFormatter(console_format)
  • file_handler.setFormatter(file_format)
  • def warn(self, message):
  • self.logger.warning(Colorlog.warn_color + message)
  • def info(self, message):
  • self.logger.info(Colorlog.info_color + message)
  • def error(self, message):
  • self.logger.error(Colorlog.info_color + message)
  • def debug(self, message):
  • self.logger.debug(Colorlog.info_color + message)
  • cp_log = Colorlog("cp")
  • def copy_file(local_file_path, dst_file_path):
  • size = bytes2human(os.path.getsize(local_file_path))
  • # cp_log.debug(
  • # 'copy file {} to {}, file size {}'.format(
  • # local_file_path, dst_file_path, size))
  • shutil.copy(local_file_path, dst_file_path) # copy file
  • @run_time
  • def upload_file(src_path, dst_path):
  • """
  • 上传文件
  • :param src_path:
  • :param dst_path:
  • :return:
  • """
  • cp_log.info('upload_file %s %s' % (src_path, dst_path))
  • # 目标目录是否存在,不存在则创建
  • if not os.path.exists(dst_path):
  • os.makedirs(dst_path)
  • cp_log.info('Create Dest Dir %s' % dst_path)
  • # 判断是否为目录,存在则把文件拷贝到目标目录下
  • if os.path.isdir(src_path):
  • all_file_nums = 0
  • for root, dirs, files in os.walk(src_path):
  • # 遍历目录下所有文件根,目录下的每一个文件夹(包含它自己),
  • # 产生3-元组 (dirpath, dirnames, filenames)【文件夹路径, 文件夹名字, 文件名称】
  • for f in files:
  • local_file_path = os.path.join(root, f) # 本地文件路径 如/src/q.txt
  • dst_file_path = os.path.abspath(
  • local_file_path.replace(
  • src_path, dst_path)) # 目标文件路径 如/dst/q.txt
  • dst_dir = os.path.dirname(dst_file_path) # 目标文件路径文件夹 如/dst/
  • if not os.path.isdir(dst_dir):
  • os.makedirs(dst_dir) # 创建目录
  • cp_log.debug('Create Dest Dir %s' % dst_path)
  • copy_file(local_file_path, dst_file_path) # 拷贝文件
  • cp_log.info('copy file {} complete '.format(local_file_path))
  • all_file_nums += 1
  • cp_log.info(
  • 'copy all files complete , files count = {}'.format(all_file_nums))
  • else:
  • cp_log.warn('Dir is not exists %s' % dst_path)
  • def bytes2human(n):
  • symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
  • prefix = {}
  • for i, s in enumerate(symbols):
  • # << 左移” 左移一位表示乘2 即1 << 1=2,二位就表示4 即1 << 2=4,
  • # 10位就表示1024 即1 << 10=1024 就是2的n次方
  • prefix[s] = 1 << (i + 1) * 10
  • for s in reversed(symbols):
  • if n >= prefix[s]:
  • value = float(n) / prefix[s]
  • return '%.1f%s' % (value, s)
  • return "%sBytes" % n
  • if __name__ == '__main__':
  • src = 'D://test1'
  • dst = 'D://copytest2'
  • upload_file(src, dst)

输出结果 

  • [2018-06-29 15:14:04 - INFO - cp ] upload_file D://test1 D://copytest2
  • [2018-06-29 15:14:04 - INFO - cp ] Create Dest Dir D://copytest2
  • [2018-06-29 15:14:04 - DEBUG - cp ] Create Dest Dir D://copytest2
  • [2018-06-29 15:14:04 - INFO - cp ] copy file D://test1\20180601\20180601_test.txt complete
  • [2018-06-29 15:14:04 - DEBUG - cp ] Create Dest Dir D://copytest2
  • [2018-06-29 15:14:19 - INFO - cp ] copy file D://test1\20180601\wmv\01文件操作和异常.wmv.pbb complete
  • [2018-06-29 15:14:19 - DEBUG - cp ] Create Dest Dir D://copytest2
  • [2018-06-29 15:14:19 - INFO - cp ] copy file D://test1\20180602\20180602_test.txt complete
  • ……
  • [2018-06-29 15:16:20 - INFO - cp ] copy file D://test1\Tesseract-OCR\tessdata\tessconfigs\nobatch complete
  • [2018-06-29 15:16:20 - INFO - cp ] copy file D://test1\Tesseract-OCR\tessdata\tessconfigs\segdemo complete
  • [2018-06-29 15:16:20 - INFO - cp ] copy all files complete , files count = 164
  • [2018-06-29 15:16:20 - DEBUG - runtime - time_utils.py - decor- 59 ] func {upload_file} run { 135.2727}s

使用多线程批量拷贝文件

  • #!/usr/bin/python
  • # -*- coding: utf-8 -*-
  • # @Time : 2018/6/29 10:28
  • # @Author : hyang
  • # @File : batch_copy.py
  • # @Software: PyCharm
  • import os
  • import shutil
  • import logging
  • from logging import handlers
  • from colorama import Fore, Style, init
  • from multiprocessing.dummy import Pool as ThreadPool
  • import queue
  • import sys
  • BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  • sys.path.append(BASE_DIR) # 加入环境变量
  • from utils.time_utils import run_time
  • from conf import settings
  • class Colorlog(object):
  • """
  • 记录日志,添加颜色
  • """
  • init(autoreset=True) # 初始化,并且设置颜色设置自动恢复
  • # 根据信息不同设置不同的颜色格式
  • info_color = Fore.GREEN + Style.BRIGHT
  • warn_color = Fore.YELLOW + Style.BRIGHT
  • debug_color = Fore.MAGENTA + Style.BRIGHT
  • error_color = Fore.RED + Style.BRIGHT
  • def __init__(self, name):
  • # 日志格式
  • log_format = '[%(asctime)s - %(levelname)s - %(name)s ] %(message)s '
  • self.logger = logging.getLogger(name)
  • self.logger.setLevel(settings.LOG_LEVEL)
  • console_handler = logging.StreamHandler()
  • # 文件绝对路径
  • logfile_path = os.path.join(settings.LOG_DIR, "log", settings.LOG_FILE)
  • if not os.path.exists(logfile_path):
  • # 创建log目录
  • os.mkdir(os.path.join(settings.LOG_DIR, "log"))
  • # 每天创建一个日志文件,文件数不超过20个
  • file_handler = handlers.TimedRotatingFileHandler(
  • logfile_path, when="D", interval=1, backupCount=20)
  • self.logger.addHandler(console_handler)
  • self.logger.addHandler(file_handler)
  • file_format = logging.Formatter(fmt=log_format)
  • console_format = logging.Formatter(
  • fmt=log_format, datefmt='%Y-%m-%d %H:%M:%S ')
  • console_handler.setFormatter(console_format)
  • file_handler.setFormatter(file_format)
  • def warn(self, message):
  • self.logger.warning(Colorlog.warn_color + message)
  • def info(self, message):
  • self.logger.info(Colorlog.info_color + message)
  • def error(self, message):
  • self.logger.error(Colorlog.info_color + message)
  • def debug(self, message):
  • self.logger.debug(Colorlog.info_color + message)
  • cp_log = Colorlog("cp")
  • def copy_file(local_file_path, dst_file_path, q):
  • size = bytes2human(os.path.getsize(local_file_path))
  • # cp_log.debug(
  • # 'copy file {} to {}, file size {}'.format(
  • # local_file_path, dst_file_path, size))
  • shutil.copy(local_file_path, dst_file_path) # copy file
  • q.put(local_file_path) # 加入队列
  • @run_time
  • def upload_file(src_path, dst_path):
  • """
  • 上传文件
  • :param src_path:
  • :param dst_path:
  • :return:
  • """
  • pool = ThreadPool(3) # 开启3个线程
  • q = queue.Queue() # 开启一个队列
  • cp_log.info('upload_file %s %s' % (src_path, dst_path))
  • # 目标目录是否存在,不存在则创建
  • if not os.path.exists(dst_path):
  • os.makedirs(dst_path)
  • cp_log.info('Create Dest Dir %s' % dst_path)
  • # 判断是否为目录,存在则把文件拷贝到目标目录下
  • if os.path.isdir(src_path):
  • all_file_nums = 0
  • for root, dirs, files in os.walk(src_path):
  • # 遍历目录下所有文件根,目录下的每一个文件夹(包含它自己),
  • # 产生3-元组 (dirpath, dirnames, filenames)【文件夹路径, 文件夹名字, 文件名称】
  • for f in files:
  • all_file_nums += 1
  • local_file_path = os.path.join(root, f) # 本地文件路径 如/src/q.txt
  • dst_file_path = os.path.abspath(
  • local_file_path.replace(
  • src_path, dst_path)) # 目标文件路径 如/dst/q.txt
  • dst_dir = os.path.dirname(dst_file_path) # 目标文件路径文件夹 如/dst/
  • if not os.path.isdir(dst_dir):
  • os.makedirs(dst_dir) # 创建目录
  • cp_log.debug('Create Dest Dir %s' % dst_path)
  • pool.apply_async(
  • func=copy_file, args=(
  • local_file_path, dst_file_path, q))
  • pool.close() # close()执行后不会有新的进程加入到pool
  • # pool.join() # join函数等待所有子进程结束
  • print('all_file_nums ', all_file_nums)
  • num = 0
  • while True:
  • if not q.empty():
  • item = q.get()
  • cp_log.info('copy file {} complete '.format(item))
  • num += 1
  • copy_rate = float(num / all_file_nums) * 100
  • cp_log.warn("\r 进度为:%.2f%%" % copy_rate)
  • if int(copy_rate) >= 100:
  • break
  • cp_log.info(
  • 'copy all files complete , files count = {}'.format(all_file_nums))
  • else:
  • cp_log.warn('Dir is not exists %s' % dst_path)
  • def bytes2human(n):
  • symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
  • prefix = {}
  • for i, s in enumerate(symbols):
  • # << 左移” 左移一位表示乘2 即1 << 1=2,二位就表示4 即1 << 2=4,
  • # 10位就表示1024 即1 << 10=1024 就是2的n次方
  • prefix[s] = 1 << (i + 1) * 10
  • for s in reversed(symbols):
  • if n >= prefix[s]:
  • value = float(n) / prefix[s]
  • return '%.1f%s' % (value, s)
  • return "%sBytes" % n
  • if __name__ == '__main__':
  • src = 'D://test1'
  • dst = 'D://copy_thread_test2'
  • upload_file(src, dst)

  输出结果 

  • [2018-06-29 15:26:13 - INFO - cp ] copy file D://test1\20180601\20180601_test.txt complete
  • 进度为:0.61%
  • [2018-06-29 15:26:13 - INFO - cp ] copy file D://test1\20180602\20180602_test.txt complete
  • 进度为:1.22%
  • [2018-06-29 15:26:13 - INFO - cp ] copy file D://test1\20180602\教程目录及说明.txt complete
  • 进度为:1.83%
  • all_file_nums 164
  • [2018-06-29 15:26:15 - INFO - cp ] copy file D://test1\20180602\MongoDB权威指南(中文版).pdf complete
  • 进度为:2.44%
  • [2018-06-29 15:26:15 - INFO - cp ] copy file D://test1\ibooks\AIX_HACMP_40pages.pdf complete
  • 进度为:3.05%
  • ……
  • [2018-06-29 15:29:02 - INFO - cp ] copy file D://test1\Tesseract-OCR\tessdata\tessconfigs\nobatch complete
  • 进度为:99.39%
  • [2018-06-29 15:29:02 - INFO - cp ] copy file D://test1\Tesseract-OCR\tessdata\tessconfigs\segdemo complete
  • 进度为:100.00%
  • [2018-06-29 15:29:02 - INFO - cp ] copy all files complete , files count = 164
  • [2018-06-29 15:29:02 - DEBUG - runtime - time_utils.py - decor- 59 ] func {upload_file} run { 168.7767}s

使用协程批量拷贝文件

  • #!/usr/bin/env python3
  • # -*- coding: utf-8 -*-
  • from gevent import monkey;monkey.patch_all()
  • import os
  • import shutil
  • import logging
  • import time
  • from functools import wraps
  • from logging import handlers
  • from colorama import Fore, Style, init
  • from multiprocessing.pool import ThreadPool
  • import queue
  • import gevent
  • import sys
  • BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  • sys.path.append(BASE_DIR) # 加入环境变量
  • class Colorlog(object):
  • """
  • 记录日志,添加颜色
  • """
  • init(autoreset=True) # 初始化,并且设置颜色设置自动恢复
  • # 根据信息不同设置不同的颜色格式
  • info_color = Fore.GREEN + Style.BRIGHT
  • warn_color = Fore.YELLOW + Style.BRIGHT
  • debug_color = Fore.MAGENTA + Style.BRIGHT
  • error_color = Fore.RED + Style.BRIGHT
  • def __init__(self, name):
  • # 日志格式
  • log_format = '[%(asctime)s - %(levelname)s - %(name)s ] %(message)s '
  • self.logger = logging.getLogger(name)
  • self.logger.setLevel(logging.DEBUG)
  • console_handler = logging.StreamHandler()
  • # 文件绝对路径
  • logfile_path = 'test.log'
  • # 每天创建一个日志文件,文件数不超过20个
  • file_handler = handlers.TimedRotatingFileHandler(
  • logfile_path, when="D", interval=1, backupCount=20)
  • self.logger.addHandler(console_handler)
  • self.logger.addHandler(file_handler)
  • file_format = logging.Formatter(fmt=log_format)
  • console_format = logging.Formatter(
  • fmt=log_format, datefmt='%Y-%m-%d %H:%M:%S ')
  • console_handler.setFormatter(console_format)
  • file_handler.setFormatter(file_format)
  • def warn(self, message):
  • self.logger.warning(Colorlog.warn_color + message)
  • def info(self, message):
  • self.logger.info(Colorlog.info_color + message)
  • def error(self, message):
  • self.logger.error(Colorlog.info_color + message)
  • def debug(self, message):
  • self.logger.debug(Colorlog.info_color + message)
  • cp_log = Colorlog("cp")
  • def run_time(func):
  • """
  • 计算程序运行时间的装饰器
  • :param func:
  • :return:
  • """
  • @wraps(func)
  • def decor(*args, **kwargs):
  • start = time.time()
  • res = func(*args, **kwargs)
  • end = time.time()
  • print("func {%s} run {%10.4f}s " % (func.__name__, (end - start)))
  • return res
  • return decor
  • def copy_file(local_file_path, dst_file_path):
  • # size = bytes2human(os.path.getsize(local_file_path))
  • # cp_log.debug(
  • # 'copy file {} to {}, file size {}'.format(
  • # local_file_path, dst_file_path, size))
  • shutil.copy(local_file_path, dst_file_path) # copy file
  • cp_log.info(
  • 'copy file {} , size= {} complete '.format(
  • local_file_path, bytes2human(
  • os.path.getsize(dst_file_path))))
  • def getdirsize(dir):
  • """
  • 获得文件夹中所有文件大小
  • :param dir:
  • :return:
  • """
  • size = 0
  • for root, dirs, files in os.walk(dir):
  • size += sum([os.path.getsize(os.path.join(root, name))
  • for name in files])
  • return bytes2human(size)
  • @run_time
  • def upload_file(src_path, dst_path):
  • """
  • 上传文件
  • :param src_path:
  • :param dst_path:
  • :return:
  • """
  • cp_log.info('upload_file %s %s' % (src_path, dst_path))
  • # 目标目录是否存在,不存在则创建
  • if not os.path.exists(dst_path):
  • os.makedirs(dst_path)
  • cp_log.info('Create Dest Dir %s' % dst_path)
  • tasklist = [] # 任务列表
  • # 判断是否为目录,存在则把文件拷贝到目标目录下
  • if os.path.isdir(src_path):
  • all_file_nums = 0
  • all_file_size = getdirsize(src_path)
  • cp_log.info('all_file_size = %s' % all_file_size)
  • for root, dirs, files in os.walk(src_path):
  • # 遍历目录下所有文件根,目录下的每一个文件夹(包含它自己),
  • # 产生3-元组 (dirpath, dirnames, filenames)【文件夹路径, 文件夹名字, 文件名称】
  • for f in files:
  • all_file_nums += 1
  • local_file_path = os.path.join(root, f) # 本地文件路径 如/src/q.txt
  • dst_file_path = os.path.abspath(
  • local_file_path.replace(
  • src_path, dst_path)) # 目标文件路径 如/dst/q.txt
  • dst_dir = os.path.dirname(dst_file_path) # 目标文件路径文件夹 如/dst/
  • if not os.path.isdir(dst_dir):
  • os.makedirs(dst_dir) # 创建目录
  • cp_log.debug('Create Dest Dir %s' % dst_dir)
  • tasklist.append(
  • gevent.spawn(
  • copy_file,
  • local_file_path,
  • dst_file_path)) # 开启协程
  • gevent.joinall(tasklist) # 阻塞等待所有操作都执行完毕
  • print('all_file_nums ', all_file_nums)
  • cp_log.info(
  • 'copy all files complete , files count = {} , size = {}'.format(all_file_nums, getdirsize(dst_path)))
  • else:
  • cp_log.warn('Dir is not exists %s' % dst_path)
  • def bytes2human(n):
  • symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
  • prefix = {}
  • for i, s in enumerate(symbols):
  • # << 左移” 左移一位表示乘2 即1 << 1=2,二位就表示4 即1 << 2=4,
  • # 10位就表示1024 即1 << 10=1024 就是2的n次方
  • prefix[s] = 1 << (i + 1) * 10
  • for s in reversed(symbols):
  • if n >= prefix[s]:
  • value = float(n) / prefix[s]
  • return '%.1f%s' % (value, s)
  • return "%sB" % n
  • if __name__ == '__main__':
  • src = 'C://pythonStudy/python爬虫参考资料'
  • dst = 'C://pythonStudy/copy_thread_test2'
  • upload_file(src, dst)

 输出结果 

  • "C:\Program Files\Python36\python.exe" batch_copy.py
  • [2018-06-29 22:50:22 - INFO - cp ] upload_file C://pythonStudy/python爬虫参考资料 C://pythonStudy/copy_thread_test2
  • [2018-06-29 22:50:22 - INFO - cp ] Create Dest Dir C://pythonStudy/copy_thread_test2
  • [2018-06-29 22:50:22 - INFO - cp ] all_file_size = 620.6M
  • [2018-06-29 22:50:22 - DEBUG - cp ] Create Dest Dir C:\pythonStudy\copy_thread_test2\python-scraping-master
  • [2018-06-29 22:50:22 - DEBUG - cp ] Create Dest Dir C:\pythonStudy\copy_thread_test2\python-scraping-master\chapter1
  • [2018-06-29 22:50:22 - DEBUG - cp ] Create Dest Dir C:\pythonStudy\copy_thread_test2\python-scraping-master\chapter10
  • [2018-06-29 22:50:22 - DEBUG - cp ] Create Dest Dir
  • ……
  • [2018-06-29 22:50:23 - INFO - cp ] copy file C://pythonStudy/python爬虫参考资料\python-scraping-master\chapter12\2-seleniumCookies.py , size= 528B complete
  • [2018-06-29 22:50:23 - INFO - cp ] copy file C://pythonStudy/python爬虫参考资料\python-scraping-master\chapter12\3-honeypotDetection.py , size= 539B complete
  • [2018-06-29 22:50:23 - INFO - cp ] copy file
  • [2018-06-29 22:50:24 - INFO - cp ] copy file C://pythonStudy/python爬虫参考资料\python-scraping-master\chapter9\5-BasicAuth.py , size= 229B complete
  • all_file_nums 130
  • [2018-06-29 22:50:24 - INFO - cp ] copy file C://pythonStudy/python爬虫参考资料\python-scraping-master\files\test.csv , size= 114B complete
  • func {upload_file} run { 1.2971}s
  • [2018-06-29 22:50:24 - INFO - cp ] copy all files complete , files count = 130 , size = 620.6M
  • Process finished with exit code 0
工具文件

time_utils.py

  • def run_time(func):
  • """
  • 计算程序运行时间的装饰器
  • :param func:
  • :return:
  • """
  • @wraps(func)
  • def decor(*args,**kwargs):
  • start = time.time()
  • res = func(*args,**kwargs)
  • end = time.time()
  • log.debug("func {%s} run {%10.4f}s " % (func.__name__,(end - start)))
  • return res
  • return decor
方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
栏目更新
栏目热门