2025年4月12日 星期六 乙巳(蛇)年 正月十三 设为首页 加入收藏
rss
您当前的位置:首页 > 计算机 > 编程开发 > Python

Python FTP文件传输

时间:09-14来源:作者:点击数:41

FTP Server

  • import socket
  • import struct
  • from concurrent.futures import ThreadPoolExecutor
  • import json
  • import hashlib
  • import os
  • import time
  • from demo import common_utils
  • PUT_FILE_DIR = r'C:\x\LuffyFTP\sharefile\server\put'
  • GET_FILE_DIR = r'C:\x\LuffyFTP\sharefile\server\get'
  • IP_PORT = ('127.0.0.1', 9999)
  • def run_forever():
  • """
  • 启动socket
  • :return:
  • """
  • server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  • server_socket.bind(IP_PORT)
  • server_socket.listen(5)
  • print('Server Start,IP:%s, LISTENING PORT: %s.' %
  • IP_PORT)
  • pool = ThreadPoolExecutor(10)
  • while True:
  • conn, client_addr = server_socket.accept()
  • print('创建一个新的线程,和客户端{}通信'.format(client_addr))
  • pool.submit(take_over_connection, conn, client_addr)
  • def take_over_connection(conn, client_addr):
  • """
  • 用来接管socket链接,每个线程接管一个链接
  • :param conn:
  • :param client_address:
  • :return:
  • """
  • print('MyServer')
  • server = MyServer(conn, client_addr)
  • server.handle_cmd()
  • class MyServer(object):
  • """
  • 处理客户端所有的交互socket server
  • """
  • STATUS = {
  • 300: 'File not exist !',
  • 301: 'File exist , and the msg include the file size!',
  • 302: 'File not exist !!!'
  • }
  • def __init__(self, conn, client_addr):
  • self.conn = conn
  • self.client_addr = client_addr
  • def handle_cmd(self):
  • """
  • 处理用户命令交互
  • :return:
  • """
  • print('handle_cmd')
  • while True:
  • try:
  • # 收到报头长度
  • recv_pack = self.conn.recv(4)
  • if not recv_pack:
  • print(
  • 'connect {} is lost ……'.format(
  • self.client_addr))
  • break
  • # 解析报头
  • recv_length = struct.unpack('i', recv_pack)[0]
  • header_data = self.conn.recv(recv_length)
  • # json_data
  • json_data = json.loads(header_data.decode('utf-8'))
  • print('recv data >>> {}'.format(json_data))
  • action_type = json_data.get('action_type')
  • if action_type:
  • # 使用反射
  • if hasattr(self, '_{}'.format(action_type)):
  • func = getattr(self, '_{}'.format(action_type))
  • func(json_data)
  • else:
  • print('invalid command')
  • except ConnectionResetError: # 适用于windows操作系统
  • break
  • def send_response(self, status_code, **kwargs):
  • """
  • 向客户端发送响应吗
  • :param status:
  • :return:
  • """
  • # 构造消息头
  • message = {
  • 'status': status_code,
  • 'status_message': self.STATUS.get(status_code)
  • }
  • message.update(kwargs) # 更新消息
  • message_json = json.dumps(message)
  • # 为防止粘包,封装消息包
  • header_byte = message_json.encode('utf-8')
  • # 先发送报头的长度
  • self.conn.send(struct.pack('i', len(message_json)))
  • print('发送response报头的长度: {}'.format(len(message_json)))
  • print('发送response报头内容:{}'.format(message))
  • # 发送报头
  • self.conn.send(header_byte)
  • def _get(self, data):
  • """
  • 下载文件,如果文件存在,发送状态码+文件大小+md5,发送文件
  • 不存在,发送状态码
  • :param data:
  • :return:
  • """
  • print('_get {}'.format(data))
  • file_path = os.path.join(
  • GET_FILE_DIR,
  • data.get('file_name'))
  • if os.path.isfile(file_path):
  • file_size = os.path.getsize(file_path)
  • print(
  • 'file_path: {} file_size: {} '.format(
  • file_path, file_size))
  • self.send_response(301, file_size=file_size, md5=common_utils.get_md5(
  • file_path), server_file_dir=os.path.dirname(file_path))
  • print('read to send file >>>', data.get('file_name'))
  • with open(file_path, 'rb') as f:
  • for line in f:
  • self.conn.send(line)
  • else:
  • print('send file {} done'.format(file_path))
  • else:
  • self.send_response(302)
  • def _put(self, data):
  • """
  • 拿到文件名和大小,检测本地是否存在相同文件
  • 如果存在,创建新文件local_file_name+timestamp
  • 如果存在,创建新文件local_file_name
  • :param data:
  • :return:
  • """
  • print('_put {}'.format(data))
  • file_size = data.get('file_size')
  • file_name = data.get('file_name')
  • file_path = os.path.join(
  • PUT_FILE_DIR,
  • file_name)
  • client_md5 = data.get('md5')
  • if os.path.isfile(file_path):
  • print('file is exist')
  • file_path = '{}.{}'.format(file_path, str(int(time.time())))
  • tmp_file = '{}.down'.format(file_path)
  • print('tmp_file:', tmp_file)
  • f = open(tmp_file, 'wb')
  • recv_size = 0
  • print('put file {} start >>> '.format(file_path))
  • while recv_size < file_size:
  • data = self.conn.recv(8192) # 接收文件内容
  • f.write(data)
  • recv_size += len(data)
  • else:
  • print("\n")
  • print(
  • '-- file [{}] put done, received size [{}]'.format(file_name, common_utils.bytes2human(
  • os.path.getsize(tmp_file))))
  • f.close()
  • os.rename(tmp_file, file_path)
  • server_md5 = common_utils.get_md5(file_path)
  • if server_md5 == client_md5:
  • print('文件上传完整与客户端一致')
  • if __name__ == '__main__':
  • run_forever()

FTP Client

  • import argparse
  • import socket
  • import json
  • import struct
  • import sys
  • import os
  • BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  • sys.path.append(BASE_DIR)
  • from demo import common_utils
  • PUT_FILE_PATH = r'C:\x\LuffyFTP\sharefile\client\put'
  • GET_FILE_PATH = r'C:\x\LuffyFTP\sharefile\client\get'
  • IP_PORT = ('127.0.0.1', 9999)
  • class FtpClient():
  • """
  • ftp客户端
  • """
  • def __init__(self):
  • self.client_sock = None
  • self.make_connect()
  • def make_connect(self):
  • """
  • 连接服务器
  • :return:
  • """
  • try:
  • self.client_sock = socket.socket(
  • socket.AF_INET, socket.SOCK_STREAM)
  • print('连接服务器')
  • self.client_sock.connect(IP_PORT) # 连接服务器
  • except Exception as e:
  • print('连接服务器异常', e)
  • def interactive(self):
  • """
  • 交互
  • :return:
  • """
  • menu = """
  • 1. 下载文件 get 1.txt
  • 2. 上传文件 put 1.txt
  • 3. 退出 bye
  • """
  • print(menu)
  • while True:
  • user_input = input('请输入 >>> ').strip()
  • if not user_input:
  • continue
  • cmd_list = user_input.split()
  • if hasattr(self, '_{}'.format(cmd_list[0])):
  • func = getattr(self, '_{}'.format(cmd_list[0]))
  • func(cmd_list) # get
  • def send_msg(self, action_type, **kwargs):
  • """
  • 打包消息,发送到服务器
  • :param action_type:
  • :param kwargs:
  • :return:
  • """
  • cmd = {
  • 'action_type': action_type,
  • }
  • cmd.update(kwargs) # 更新字典
  • cmd_json = json.dumps(cmd)
  • # 为防止粘包,封装包
  • header_byte = cmd_json.encode('utf-8')
  • # 先发送报头的长度
  • self.client_sock.send(struct.pack('i', len(cmd_json)))
  • print('发送auth报头的长度: {}'.format(len(cmd_json)))
  • print('发送auth报头内容:{}'.format(cmd_json))
  • # 发送报头
  • self.client_sock.send(header_byte)
  • def arg_check(self, cmd_args, len_args):
  • if len(cmd_args) != len_args:
  • print(
  • 'must provide {} parameters but received {}'.format(len_args,
  • len(cmd_args)))
  • return False
  • else:
  • return True
  • def get_response(self):
  • """
  • 收到服务器向客户端发送的响应
  • :return:
  • """
  • # 收到报头长度
  • recv_pack = self.client_sock.recv(4)
  • if recv_pack:
  • # 解析报头
  • recv_length = struct.unpack('i', recv_pack)[0]
  • header_data = self.client_sock.recv(recv_length)
  • # json_data
  • json_data = json.loads(header_data.decode('utf-8'))
  • print('recv response >>> {}'.format(json_data))
  • return json_data
  • else:
  • print('recv_pack is null !!!')
  • return None
  • def _get(self, cmd_args):
  • """
  • 得到文件,发送到远程,等待返回消息,
  • 等待文件,循环收文件
  • :param cmd_args:
  • :return:
  • """
  • if self.arg_check(cmd_args, 2):
  • file_name = cmd_args[1] # get filename
  • self.send_msg('get', file_name=file_name)
  • response_data = self.get_response()
  • if response_data.get('status') == 301:
  • file_size = response_data.get('file_size')
  • server_md5 = response_data.get('md5')
  • file_path = os.path.join(
  • GET_FILE_PATH, file_name)
  • recv_size = 0
  • p = self.progress_bar(file_size) # 进度条
  • p.send(None)
  • print('get file {} start >>> '.format(file_name))
  • tmp_file = '{}.down'.format(file_path)
  • with open(tmp_file, 'wb') as f: # 写下载文件
  • # 序列化保存数据
  • while recv_size < file_size:
  • data = self.client_sock.recv(8192)
  • f.write(data)
  • recv_size += len(data)
  • p.send(recv_size)
  • else:
  • print("\n")
  • print(
  • '-- file [{}] recv done, received size [{}]'.format(file_name, file_size))
  • if os.path.isfile(file_path): # 如果文件存在,删除后覆盖文件
  • os.remove(file_path)
  • os.rename(tmp_file, file_path)
  • client_md5 = common_utils.get_md5(file_path)
  • if server_md5 == client_md5:
  • print('文件下载完整与服务端一致')
  • else:
  • print(response_data.get('status_message'))
  • def _put(self, cmd_args):
  • """
  • 1.上传本地文件到服务器
  • 2.确保本地文件存在
  • 3.把文件名和文件大小发送到远程
  • 4.发送文件内容
  • :return:
  • """
  • if self.arg_check(cmd_args, 2):
  • local_file_name = cmd_args[1] # put filename
  • full_path = os.path.join(PUT_FILE_PATH, local_file_name)
  • if os.path.isfile(full_path):
  • total_size = os.path.getsize(full_path)
  • self.send_msg(
  • 'put',
  • file_name=local_file_name,
  • file_size=total_size, md5=common_utils.get_md5(full_path))
  • p = self.progress_bar(total_size)
  • p.send(None)
  • upload_size = 0
  • with open(full_path, 'rb') as f: # 发送文件
  • for line in f:
  • self.client_sock.send(line)
  • upload_size += len(line)
  • p.send(upload_size)
  • else:
  • print("\n")
  • print('file upload done'.center(50, '-'))
  • else:
  • print(
  • 'file [{}] is not exist !!!'.format(local_file_name))
  • def _bye(self, cmd_args):
  • """
  • 退出
  • :return:
  • """
  • print("bye")
  • self.client_sock.close()
  • exit(0)
  • @staticmethod
  • def progress_bar(total_size):
  • """
  • 显示进度条
  • :param total_size:
  • :return:
  • """
  • current_percent = 0
  • last_percent = 0
  • while True:
  • recv_size = yield current_percent
  • current_percent = int(recv_size / total_size * 100)
  • print("#" * int(current_percent / 4) + '{percent}%'.format(percent=int(current_percent)), end="\r",
  • flush=True)
  • if __name__ == '__main__':
  • c = FtpClient()
  • c.interactive()

common_util

  • import logging
  • from logging import handlers
  • import os
  • from tkinter import Tk, filedialog
  • import os
  • import hashlib
  • def bytes2human(n):
  • # 文件大小字节单位转换
  • symbols = ('K', 'M', 'G', 'T', 'P', 'E')
  • prefix = {}
  • for i, s in enumerate(symbols):
  • # << 左移” 左移一位表示乘2 即1 << 1=2,二位就表示4 即1 << 2=4,
  • # 10位就表示1024 即1 << 10=1024 就是2的n次方
  • prefix[s] = 1 << (i + 1) * 10
  • for s in reversed(symbols):
  • if n >= prefix[s]:
  • value = float(n) / prefix[s]
  • return '%.2f%s' % (value, s)
  • return "%sB" % n
  • def get_md5(file_path):
  • """
  • 得到文件MD5
  • :param file_path:
  • :return:
  • """
  • if os.path.isfile(file_path):
  • file_size = os.stat(file_path).st_size
  • md5_obj = hashlib.md5() # hashlib
  • f = open(file_path, 'rb') # 打开文件
  • read_size = 0
  • while read_size < file_size:
  • read_byte = f.read(8192)
  • md5_obj.update(read_byte) # update md5
  • read_size += len(read_byte)
  • hash_code = md5_obj.hexdigest() # get md5 hexdigest
  • f.close()
  • print('file: [{}] \nsize: [{}] \nmd5: [{}]'.format(
  • file_path, bytes2human(read_size), hash_code))
  • return str(hash_code)
  • def get_dir_size_count(dir):
  • """
  • 获得文件夹中所有文件大小和文件个数
  • :param dir:
  • :return:
  • """
  • size = 0
  • count = 0
  • for root, dirs, files in os.walk(dir):
  • size_li = [os.path.getsize(os.path.join(root, name))
  • for name in files]
  • size += sum(size_li)
  • count += len(size_li)
  • print('目录{} 文件个数{}, 总共大小约{}'.format(dir, count, bytes2human(size)))
  • return count, size
  • def brows_local_filename(title='choose a file', force=False):
  • """
  • Select a local file by filedialog of tkinter. Return an exist file path.
  • :param title:
  • :param force: If force is True user must choose a file.
  • :return:
  • """
  • tk = Tk()
  • tk.withdraw()
  • tk.wm_attributes('-topmost', 1)
  • while True:
  • filename = filedialog.askopenfilename(title=title)
  • if not force or filename:
  • break
  • tk.destroy()
  • return filename
  • def brows_save_filename(title='save as'):
  • """
  • Select a local path to save a file by filedialog of tkinter.Return a path for saving file.
  • :param title:
  • :return:
  • """
  • tk = Tk()
  • tk.withdraw()
  • tk.wm_attributes('-topmost', 1)
  • filename = filedialog.asksaveasfilename(title=title)
  • tk.destroy()
  • return filename
方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
栏目更新
栏目热门