2025年3月25日 星期二 甲辰(龙)年 月廿四 设为首页 加入收藏
rss
您当前的位置:首页 > 计算机 > 编程开发 > Python

PostgreSQL和Excel的数据合并

时间:04-22来源:作者:点击数:72

用于将Excel中的数据和PG数据库中的数据进行联合处理,涉及:

  • 数据库访问
  • Excel文件访问
  • 字典和列表的使用
  • 文件读写
  • 异常处理

Python3完整源码如下:

  • # -*- coding: utf-8 -*-
  • import xlrd
  • import xlwt
  • # import pandas as pd
  • import psycopg2
  • # import openpyxl
# Lookup tables filled by read_from_db(). Each maps a display name to the
# value selected alongside it: facility / category / provider ids, the
# model / specs / level dictionary values, and the facility / category codes.
g_facility_dict = {}
g_category_dict = {}
g_provider_dict = {}
g_model_dict = {}
g_specs_dict = {}
g_level_dict = {}
g_facility_code_dict = {}
g_category_code_dict = {}
# Device rows read from the database
g_device_asset_in_db = []
# Device rows read from the input workbook (the name says "xml", but
# read_xml_file() opens the file with xlrd)
g_device_asset_in_xml = []
# Generated SQL statements, plus the input rows that failed processing
g_device_to_delete = []
g_device_to_insert = []
g_device_to_update = []
g_device_error = []
# Facility currently being processed and the running device order inside it
g_cache_facility_name = ""
g_cache_device_order_in_facility = 1
# Highest device id seen or handed out so far
g_max_dev_id = 0
# Source table (queried) and target table (named in the generated SQL)
G_TABLE_SOURCE = "amp_device_asset"
G_TABLE_TARGET = "amp_device_asset"
  • # 清理缓存
  • def make_clean():
  • global g_device_asset_in_xml
  • global g_device_to_delete
  • global g_device_to_insert
  • global g_device_to_update
  • global g_device_error
  • global g_cache_facility_name
  • global g_cache_device_order_in_facility
  • g_device_asset_in_xml = []
  • g_device_to_delete = []
  • g_device_to_insert = []
  • g_device_to_update = []
  • g_device_error = []
  • g_cache_facility_name = ""
  • g_cache_device_order_in_facility = 1
  • # 连接数据库
  • def connect_db():
  • t_connection = psycopg2.connect(database="my_dbname",
  • user="my_username", password="my_passwd",
  • host="my_ipaddr", port="5432")
  • t_cursor = t_connection.cursor()
  • return t_connection, t_cursor
  • # 关闭数据库连接
  • def disconnect_db(connection, cursor):
  • # 提交事务
  • connection.commit()
  • # 关闭连接
  • cursor.close()
  • connection.close()
  • # 读取数据库中的所有设备台账数据和相关字典
  • def read_from_db(cursor):
  • # 设施字典
  • cursor.execute("SELECT id,name FROM amp_facility_asset;")
  • # 获取数据,生成字典
  • global g_facility_dict
  • for item in cursor.fetchall():
  • g_facility_dict[item[1]] = item[0]
  • # 类别字典
  • cursor.execute("SELECT id,name FROM amp_category_define;")
  • # 获取数据,生成字典
  • global g_category_dict
  • for item in cursor.fetchall():
  • g_category_dict[item[1]] = item[0]
  • # 厂家字典
  • cursor.execute("SELECT id,name FROM amp_service_provider;")
  • # 获取数据,生成字典
  • global g_provider_dict
  • for item in cursor.fetchall():
  • g_provider_dict[item[1]] = item[0]
  • # 型号字典
  • cursor.execute("SELECT a.value, a.display_value from sys_dict_item a "
  • "left join sys_dict_def b on a.dict_id = b.id "
  • "where b.name = 'EQ_MODEL' order by a.value;")
  • # 获取数据,生成字典
  • global g_model_dict
  • for item in cursor.fetchall():
  • g_model_dict[item[1]] = item[0]
  • # 规格字典
  • cursor.execute("SELECT a.value, a.display_value from sys_dict_item a "
  • "left join sys_dict_def b on a.dict_id = b.id "
  • "where b.name = 'EQ_SPECS' order by a.value;")
  • # 获取数据,生成字典
  • global g_specs_dict
  • for item in cursor.fetchall():
  • g_specs_dict[item[1]] = item[0]
  • # 评级字典
  • cursor.execute("SELECT a.value, a.display_value from sys_dict_item a "
  • "left join sys_dict_def b on a.dict_id = b.id "
  • "where b.name = 'EQ_LEVEL' order by a.value;")
  • # 获取数据,生成字典
  • global g_level_dict
  • for item in cursor.fetchall():
  • g_level_dict[item[1]] = item[0]
  • # 设施编码字典
  • cursor.execute("select substr(di.code,1,1), f.name from amp_facility_asset f "
  • "left join sys_dict_item di on f.facility_type = di.value "
  • "left join sys_dict_def dd on dd.id = di.dict_id "
  • "where f.facility_type in (1,2,3,4,5) and dd.description = '设施分类';")
  • # 获取数据,生成字典
  • global g_facility_code_dict
  • for item in cursor.fetchall():
  • g_facility_code_dict[item[1]] = item[0]
  • # 类别编码字典
  • cursor.execute("select category_code,name from amp_category_define where category_type = 240001 and pid > 0;")
  • # 获取数据,生成字典
  • global g_category_code_dict
  • for item in cursor.fetchall():
  • g_category_code_dict[item[1]] = item[0]
  • # 执行SQL语句
  • cursor.execute(f"SELECT a.id,a.name as name,f.name as f_name FROM {G_TABLE_SOURCE} a "
  • "left join amp_facility_asset f on a.belong_facility = f.id "
  • "order by f.name, a.name, a.id;")
  • # 获取数据
  • global g_device_asset_in_db
  • for item in cursor.fetchall():
  • g_device_asset_in_db.append({"id": item[0],
  • "name": item[1],
  • "f_name": item[2]})
  • # 获取同一设施下的设备数量
  • def get_dev_cnt_under_facility(cursor, f_name):
  • cursor.execute(f"SELECT count(*) FROM amp_device_asset where belong_facility = (select id from "
  • f"amp_facility_asset where name = '{f_name}');")
  • for item in cursor.fetchall():
  • return item[0]
  • # 读取XML文件中的所有设备台账数据
  • def read_xml_file(file_name, flag, fill=False):
  • handle = xlrd.open_workbook(file_name)
  • table = handle.sheet_by_index(0)
  • print(f"文件名:{file_name}")
  • print(f"总行数:{str(table.nrows)}")
  • print(f"总列数:{str(table.ncols)}")
  • facility_name = table.col_values(0)
  • facility_code = table.col_values(1)
  • col_2 = table.col_values(2)
  • device_name = table.col_values(3)
  • category_name = table.col_values(4)
  • device_code = table.col_values(5)
  • dev_level = table.col_values(6)
  • dispatch_no = table.col_values(7)
  • specs_name = table.col_values(8)
  • model_name = table.col_values(9)
  • provider_name = table.col_values(10)
  • build_date = table.col_values(11)
  • col_12 = table.col_values(12)
  • col_13 = table.col_values(13)
  • col_14 = table.col_values(14)
  • col_15 = table.col_values(15)
  • col_16 = table.col_values(16)
  • col_17 = table.col_values(17)
  • power_scope = ["" for x in range(len(facility_name))]
  • if not fill:
  • power_scope = table.col_values(18)
  • col_19 = table.col_values(19)
  • col_20 = table.col_values(20)
  • list_dict = []
  • for i in range(table.nrows):
  • if device_name[i] is None or len(str(device_name[i]).strip()) == 0:
  • if len(str(category_name[i])) > 0:
  • device_name[i] = category_name[i]
  • else:
  • print(f"{file_name} L: {i}")
  • list_dict.append({"facility_name": str(facility_name[i]).strip().replace("\n", "")
  • .replace("KV", "kV").replace("kv", "kV").replace("Kv", "kV"),
  • "facility_code": str(facility_code[i]).strip().replace("\n", ""),
  • "col_2": str(col_2[i]).strip().replace("\n", ""),
  • "device_name": str(device_name[i]).strip().replace("\n", ""),
  • "category_name": str(category_name[i]).strip().replace("\n", "")
  • .replace("主变压器", "变压器").replace("GIS设备", "GIS"),
  • "device_code": str(device_code[i]).strip().replace("\n", ""),
  • "dev_level": str(dev_level[i]).strip().replace("\n", ""),
  • "dispatch_no": str(dispatch_no[i]).strip().replace("\n", "").replace(".0", ""),
  • "specs_name": str(specs_name[i]).strip().replace("\n", ""),
  • "model_name": str(model_name[i]).strip().replace("\n", ""),
  • "provider_name": str(provider_name[i]).strip().replace("\n", ""),
  • "build_date": str(build_date[i]).strip().replace("\n", ""),
  • "col_12": str(col_12[i]).strip().replace("\n", ""),
  • "col_13": str(col_13[i]).strip().replace("\n", ""),
  • "col_14": str(col_14[i]).strip().replace("\n", ""),
  • "col_15": str(col_15[i]).strip().replace("\n", ""),
  • "col_16": str(col_16[i]).strip().replace("\n", ""),
  • "col_17": str(col_17[i]).strip().replace("\n", ""),
  • "power_scope": str(power_scope[i]).strip().replace("\n", "")
  • .replace("KV", "kV").replace("kv", "kV").replace("Kv", "kV"),
  • "col_19": str(col_19[i]).strip().replace("\n", ""),
  • "col_20": str(col_20[i]).strip().replace("\n", ""),
  • "flag": flag})
  • return list_dict
  • # 从字典中获取值
  • def fetch_from_dict(item, key_name, value_type, dict_name=None):
  • current_item_name = key_name
  • current_item_str = item[key_name]
  • try:
  • if value_type == "str":
  • return current_item_str
  • elif value_type == "num":
  • return dict_name[current_item_str] if (current_item_str in dict_name) else "null"
  • else:
  • return "null"
  • except KeyError as kerr:
  • print("[Error] when makeup sql: err=%s\n file=%s\t item_name=%s\t item_str=%s"
  • % (kerr, item['flag'], current_item_name, current_item_str))
  • item["col_20"] = f"{key_name} 在数据库中未找到定义"
  • g_device_error.append(item)
  • raise Exception("[Error] : item_name=%s\t item_str=%s" % (current_item_name, current_item_str))
  • # 生成SN码
  • def generate_sn(cursor, item):
  • var_category = fetch_from_dict(item, "category_name", "num", g_category_code_dict)
  • var_facility = fetch_from_dict(item, "facility_name", "num", g_facility_code_dict)
  • global g_cache_facility_name
  • global g_cache_device_order_in_facility
  • current_facility_name = item["facility_name"]
  • if current_facility_name != g_cache_facility_name:
  • g_cache_facility_name = current_facility_name
  • g_cache_device_order_in_facility = get_dev_cnt_under_facility(cursor, current_facility_name) + 1
  • else:
  • g_cache_device_order_in_facility += 1
  • sn = f"电-{var_category}-{g_cache_device_order_in_facility}-{item['flag']}{var_facility}01"
  • if sn.find("null") >= 0:
  • print("[Error] when makeup sn=%s,\t 文件=%s\t 设施名称=%s\t 设备名称=%s"
  • % (sn, item['flag'], item['facility_name'], item['device_name']))
  • if var_category is None or var_category == 'null':
  • item["col_20"] = f"字段 '设备类别' 在数据库中未找到定义"
  • if var_facility is None or var_facility == 'null':
  • item["col_20"] = f"字段 '设施名称' 在数据库中未找到定义"
  • g_device_error.append(item)
  • raise Exception("[Error] when makeup sn: err=%s,\t file=%s\t facility=%s\t device=%s"
  • % (sn, item['flag'], item['facility_name'], item['device_name']))
  • else:
  • return sn
  • # 将数据库中的数据和XML文件中的数据进行逐条处理
  • def process_data(cursor):
  • # 根据设施名称和设备名称,将DB数据中的device.id对应写入到XML数据中
  • global g_max_dev_id
  • for dev_in_xml in g_device_asset_in_xml:
  • if dev_in_xml["facility_name"] == "设施名称":
  • continue
  • for dev_in_db in g_device_asset_in_db:
  • if not dev_in_xml["device_name"] is None and not dev_in_db["name"] is None \
  • and dev_in_xml["device_name"] == dev_in_db["name"] \
  • and not dev_in_xml["facility_name"] is None and not dev_in_db["f_name"] is None \
  • and dev_in_xml["facility_name"] == dev_in_db["f_name"]:
  • dev_in_xml["id"] = dev_in_db["id"]
  • g_device_asset_in_db.remove(dev_in_db)
  • if dev_in_db["id"] > g_max_dev_id:
  • g_max_dev_id = dev_in_db["id"]
  • print("=======break:01")
  • break
  • # SQL语句
  • device_sql_list = []
  • # 数据库中有,且XML文件中没有的设备ID,存入列表g_device_asset_in_db_only中
  • for dev_in_db in g_device_asset_in_db:
  • sql_str = f"delete from {G_TABLE_TARGET} where id = {dev_in_db['id']};"
  • g_device_to_delete.append(sql_str)
  • device_sql_list.extend(g_device_to_delete)
  • # XML文件中有,且数据库中没有的设备(ID属性为空),为其分配ID
  • global g_device_error
  • for dev_in_xml in g_device_asset_in_xml:
  • if dev_in_xml["facility_name"] == "设施名称":
  • continue
  • """
  • if 'id' in dev_in_xml:
  • print("=======pass")
  • pass
  • sql_str = f"update {G_TABLE_TARGET} set " \
  • f"dispatch_no = '{dev_in_xml['dispatch_no']}', " \
  • f"power_scope_text='{dev_in_xml['power_scope']}' " \
  • f"where id = {dev_in_xml['id']}; "
  • g_device_to_update.append(sql_str)
  • device_sql_list.extend(g_device_to_update)
  • else:
  • """
  • g_max_dev_id += 1
  • dev_in_xml["id"] = g_max_dev_id
  • try:
  • var_device = fetch_from_dict(dev_in_xml, "device_name", "str")
  • var_qr = fetch_from_dict(dev_in_xml, "device_code", "str")
  • var_facility = fetch_from_dict(dev_in_xml, "facility_name", "num", g_facility_dict)
  • var_provider = fetch_from_dict(dev_in_xml, "provider_name", "num", g_provider_dict)
  • var_category = fetch_from_dict(dev_in_xml, "category_name", "num", g_category_dict)
  • var_model = fetch_from_dict(dev_in_xml, "model_name", "num", g_model_dict)
  • var_specs = fetch_from_dict(dev_in_xml, "specs_name", "num", g_specs_dict)
  • var_dispatch = fetch_from_dict(dev_in_xml, "dispatch_no", "str")
  • var_power = fetch_from_dict(dev_in_xml, "power_scope", "str")
  • var_level = fetch_from_dict(dev_in_xml, "dev_level", "num", g_level_dict)
  • var_build = fetch_from_dict(dev_in_xml, "build_date", "str")
  • var_sn = generate_sn(cursor, dev_in_xml)
  • sql_str = f"insert into {G_TABLE_TARGET}(id, name, belong_facility, sn, mng_user_id, live_state, " \
  • f"model, specs, category, manufacture, dispatch_no, power_scope_text, "\
  • f"level, build_date, qr_code) values" \
  • f"({g_max_dev_id},'{var_device}',{var_facility},'{var_sn}@{dev_in_xml['facility_code']}',1,1," \
  • f"{var_model},{var_specs},{var_category},{var_provider},'{var_dispatch}','{var_power}',"\
  • f"{var_level},'{var_build}','{var_qr}');"
  • except Exception as err:
  • print(err)
  • continue
  • g_device_to_insert.append(sql_str)
  • device_sql_list.extend(g_device_to_insert)
  • return device_sql_list
  • # 写文本文件
  • def write_sql_file(file_name, str_list):
  • try:
  • with open(file_name, 'w', encoding='utf-8') as sql_file:
  • for sql_str in str_list:
  • sql_file.write(str(sql_str) + "\n")
  • except IOError as err:
  • print(f'[Error] when write sql to file: {err}')
  • # 写Excel文件
  • def write_xml_file(file_name, str_list=[]):
  • handle = xlwt.Workbook()
  • sheet = handle.add_sheet('error_data')
  • title = ['设施名称', '设施编码', '设施类别', '设备名称', '设备类别', '设备编号', '设备评级', '调度号', '设备规格', '设备型号',
  • '制造厂家', '投运日期', '出厂编号', '资产所属', '电源接引', '接入方式', '供电方式', '设备分类', '供电范围', '主要参数', '备注']
  • for i in range(len(title)):
  • sheet.write(0, i, title[i])
  • for i in range(len(str_list)):
  • dict_cell = list(str_list[i].values())
  • for j in range(len(dict_cell)):
  • if j == len(dict_cell)-2:
  • break
  • sheet.write(i+1, j, dict_cell[j])
  • handle.save(file_name)
  • """
  • # 将表格写入Excel
  • def export_excel(export):
  • # 将字典列表转换为DataFrame
  • pf = pd.DataFrame(list(export))
  • # 指定字段顺序
  • order = ['file', 'facility_name', 'device_name', 'sn']
  • pf = pf[order]
  • # 将列名替换为中文
  • columns_map = {
  • 'file': '文件',
  • 'facility_name': '设施名称',
  • 'device_name': '设备名称',
  • 'sn': 'sn'
  • }
  • pf.rename(columns=columns_map, inplace=True)
  • # 指定生成的Excel表格名称
  • file_path = pd.ExcelWriter('error.xlsx')
  • # 替换空单元格
  • pf.fillna(' ', inplace=True)
  • # 输出
  • pf.to_excel(file_path, encoding='utf-8', index=False)
  • # 保存表格
  • file_path.save()
  • """
  • def print_hi(name):
  • # Use a breakpoint in the code line below to debug your script.
  • print(f'Hi, {name}') # Press Ctrl+F8 to toggle the breakpoint.
  • # 处理一个文件
  • def process_file(file_name, cursor, flag):
  • # 从XML文件中读取设备台账信息:name,f_name,dispatch_no,power_scope
  • make_clean()
  • global g_device_asset_in_xml
  • g_device_asset_in_xml = read_xml_file(file_name, flag)
  • g_device_asset_in_xml = sorted(g_device_asset_in_xml, key=lambda r: r['facility_name'])
  • process_data(cursor)
  • write_sql_file(f"output/sql/{flag}-delete.sql", g_device_to_delete)
  • write_sql_file(f"output/sql/{flag}-insert.sql", g_device_to_insert)
  • write_sql_file(f"output/sql/{flag}-update.sql", g_device_to_update)
  • write_xml_file(f"output/errors/{flag}-errors.xls", g_device_error)
  • # Press the green button in the gutter to run the script.
  • if __name__ == '__main__':
  • # print_hi('PyCharm')
  • # 从数据库中读取设备台账信息:id,name,f_name
  • g_conn, g_curs = connect_db()
  • read_from_db(g_curs)
  • print(f"find {len(g_device_asset_in_db)} device in db")
  • global g_max_dev_id
  • """
  • # 第一批用此代码
  • g_max_dev_id = 0
  • process_file('input/S-P1.xls', g_curs, 'S')
  • process_file('input/N-P1.xls', g_curs, 'N')
  • process_file('input/D-P1.xls', g_curs, 'D')
  • process_file('input/L-P1.xls', g_curs, 'L')
  • """
  • # 第二批用此代码
  • g_max_dev_id = 9404
  • process_file('input/feedback/S-P2.xls', g_curs, 'S')
  • process_file('input/feedback/N-P2.xls', g_curs, 'N')
  • process_file('input/feedback/D-P2.xls', g_curs, 'D')
  • process_file('input/feedback/L-P2', g_curs, 'L')
  • disconnect_db(g_conn, g_curs)
  • # See PyCharm help at https://www.jetbrains.com/help/pycharm/
方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
栏目更新
栏目热门