用于将Excel中的数据和PG数据库中的数据进行联合处理,涉及:
Python3完整源码如下:
# -*- coding: utf-8 -*-
import xlrd
import xlwt
# import pandas as pd
import psycopg2
# import openpyxl
# Lookup tables filled from the database by read_from_db.  Each one maps a
# display name to the stored value for that name: facility / category /
# provider ids, model / specs / level dictionary values, and the facility /
# category codes used when building serial numbers.
g_facility_dict = {}
g_category_dict = {}
g_provider_dict = {}
g_model_dict = {}
g_specs_dict = {}
g_level_dict = {}
g_facility_code_dict = {}
g_category_code_dict = {}
# Device rows read from the database ({"id", "name", "f_name"} dicts).
g_device_asset_in_db = []
# Device rows read from the input workbook.  The "xml" in the name is
# historical: the rows come from read_xml_file, which opens the file with xlrd.
g_device_asset_in_xml = []
# Per-file results: generated SQL statements, plus the workbook rows that
# could not be converted (g_device_error).
g_device_to_delete = []
g_device_to_insert = []
g_device_to_update = []
g_device_error = []
# Facility currently being numbered by generate_sn and the running device
# order number inside that facility.
g_cache_facility_name = ""
g_cache_device_order_in_facility = 1
# Highest device id handed out so far.
g_max_dev_id = 0
# Table that devices are read from and table that generated SQL targets.
G_TABLE_SOURCE = "amp_device_asset"
G_TABLE_TARGET = "amp_device_asset"
def make_clean():
    """Reset the per-file working state.

    Rebinds the workbook rows, the delete / insert / update statement lists
    and the error list to fresh empty lists, and restores the facility cache
    to its initial values ("" and 1).  The lookup tables, the database device
    list and ``g_max_dev_id`` are left untouched.
    """
    global g_device_asset_in_xml, g_device_error
    global g_device_to_delete, g_device_to_insert, g_device_to_update
    global g_cache_facility_name, g_cache_device_order_in_facility

    g_cache_facility_name, g_cache_device_order_in_facility = "", 1
    g_device_asset_in_xml, g_device_error = [], []
    g_device_to_delete, g_device_to_insert, g_device_to_update = [], [], []
def connect_db():
    """Open a PostgreSQL connection and create a cursor on it.

    Returns:
        A ``(connection, cursor)`` tuple.  The caller is responsible for
        releasing both.
    """
    settings = {
        "database": "my_dbname",
        "user": "my_username",
        "password": "my_passwd",
        "host": "my_ipaddr",
        "port": "5432",
    }
    connection = psycopg2.connect(**settings)
    return connection, connection.cursor()
def disconnect_db(connection, cursor):
    """Commit the pending transaction, then close the cursor and the connection.

    The cursor and the connection are closed even when the commit (or the
    cursor close) raises, so a failure cannot leak the connection.  The
    exception itself still propagates to the caller.

    Args:
        connection: Open database connection.
        cursor: Cursor created on that connection.
    """
    try:
        connection.commit()
    finally:
        try:
            cursor.close()
        finally:
            connection.close()
def read_from_db(cursor):
    """Load the lookup tables and the device list from the database.

    Runs one query per lookup table and stores every returned row in the
    matching module-level dict, keyed by the second selected column and
    valued by the first.  Finally reads all devices from ``G_TABLE_SOURCE``
    (joined to their facility) and appends one
    ``{"id", "name", "f_name"}`` dict per row to ``g_device_asset_in_db``.

    Existing entries are kept: the dicts are updated and the list is
    extended, never cleared.

    Args:
        cursor: Open database cursor used for every query.
    """
    # The model / specs / level tables differ only in the dictionary name.
    dict_item_sql = ("SELECT a.value, a.display_value from sys_dict_item a "
                     "left join sys_dict_def b on a.dict_id = b.id "
                     "where b.name = '{}' order by a.value;")

    lookups = (
        # facility name -> id
        (g_facility_dict, "SELECT id,name FROM amp_facility_asset;"),
        # category name -> id
        (g_category_dict, "SELECT id,name FROM amp_category_define;"),
        # provider name -> id
        (g_provider_dict, "SELECT id,name FROM amp_service_provider;"),
        # display value -> dictionary value
        (g_model_dict, dict_item_sql.format("EQ_MODEL")),
        (g_specs_dict, dict_item_sql.format("EQ_SPECS")),
        (g_level_dict, dict_item_sql.format("EQ_LEVEL")),
        # facility name -> first character of its facility-type code
        (g_facility_code_dict,
         "select substr(di.code,1,1), f.name from amp_facility_asset f "
         "left join sys_dict_item di on f.facility_type = di.value "
         "left join sys_dict_def dd on dd.id = di.dict_id "
         "where f.facility_type in (1,2,3,4,5) and dd.description = '设施分类';"),
        # category name -> category code
        (g_category_code_dict,
         "select category_code,name from amp_category_define "
         "where category_type = 240001 and pid > 0;"),
    )
    for target, sql in lookups:
        cursor.execute(sql)
        target.update({row[1]: row[0] for row in cursor.fetchall()})

    # Device list, ordered by facility name, device name and id.
    cursor.execute(f"SELECT a.id,a.name as name,f.name as f_name FROM {G_TABLE_SOURCE} a "
                   "left join amp_facility_asset f on a.belong_facility = f.id "
                   "order by f.name, a.name, a.id;")
    g_device_asset_in_db.extend(
        {"id": row[0], "name": row[1], "f_name": row[2]}
        for row in cursor.fetchall()
    )
def get_dev_cnt_under_facility(cursor, f_name):
    """Return how many devices are stored under the facility named *f_name*.

    Args:
        cursor: Open database cursor.
        f_name: Facility name to look up in ``amp_facility_asset``.

    Returns:
        The ``count(*)`` reported by the database, or 0 if the query yields
        no row at all.
    """
    # The name comes from the input workbook, so it is passed as a bound
    # parameter: interpolating it into the statement would break (or alter)
    # the query as soon as it contained a single quote.
    cursor.execute(
        "SELECT count(*) FROM amp_device_asset where belong_facility = (select id from "
        "amp_facility_asset where name = %s);",
        (f_name,),
    )
    row = cursor.fetchone()
    return row[0] if row else 0
# Dict keys for worksheet columns 0..20, in column order.  The order is part
# of the contract: write_xml_file writes a row's values back by position.
_XLS_COLUMN_KEYS = (
    "facility_name", "facility_code", "col_2", "device_name", "category_name",
    "device_code", "dev_level", "dispatch_no", "specs_name", "model_name",
    "provider_name", "build_date", "col_12", "col_13", "col_14", "col_15",
    "col_16", "col_17", "power_scope", "col_19", "col_20",
)
# Index of the power-scope column, which is left blank when ``fill`` is set.
_POWER_SCOPE_COLUMN = 18


def _clean_cell(value):
    """Return the cell as text, stripped, with line breaks removed."""
    return str(value).strip().replace("\n", "")


def _normalize_kv(text):
    """Rewrite the 'KV', 'kv' and 'Kv' spellings as 'kV'."""
    return text.replace("KV", "kV").replace("kv", "kV").replace("Kv", "kV")


def _strip_float_suffix(text):
    """Drop one trailing '.0', as left behind when a whole number is read as a float."""
    return text[:-2] if text.endswith(".0") else text


def read_xml_file(file_name, flag, fill=False):
    """Read the first worksheet of a workbook into a list of row dicts.

    Despite the name, the file is opened with ``xlrd`` (an Excel workbook).
    Every worksheet row, including any header row, becomes one dict whose
    keys are ``_XLS_COLUMN_KEYS`` followed by ``"flag"``.  Every value is
    converted to ``str``, stripped and freed of line breaks.  In addition:

    * ``facility_name`` and ``power_scope`` get their kV spelling normalized;
    * ``category_name`` has "主变压器" and "GIS设备" shortened;
    * ``dispatch_no`` loses a trailing ".0".  Only a trailing one: the
      previous blanket replace also mangled values such as "10.05";
    * a blank ``device_name`` falls back to the row's category cell, and the
      row is reported on stdout when that cell is empty too.

    Args:
        file_name: Path of the workbook to read.
        flag: Marker stored under ``"flag"`` in every returned row.
        fill: When true, column 18 is not read and ``power_scope`` is "".

    Returns:
        List of row dicts in worksheet order.
    """
    workbook = xlrd.open_workbook(file_name)
    table = workbook.sheet_by_index(0)
    print(f"文件名:{file_name}")
    print(f"总行数:{str(table.nrows)}")
    print(f"总列数:{str(table.ncols)}")

    # NOTE(review): the original indentation was lost; columns 19 and 20 are
    # assumed to be read regardless of ``fill`` (they are used for every row).
    columns = {}
    for index, key in enumerate(_XLS_COLUMN_KEYS):
        if fill and index == _POWER_SCOPE_COLUMN:
            columns[key] = [""] * table.nrows
        else:
            columns[key] = table.col_values(index)

    rows = []
    for i in range(table.nrows):
        record = {key: _clean_cell(columns[key][i]) for key in _XLS_COLUMN_KEYS}

        raw_device = columns["device_name"][i]
        if raw_device is None or not str(raw_device).strip():
            raw_category = columns["category_name"][i]
            # NOTE(review): the report is assumed to belong to the inner
            # test, i.e. it fires only when there is no category to fall
            # back to.
            if len(str(raw_category)) > 0:
                record["device_name"] = _clean_cell(raw_category)
            else:
                print(f"{file_name} L: {i}")

        record["facility_name"] = _normalize_kv(record["facility_name"])
        record["category_name"] = (record["category_name"]
                                   .replace("主变压器", "变压器")
                                   .replace("GIS设备", "GIS"))
        record["dispatch_no"] = _strip_float_suffix(record["dispatch_no"])
        record["power_scope"] = _normalize_kv(record["power_scope"])
        record["flag"] = flag
        rows.append(record)
    return rows
def fetch_from_dict(item, key_name, value_type, dict_name=None):
    """Return a row field either verbatim or translated through a lookup table.

    Args:
        item: Row dict read from the workbook.
        key_name: Field of *item* to read; a missing field raises ``KeyError``.
        value_type: ``"str"`` returns the field unchanged; ``"num"`` looks the
            field up in *dict_name*.
        dict_name: Lookup table used for ``"num"``.

    Returns:
        The field text for ``"str"``; the mapped value for ``"num"``; the
        string ``"null"`` when the lookup has no entry, no table was given,
        or *value_type* is anything else.

    The original body wrapped the lookup in ``except KeyError`` to record the
    row as an error, but the lookup was already guarded by a membership test,
    so that handler could never run.  It has been removed; the observable
    behaviour for a missing entry is still to return ``"null"``.
    NOTE(review): if missing entries were meant to be recorded as errors
    here, that needs to be added deliberately.
    """
    raw_value = item[key_name]
    if value_type == "str":
        return raw_value
    if value_type == "num" and dict_name is not None:
        return dict_name.get(raw_value, "null")
    return "null"
def generate_sn(cursor, item):
    """Build the serial number for one workbook row.

    The serial number has the form
    ``电-<category code>-<order>-<flag><facility code>01``.  ``<order>`` is
    the running device number inside the row's facility: it starts at the
    number of devices the database already holds for that facility plus one,
    and advances by one for every further consecutive row of the same
    facility.  The counter advances even when the row is then rejected.

    Args:
        cursor: Open database cursor, used to count existing devices when a
            new facility starts.
        item: Row dict; reads ``category_name``, ``facility_name``,
            ``device_name`` and ``flag``.

    Returns:
        The serial number string.

    Raises:
        Exception: The category code or the facility code is unknown.  The
            reason is stored in ``item["col_20"]`` and the row is appended to
            ``g_device_error`` before raising.
    """
    global g_cache_facility_name
    global g_cache_device_order_in_facility

    category_code = fetch_from_dict(item, "category_name", "num", g_category_code_dict)
    facility_code = fetch_from_dict(item, "facility_name", "num", g_facility_code_dict)

    facility_name = item["facility_name"]
    if facility_name != g_cache_facility_name:
        g_cache_facility_name = facility_name
        g_cache_device_order_in_facility = get_dev_cnt_under_facility(cursor, facility_name) + 1
    else:
        g_cache_device_order_in_facility += 1

    sn = f"电-{category_code}-{g_cache_device_order_in_facility}-{item['flag']}{facility_code}01"

    # Validate the codes themselves rather than searching the finished SN for
    # the text "null": a code stored as NULL arrives here as None and would
    # otherwise be formatted into the SN as "None" without being noticed.
    problems = []
    if category_code is None or category_code == 'null':
        problems.append("字段 '设备类别' 在数据库中未找到定义")
    if facility_code is None or facility_code == 'null':
        problems.append("字段 '设施名称' 在数据库中未找到定义")
    if not problems:
        return sn

    print("[Error] when makeup sn=%s,\t 文件=%s\t 设施名称=%s\t 设备名称=%s"
          % (sn, item['flag'], item['facility_name'], item['device_name']))
    # Keep every reason; previously the second message overwrote the first.
    item["col_20"] = "; ".join(problems)
    g_device_error.append(item)
    raise Exception("[Error] when makeup sn: err=%s,\t file=%s\t facility=%s\t device=%s"
                    % (sn, item['flag'], item['facility_name'], item['device_name']))
def _sql_text(value):
    """Render *value* as a single-quoted SQL string literal, doubling embedded quotes."""
    return "'" + str(value).replace("'", "''") + "'"


def process_data(cursor):
    """Compare workbook rows with database rows and generate the SQL statements.

    Works on the module-level lists in three passes:

    1. Every workbook row (header rows, whose ``facility_name`` is
       "设施名称", are skipped) is paired with the first database row that
       has the same device name and facility name.  A paired database row is
       removed from ``g_device_asset_in_db`` and ``g_max_dev_id`` is raised
       to its id if that is larger.
    2. Every row still left in ``g_device_asset_in_db`` gets a ``delete``
       statement, appended to ``g_device_to_delete``.
    3. Every workbook row is given the next free id and an ``insert``
       statement, appended to ``g_device_to_insert``.  Rows for which a
       lookup or the serial number fails are reported on stdout and skipped;
       their id stays consumed.

    NOTE(review): the original contained a disabled (string-literal) branch
    that issued an ``update`` for rows already paired in pass 1.  With it
    disabled, paired rows are re-inserted under a new id as well;
    ``g_device_to_update`` is never filled.

    NOTE(review): the source indentation was lost.  The two
    ``device_sql_list.extend(...)`` calls are assumed to sit after their
    loops; inside the loops they would only duplicate entries in the
    returned list.

    Args:
        cursor: Open database cursor, forwarded to ``generate_sn``.

    Returns:
        All delete statements followed by all insert statements.
    """
    global g_max_dev_id

    # Pass 1: pair workbook rows with database rows.
    for dev_in_xml in g_device_asset_in_xml:
        if dev_in_xml["facility_name"] == "设施名称":
            continue
        for dev_in_db in g_device_asset_in_db:
            same_device = (dev_in_xml["device_name"] is not None
                           and dev_in_db["name"] is not None
                           and dev_in_xml["device_name"] == dev_in_db["name"])
            same_facility = (dev_in_xml["facility_name"] is not None
                             and dev_in_db["f_name"] is not None
                             and dev_in_xml["facility_name"] == dev_in_db["f_name"])
            if same_device and same_facility:
                dev_in_xml["id"] = dev_in_db["id"]
                # Removing from the list being iterated is safe only because
                # the loop is left immediately afterwards.
                g_device_asset_in_db.remove(dev_in_db)
                if dev_in_db["id"] > g_max_dev_id:
                    g_max_dev_id = dev_in_db["id"]
                print("=======break:01")
                break

    # Pass 2: whatever is left in the database list has no workbook row.
    for dev_in_db in g_device_asset_in_db:
        g_device_to_delete.append(
            f"delete from {G_TABLE_TARGET} where id = {dev_in_db['id']};")
    device_sql_list = list(g_device_to_delete)

    # Pass 3: one insert per workbook row, each under a fresh id.
    for dev_in_xml in g_device_asset_in_xml:
        if dev_in_xml["facility_name"] == "设施名称":
            continue
        g_max_dev_id += 1
        dev_in_xml["id"] = g_max_dev_id
        try:
            var_device = fetch_from_dict(dev_in_xml, "device_name", "str")
            var_qr = fetch_from_dict(dev_in_xml, "device_code", "str")
            var_facility = fetch_from_dict(dev_in_xml, "facility_name", "num", g_facility_dict)
            var_provider = fetch_from_dict(dev_in_xml, "provider_name", "num", g_provider_dict)
            var_category = fetch_from_dict(dev_in_xml, "category_name", "num", g_category_dict)
            var_model = fetch_from_dict(dev_in_xml, "model_name", "num", g_model_dict)
            var_specs = fetch_from_dict(dev_in_xml, "specs_name", "num", g_specs_dict)
            var_dispatch = fetch_from_dict(dev_in_xml, "dispatch_no", "str")
            var_power = fetch_from_dict(dev_in_xml, "power_scope", "str")
            var_level = fetch_from_dict(dev_in_xml, "dev_level", "num", g_level_dict)
            var_build = fetch_from_dict(dev_in_xml, "build_date", "str")
            var_sn = generate_sn(cursor, dev_in_xml)
        except Exception as err:
            # The helpers raise plain Exception, so nothing narrower can be
            # caught here; the row is reported and skipped.
            print(err)
            continue

        # Text values come straight from the workbook; _sql_text doubles any
        # embedded single quote so the generated statement stays valid.
        sn_text = _sql_text(f"{var_sn}@{dev_in_xml['facility_code']}")
        sql_str = (
            f"insert into {G_TABLE_TARGET}(id, name, belong_facility, sn, mng_user_id, live_state, "
            f"model, specs, category, manufacture, dispatch_no, power_scope_text, "
            f"level, build_date, qr_code) values"
            f"({g_max_dev_id},{_sql_text(var_device)},{var_facility},{sn_text},1,1,"
            f"{var_model},{var_specs},{var_category},{var_provider},"
            f"{_sql_text(var_dispatch)},{_sql_text(var_power)},"
            f"{var_level},{_sql_text(var_build)},{_sql_text(var_qr)});"
        )
        g_device_to_insert.append(sql_str)

    device_sql_list.extend(g_device_to_insert)
    return device_sql_list
def write_sql_file(file_name, str_list):
    """Write every entry of *str_list* on its own line of a UTF-8 text file.

    The file is created or truncated.  Entries are converted with ``str``.
    An I/O failure is reported on stdout and otherwise ignored.

    Args:
        file_name: Path of the file to write.
        str_list: Iterable of statements to write.
    """
    try:
        with open(file_name, 'w', encoding='utf-8') as sql_file:
            sql_file.writelines(str(entry) + "\n" for entry in str_list)
    except OSError as err:  # IOError is the same class in Python 3
        print(f'[Error] when write sql to file: {err}')
def write_xml_file(file_name, str_list=None):
    """Write row dicts to an Excel sheet named 'error_data'.

    Despite the name, the output is a workbook produced with ``xlwt``.  Row 0
    holds the fixed column titles; each following row holds one dict's values
    in insertion order, without the last two.

    Args:
        file_name: Path of the workbook to save.
        str_list: Row dicts to write.  Defaults to no rows; the default is
            ``None`` rather than a shared mutable list.
    """
    rows = [] if str_list is None else str_list
    workbook = xlwt.Workbook()
    sheet = workbook.add_sheet('error_data')
    title = ['设施名称', '设施编码', '设施类别', '设备名称', '设备类别', '设备编号', '设备评级', '调度号', '设备规格', '设备型号',
             '制造厂家', '投运日期', '出厂编号', '资产所属', '电源接引', '接入方式', '供电方式', '设备分类', '供电范围', '主要参数', '备注']
    for col, caption in enumerate(title):
        sheet.write(0, col, caption)

    for row_no, record in enumerate(rows, start=1):
        cells = list(record.values())
        # The last two values are bookkeeping, not worksheet columns: for
        # rows built by read_xml_file and numbered by process_data they are
        # "flag" and "id".  Shorter dicts keep the original behaviour of
        # being written in full.
        keep = len(cells) - 2 if len(cells) >= 2 else len(cells)
        for col, cell in enumerate(cells[:keep]):
            sheet.write(row_no, col, cell)
    workbook.save(file_name)
"""
# 将表格写入Excel
def export_excel(export):
# 将字典列表转换为DataFrame
pf = pd.DataFrame(list(export))
# 指定字段顺序
order = ['file', 'facility_name', 'device_name', 'sn']
pf = pf[order]
# 将列名替换为中文
columns_map = {
'file': '文件',
'facility_name': '设施名称',
'device_name': '设备名称',
'sn': 'sn'
}
pf.rename(columns=columns_map, inplace=True)
# 指定生成的Excel表格名称
file_path = pd.ExcelWriter('error.xlsx')
# 替换空单元格
pf.fillna(' ', inplace=True)
# 输出
pf.to_excel(file_path, encoding='utf-8', index=False)
# 保存表格
file_path.save()
"""
def print_hi(name):
    """Print a greeting for *name*.

    IDE template leftover; the only call visible in this file is commented
    out.
    """
    greeting = "Hi, {}".format(name)
    print(greeting)
def process_file(file_name, cursor, flag):
    """Process one input workbook and write its result files.

    Resets the per-file state, loads the workbook rows into
    ``g_device_asset_in_xml`` sorted by facility name, runs ``process_data``
    and then writes:

    * ``output/sql/<flag>-delete.sql``, ``-insert.sql`` and ``-update.sql``
      with the generated statements;
    * ``output/errors/<flag>-errors.xls`` with the rows that failed.

    Args:
        file_name: Path of the workbook to process.
        cursor: Open database cursor, forwarded to ``process_data``.
        flag: Marker for this workbook; stored in every row and used in the
            output file names.
    """
    global g_device_asset_in_xml

    make_clean()
    g_device_asset_in_xml = read_xml_file(file_name, flag)
    # Grouping rows by facility keeps each facility's rows consecutive.
    g_device_asset_in_xml.sort(key=lambda row: row['facility_name'])
    process_data(cursor)

    outputs = (
        ("delete", g_device_to_delete),
        ("insert", g_device_to_insert),
        ("update", g_device_to_update),
    )
    for action, statements in outputs:
        write_sql_file(f"output/sql/{flag}-{action}.sql", statements)
    write_xml_file(f"output/errors/{flag}-errors.xls", g_device_error)
# Script entry point: load the database state once, then process each
# input workbook in turn.
if __name__ == '__main__':
    # print_hi('PyCharm')
    g_conn, g_curs = connect_db()
    try:
        # Read the lookup tables and the device list (id, name, f_name).
        read_from_db(g_curs)
        print(f"find {len(g_device_asset_in_db)} device in db")

        # No `global g_max_dev_id` here: this code already runs at module
        # scope, and declaring a name global after it has been assigned in
        # the same scope is a SyntaxError.

        # First batch: use this instead of the second-batch lines below.
        # g_max_dev_id = 0
        # process_file('input/S-P1.xls', g_curs, 'S')
        # process_file('input/N-P1.xls', g_curs, 'N')
        # process_file('input/D-P1.xls', g_curs, 'D')
        # process_file('input/L-P1.xls', g_curs, 'L')

        # Second batch: continue numbering after the ids already handed out.
        g_max_dev_id = 9404
        process_file('input/feedback/S-P2.xls', g_curs, 'S')
        process_file('input/feedback/N-P2.xls', g_curs, 'N')
        process_file('input/feedback/D-P2.xls', g_curs, 'D')
        # NOTE(review): the original path was 'input/feedback/L-P2' with no
        # extension, unlike its three siblings; '.xls' added. Confirm the
        # actual file name.
        process_file('input/feedback/L-P2.xls', g_curs, 'L')
    finally:
        # Release the connection even when a workbook fails to process.
        disconnect_db(g_conn, g_curs)
# See PyCharm help at https://www.jetbrains.com/help/pycharm/