本脚本用于将Excel中的数据和PG数据库中的数据进行联合处理,涉及:用xlrd读取Excel设备台账、用psycopg2读取PostgreSQL中的字典与设备台账、生成delete/insert/update的SQL文件,以及用xlwt导出无法处理的错误记录。
Python3完整源码如下:
- # -*- coding: utf-8 -*-
-
- import xlrd
- import xlwt
- # import pandas as pd
- import psycopg2
- # import openpyxl
-
- # Lookups filled by read_from_db(), keyed by display name: facility/category/provider name -> id, model/specs/level display_value -> value, facility name -> first character of its type code, category name -> category_code
- g_facility_dict = {}
- g_category_dict = {}
- g_provider_dict = {}
- g_model_dict = {}
- g_specs_dict = {}
- g_level_dict = {}
- g_facility_code_dict = {}
- g_category_code_dict = {}
- # Devices read from the database (dicts with the keys id, name, f_name)
- g_device_asset_in_db = []
- # Device rows read from the input spreadsheet (historically labelled "XML"; the reader opens an Excel workbook through xlrd)
- g_device_asset_in_xml = []
- # Generated SQL statements per operation, plus the spreadsheet rows that could not be processed
- g_device_to_delete = []
- g_device_to_insert = []
- g_device_to_update = []
- g_device_error = []
- # Facility currently being processed and the running device sequence number inside it
- g_cache_facility_name = ""
- g_cache_device_order_in_facility = 1
- # Largest device id seen or assigned so far
- g_max_dev_id = 0
-
-
- # Source table (read by the SELECT) and target table (named in the generated SQL)
- G_TABLE_SOURCE = "amp_device_asset"
- G_TABLE_TARGET = "amp_device_asset"
-
-
def make_clean():
    """Reset the per-file working state before another workbook is processed.

    Rebinds the spreadsheet row list, the delete/insert/update SQL lists and
    the error list to fresh empty lists, and puts the facility cache back to
    its initial values. The database-side caches are left untouched.
    """
    global g_device_asset_in_xml, g_device_to_delete, g_device_to_insert
    global g_device_to_update, g_device_error
    global g_cache_facility_name, g_cache_device_order_in_facility

    # One new list object per bucket so no two names share a list.
    g_device_asset_in_xml, g_device_error = [], []
    g_device_to_delete, g_device_to_insert, g_device_to_update = [], [], []
    # "No facility seen yet", sequence numbering restarts at 1.
    g_cache_facility_name, g_cache_device_order_in_facility = "", 1
-
-
def connect_db(database="my_dbname", user="my_username", password="my_passwd",
               host="my_ipaddr", port="5432"):
    """Open a PostgreSQL connection through psycopg2 and create a cursor on it.

    The defaults reproduce the previously hard-coded placeholder settings, so
    calling ``connect_db()`` with no arguments behaves as before.

    Args:
        database: Database name.
        user: Login user.
        password: Login password.
        host: Server address.
        port: Server port.

    Returns:
        A ``(connection, cursor)`` tuple.
    """
    t_connection = psycopg2.connect(database=database,
                                    user=user, password=password,
                                    host=host, port=port)
    try:
        t_cursor = t_connection.cursor()
    except Exception:
        # Do not leak the connection when the cursor cannot be created.
        t_connection.close()
        raise
    return t_connection, t_cursor
-
-
def disconnect_db(connection, cursor):
    """Commit the open transaction, then close the cursor and the connection.

    The cursor and the connection are closed even when ``commit()`` or
    ``cursor.close()`` raises; the exception still propagates to the caller.

    Args:
        connection: Object exposing ``commit()`` and ``close()``.
        cursor: Object exposing ``close()``.
    """
    try:
        connection.commit()
    finally:
        try:
            cursor.close()
        finally:
            connection.close()
-
-
def read_from_db(cursor):
    """Fill the module-level lookup dictionaries and the device list.

    Runs eight lookup queries in a fixed order; each result row is stored in
    the matching global dictionary as ``{second column: first column}``.
    Afterwards every row of ``G_TABLE_SOURCE`` (left-joined to its facility)
    is appended to ``g_device_asset_in_db`` as a dict with the keys ``id``,
    ``name`` and ``f_name``.

    Args:
        cursor: Cursor used to execute the queries and fetch their rows.
    """
    # The model / specs / level lookups share one query and differ only in
    # the dictionary definition name.
    dict_item_query = ("SELECT a.value, a.display_value from sys_dict_item a "
                       "left join sys_dict_def b on a.dict_id = b.id "
                       "where b.name = '{}' order by a.value;")

    lookup_plan = [
        (g_facility_dict, "SELECT id,name FROM amp_facility_asset;"),
        (g_category_dict, "SELECT id,name FROM amp_category_define;"),
        (g_provider_dict, "SELECT id,name FROM amp_service_provider;"),
        (g_model_dict, dict_item_query.format("EQ_MODEL")),
        (g_specs_dict, dict_item_query.format("EQ_SPECS")),
        (g_level_dict, dict_item_query.format("EQ_LEVEL")),
        (g_facility_code_dict,
         "select substr(di.code,1,1), f.name from amp_facility_asset f "
         "left join sys_dict_item di on f.facility_type = di.value "
         "left join sys_dict_def dd on dd.id = di.dict_id "
         "where f.facility_type in (1,2,3,4,5) and dd.description = '设施分类';"),
        (g_category_code_dict,
         "select category_code,name from amp_category_define where category_type = 240001 and pid > 0;"),
    ]
    for target, query in lookup_plan:
        cursor.execute(query)
        for row in cursor.fetchall():
            target[row[1]] = row[0]

    # Existing devices, ordered by facility name, device name and id.
    cursor.execute(f"SELECT a.id,a.name as name,f.name as f_name FROM {G_TABLE_SOURCE} a "
                   "left join amp_facility_asset f on a.belong_facility = f.id "
                   "order by f.name, a.name, a.id;")
    g_device_asset_in_db.extend(
        {"id": row[0], "name": row[1], "f_name": row[2]}
        for row in cursor.fetchall()
    )
-
-
def get_dev_cnt_under_facility(cursor, f_name):
    """Count the devices in ``amp_device_asset`` that belong to a facility.

    The facility name is passed as a bound query parameter instead of being
    spliced into the SQL text, so names containing quotes are handled
    correctly and cannot alter the statement.

    Args:
        cursor: DB-API cursor using the ``%s`` parameter style (psycopg2).
        f_name: Facility name to look up.

    Returns:
        The device count, or 0 when the query yields no row.
    """
    cursor.execute(
        "SELECT count(*) FROM amp_device_asset where belong_facility = "
        "(select id from amp_facility_asset where name = %s);",
        (f_name,),
    )
    row = cursor.fetchone()
    return row[0] if row else 0
-
-
def _clean_cell(value):
    """Render a raw cell value as text without surrounding blanks or newlines."""
    return str(value).strip().replace("\n", "")


def _normalize_kv(text):
    """Rewrite the voltage unit spellings KV / kv / Kv as kV."""
    return text.replace("KV", "kV").replace("kv", "kV").replace("Kv", "kV")


def read_xml_file(file_name, flag, fill=False):
    """Read the device ledger from the first sheet of an Excel workbook.

    Despite its name this function reads a spreadsheet through ``xlrd``. Every
    sheet row (the header row included) becomes one dict whose keys follow
    the sheet's column order, followed by ``flag``. Other code relies on that
    key order, so it must not change.

    Args:
        file_name: Path of the workbook to open.
        flag: Tag stored under the ``flag`` key of every returned row.
        fill: When true, column 18 (``power_scope``) is not read and is left
            as an empty string for every row.

    Returns:
        A list with one dict per sheet row; all values are cleaned strings
        except ``flag``, which is stored as given.
    """
    handle = xlrd.open_workbook(file_name)
    table = handle.sheet_by_index(0)

    print(f"文件名:{file_name}")
    print(f"总行数:{table.nrows}")
    print(f"总列数:{table.ncols}")

    # Key for sheet column 0, 1, 2, ... in order.
    column_keys = (
        "facility_name", "facility_code", "col_2", "device_name",
        "category_name", "device_code", "dev_level", "dispatch_no",
        "specs_name", "model_name", "provider_name", "build_date",
        "col_12", "col_13", "col_14", "col_15", "col_16", "col_17",
        "power_scope", "col_19", "col_20",
    )
    columns = {}
    for index, key in enumerate(column_keys):
        if key == "power_scope" and fill:
            columns[key] = [""] * table.nrows
        else:
            columns[key] = table.col_values(index)

    list_dict = []
    for i in range(table.nrows):
        record = {key: _clean_cell(columns[key][i]) for key in column_keys}

        # A row without a device name falls back to its category name.
        # Whitespace-only cells count as empty on both sides.
        if columns["device_name"][i] is None or not record["device_name"]:
            if record["category_name"]:
                record["device_name"] = record["category_name"]
            else:
                print(f"{file_name} L: {i}")

        record["facility_name"] = _normalize_kv(record["facility_name"])
        record["category_name"] = (record["category_name"]
                                   .replace("主变压器", "变压器")
                                   .replace("GIS设备", "GIS"))
        # Numeric cells are rendered like "101.0"; drop only that trailing
        # ".0" so values such as "1.05" are left intact.
        if record["dispatch_no"].endswith(".0"):
            record["dispatch_no"] = record["dispatch_no"][:-2]
        record["power_scope"] = _normalize_kv(record["power_scope"])

        record["flag"] = flag
        list_dict.append(record)
    return list_dict
-
-
def fetch_from_dict(item, key_name, value_type, dict_name=None):
    """Fetch a field of ``item``, optionally translated through a lookup table.

    The former ``except KeyError`` branch could never run (the lookup is
    guarded by a membership test), so it has been removed; the results are
    unchanged.

    Args:
        item: Row dict read from the spreadsheet.
        key_name: Key of the field to read from ``item``.
        value_type: ``"str"`` returns the field itself; ``"num"`` returns the
            entry of ``dict_name`` stored under the field's value.
        dict_name: Lookup table, required when ``value_type`` is ``"num"``.

    Returns:
        The raw field for ``"str"``; the mapped value, or the string
        ``"null"`` when the table has no such entry, for ``"num"``;
        ``"null"`` for any other ``value_type``.

    Raises:
        KeyError: ``item`` has no ``key_name`` entry.
        TypeError: ``value_type`` is ``"num"`` but no lookup table was given.
    """
    raw_value = item[key_name]
    if value_type == "str":
        return raw_value
    if value_type == "num":
        if dict_name is None:
            raise TypeError(f"a lookup dict is required to resolve '{key_name}'")
        return dict_name[raw_value] if raw_value in dict_name else "null"
    return "null"
-
-
def generate_sn(cursor, item):
    """Build the SN code for one spreadsheet row.

    The code has the form ``电-<category code>-<order>-<flag><facility code>01``.
    ``<order>`` continues from the number of devices the facility already has
    in the database and increases by one for each consecutive row of the same
    facility.

    Args:
        cursor: Cursor handed to ``get_dev_cnt_under_facility``.
        item: Row dict; reads ``category_name``, ``facility_name``, ``flag``
            and ``device_name``.

    Returns:
        The SN string.

    Raises:
        Exception: The category code or the facility code is unknown. The row
            is then appended to ``g_device_error`` with the reason(s) stored
            under ``col_20``.
    """
    global g_cache_facility_name
    global g_cache_device_order_in_facility

    var_category = fetch_from_dict(item, "category_name", "num", g_category_code_dict)
    var_facility = fetch_from_dict(item, "facility_name", "num", g_facility_code_dict)

    # NOTE: as before, the sequence number advances even for a row that is
    # rejected below.
    current_facility_name = item["facility_name"]
    if current_facility_name != g_cache_facility_name:
        g_cache_facility_name = current_facility_name
        existing = get_dev_cnt_under_facility(cursor, current_facility_name)
        g_cache_device_order_in_facility = (existing or 0) + 1
    else:
        g_cache_device_order_in_facility += 1

    sn = f"电-{var_category}-{g_cache_device_order_in_facility}-{item['flag']}{var_facility}01"

    # Validate the looked-up codes themselves rather than searching the SN
    # text for "null", so a None code is caught as well, and report every
    # missing field instead of only the last one.
    problems = []
    if var_category is None or var_category == 'null':
        problems.append("字段 '设备类别' 在数据库中未找到定义")
    if var_facility is None or var_facility == 'null':
        problems.append("字段 '设施名称' 在数据库中未找到定义")
    if not problems:
        return sn

    print("[Error] when makeup sn=%s,\t 文件=%s\t 设施名称=%s\t 设备名称=%s"
          % (sn, item['flag'], item['facility_name'], item['device_name']))
    item["col_20"] = ";".join(problems)
    g_device_error.append(item)
    raise Exception("[Error] when makeup sn: err=%s,\t file=%s\t facility=%s\t device=%s"
                    % (sn, item['flag'], item['facility_name'], item['device_name']))
-
-
def _sql_text(value):
    """Escape a value for use inside a single-quoted SQL string literal."""
    return str(value).replace("'", "''")


def _sql_value(value):
    """Render a looked-up id for SQL, mapping a missing (None) value to null."""
    return "null" if value is None else value


def _build_insert_sql(cursor, dev, dev_id):
    """Compose the INSERT statement for one spreadsheet row."""
    var_device = fetch_from_dict(dev, "device_name", "str")
    var_qr = fetch_from_dict(dev, "device_code", "str")
    var_facility = fetch_from_dict(dev, "facility_name", "num", g_facility_dict)
    var_provider = fetch_from_dict(dev, "provider_name", "num", g_provider_dict)
    var_category = fetch_from_dict(dev, "category_name", "num", g_category_dict)
    var_model = fetch_from_dict(dev, "model_name", "num", g_model_dict)
    var_specs = fetch_from_dict(dev, "specs_name", "num", g_specs_dict)
    var_dispatch = fetch_from_dict(dev, "dispatch_no", "str")
    var_power = fetch_from_dict(dev, "power_scope", "str")
    var_level = fetch_from_dict(dev, "dev_level", "num", g_level_dict)
    var_build = fetch_from_dict(dev, "build_date", "str")
    var_sn = generate_sn(cursor, dev)

    # Text taken from the spreadsheet goes through _sql_text so that an
    # apostrophe in a name cannot terminate the SQL string literal early.
    sn_text = _sql_text(f"{var_sn}@{dev['facility_code']}")
    return (f"insert into {G_TABLE_TARGET}(id, name, belong_facility, sn, mng_user_id, live_state, "
            f"model, specs, category, manufacture, dispatch_no, power_scope_text, "
            f"level, build_date, qr_code) values"
            f"({dev_id},'{_sql_text(var_device)}',{_sql_value(var_facility)},'{sn_text}',1,1,"
            f"{_sql_value(var_model)},{_sql_value(var_specs)},{_sql_value(var_category)},"
            f"{_sql_value(var_provider)},'{_sql_text(var_dispatch)}','{_sql_text(var_power)}',"
            f"{_sql_value(var_level)},'{_sql_text(var_build)}','{_sql_text(var_qr)}');")


def process_data(cursor):
    """Reconcile the spreadsheet rows with the database rows and build SQL.

    Steps:
      1. Each spreadsheet row that has a database device with the same device
         name and facility name takes over that device's id; the matched
         device is removed from ``g_device_asset_in_db``.
      2. Every device still left in ``g_device_asset_in_db`` gets a DELETE
         statement appended to ``g_device_to_delete``.
      3. Every spreadsheet row gets a fresh id (``g_max_dev_id`` + 1) and an
         INSERT statement appended to ``g_device_to_insert``; rows whose
         statement cannot be built are reported and skipped.

    Header rows (facility name equal to "设施名称") are ignored.

    Args:
        cursor: Cursor passed on to ``generate_sn``.

    Returns:
        The DELETE statements followed by the INSERT statements.
    """
    global g_max_dev_id

    # Step 1: copy ids of already-known devices onto the spreadsheet rows.
    for dev_in_xml in g_device_asset_in_xml:
        if dev_in_xml["facility_name"] == "设施名称":
            continue
        xml_name = dev_in_xml["device_name"]
        xml_facility = dev_in_xml["facility_name"]
        for dev_in_db in g_device_asset_in_db:
            if (xml_name is not None and xml_name == dev_in_db["name"]
                    and xml_facility is not None and xml_facility == dev_in_db["f_name"]):
                dev_in_xml["id"] = dev_in_db["id"]
                # Removing while iterating is safe only because of the break.
                g_device_asset_in_db.remove(dev_in_db)
                if dev_in_db["id"] > g_max_dev_id:
                    g_max_dev_id = dev_in_db["id"]
                print("=======break:01")
                break

    # Step 2: devices present in the database only are scheduled for deletion.
    g_device_to_delete.extend(
        f"delete from {G_TABLE_TARGET} where id = {dev_in_db['id']};"
        for dev_in_db in g_device_asset_in_db
    )
    device_sql_list = list(g_device_to_delete)

    # Step 3: allocate ids and build the INSERT statements.
    # NOTE(review): the former "update rows that already have an id" branch
    # (dispatch_no / power_scope_text) was disabled in the original code, so
    # rows matched in step 1 also receive a new id and an INSERT - confirm
    # that this is intended.
    for dev_in_xml in g_device_asset_in_xml:
        if dev_in_xml["facility_name"] == "设施名称":
            continue
        g_max_dev_id += 1
        dev_in_xml["id"] = g_max_dev_id
        try:
            sql_str = _build_insert_sql(cursor, dev_in_xml, g_max_dev_id)
        except Exception as err:
            # The helpers raise plain Exception for rows they reject; the row
            # has already been recorded in g_device_error where applicable.
            print(err)
            continue
        g_device_to_insert.append(sql_str)

    device_sql_list.extend(g_device_to_insert)
    return device_sql_list
-
-
def write_sql_file(file_name, str_list):
    """Write each entry of ``str_list`` on its own line to a UTF-8 text file.

    The parent directory is created when it does not exist yet. I/O failures
    are reported on stdout and are not raised.

    Args:
        file_name: Path of the file to (over)write.
        str_list: Iterable of statements; each entry is converted with ``str``.
    """
    import os

    try:
        parent = os.path.dirname(file_name)
        if parent:
            os.makedirs(parent, exist_ok=True)
        with open(file_name, 'w', encoding='utf-8') as sql_file:
            sql_file.writelines(str(sql_str) + "\n" for sql_str in str_list)
    except IOError as err:
        print(f'[Error] when write sql to file: {err}')
-
-
def write_xml_file(file_name, str_list=None):
    """Write rows to an Excel workbook (sheet ``error_data``) through xlwt.

    Despite its name the output is an Excel file. Row 0 holds the fixed
    column titles; each following row holds the values of one dict in their
    stored order, without the last two values.

    Args:
        file_name: Path handed to the workbook's ``save``.
        str_list: List of row dicts; ``None`` (the default) means no rows.
    """
    # A None default replaces the former shared mutable default list.
    rows = [] if str_list is None else str_list

    handle = xlwt.Workbook()
    sheet = handle.add_sheet('error_data')

    title = ['设施名称', '设施编码', '设施类别', '设备名称', '设备类别', '设备编号', '设备评级', '调度号', '设备规格', '设备型号',
             '制造厂家', '投运日期', '出厂编号', '资产所属', '电源接引', '接入方式', '供电方式', '设备分类', '供电范围', '主要参数', '备注']
    for col, caption in enumerate(title):
        sheet.write(0, col, caption)

    for row_no, record in enumerate(rows, start=1):
        values = list(record.values())
        # The last two values are left out of the sheet, as before.
        # presumably these are the trailing "flag" and "id" entries; verify
        # against the caller.
        kept = values[:-2] if len(values) >= 2 else values
        for col, value in enumerate(kept):
            sheet.write(row_no, col, value)
    handle.save(file_name)
-
-
- """
- # 将表格写入Excel
- def export_excel(export):
- # 将字典列表转换为DataFrame
- pf = pd.DataFrame(list(export))
- # 指定字段顺序
- order = ['file', 'facility_name', 'device_name', 'sn']
- pf = pf[order]
- # 将列名替换为中文
- columns_map = {
- 'file': '文件',
- 'facility_name': '设施名称',
- 'device_name': '设备名称',
- 'sn': 'sn'
- }
- pf.rename(columns=columns_map, inplace=True)
- # 指定生成的Excel表格名称
- file_path = pd.ExcelWriter('error.xlsx')
- # 替换空单元格
- pf.fillna(' ', inplace=True)
- # 输出
- pf.to_excel(file_path, encoding='utf-8', index=False)
- # 保存表格
- file_path.save()
- """
-
-
def print_hi(name):
    """Print the greeting ``Hi, <name>`` on stdout."""
    greeting = "Hi, {}".format(name)
    print(greeting)
-
-
def process_file(file_name, cursor, flag):
    """Process one input workbook and write its result files.

    Resets the per-file state, loads the workbook rows sorted by facility
    name into ``g_device_asset_in_xml``, runs ``process_data`` and then
    writes ``output/sql/<flag>-delete.sql``, ``-insert.sql`` and
    ``-update.sql`` plus ``output/errors/<flag>-errors.xls``.

    Args:
        file_name: Path of the workbook to read.
        cursor: Database cursor passed on to ``process_data``.
        flag: Tag stored on every row and used as the output file prefix.
    """
    global g_device_asset_in_xml

    make_clean()
    g_device_asset_in_xml = sorted(read_xml_file(file_name, flag),
                                   key=lambda row: row['facility_name'])
    process_data(cursor)

    # Looked up after process_data so the lists rebound by make_clean are used.
    sql_outputs = (
        ("delete", g_device_to_delete),
        ("insert", g_device_to_insert),
        ("update", g_device_to_update),
    )
    for action, statements in sql_outputs:
        write_sql_file(f"output/sql/{flag}-{action}.sql", statements)
    write_xml_file(f"output/errors/{flag}-errors.xls", g_device_error)
-
-
# Script entry point.
if __name__ == '__main__':
    # print_hi('PyCharm')

    # Load the lookup dictionaries and the existing devices (id, name, f_name).
    g_conn, g_curs = connect_db()
    try:
        read_from_db(g_curs)
        print(f"find {len(g_device_asset_in_db)} device in db")

        # No "global" statement here: this code already runs at module level,
        # and declaring g_max_dev_id global after its module-level assignment
        # is a SyntaxError.

        # Batch 1 used this configuration:
        #   g_max_dev_id = 0
        #   process_file('input/S-P1.xls', g_curs, 'S')
        #   process_file('input/N-P1.xls', g_curs, 'N')
        #   process_file('input/D-P1.xls', g_curs, 'D')
        #   process_file('input/L-P1.xls', g_curs, 'L')

        # Batch 2: new ids continue after 9404.
        g_max_dev_id = 9404
        process_file('input/feedback/S-P2.xls', g_curs, 'S')
        process_file('input/feedback/N-P2.xls', g_curs, 'N')
        process_file('input/feedback/D-P2.xls', g_curs, 'D')
        # The ".xls" extension was missing here, unlike the sibling inputs.
        process_file('input/feedback/L-P2.xls', g_curs, 'L')
    finally:
        # Release the connection even when a file fails to process.
        disconnect_db(g_conn, g_curs)
-
- # See PyCharm help at https://www.jetbrains.com/help/pycharm/
-
-