2025年2月24日 星期一 甲辰(龙)年 腊月廿四 设为首页 加入收藏
rss
您当前的位置:首页 > 计算机 > 编程开发 > Python

Python 使用 spyne 发布 WebService、使用 suds 调用 WebService 示例

时间:01-18来源:作者:点击数:22

Python 使用 spyne 发布 WebService、使用 suds 调用 WebService 示例

  • import logging
  • import socket
  • import json
  • import suds
  • try:
  • import xml.etree.cElementTree as ET
  • except ImportError:
  • import xml.etree.ElementTree as ET
  • from suds.client import Client
  • from suds.sax.element import Element
  • from suds.sax.attribute import Attribute
  • from spyne.application import Application
  • from spyne.decorator import rpc
  • from spyne import ServiceBase, Iterable, Integer, Unicode
  • from spyne.protocol.soap import Soap11, Soap12
  • from spyne.server.wsgi import WsgiApplication
  • from wsgiref.simple_server import make_server
  • from xml.dom.minidom import Document
  • class HimapWSManager:
  • __himapwsurl = 'http://127.0.0.1:8080/himapws/CallServicePort?wsdl'
  • __timeout = 30
  • def __CallService(self, sProcessName, sRequestXml):
  • client = Client(self.__himapwsurl, timeout=self.__timeout)
  • #logging.info('调用流程:%s,请求包:%s' % (sProcessName, sRequestXml))
  • result = client.service.callService("", sProcessName, sRequestXml, "admin", "fr4t3t1y6s6s1e0502c4d5")
  • #logging.info('调用流程:%s,接收包:%s' % (sProcessName, result))
  • return result
  • def __TextIsNull(self, textValue, defaultValue=''):
  • return textValue if textValue else defaultValue
  • def __AddRequestChild(self, doc, parent, child_name, child_value):
  • text = doc.createTextNode(child_value)
  • child_node = doc.createElement(child_name)
  • child_node.appendChild(text)
  • parent.appendChild(child_node)
  • def __ParseChargingDetailInterChangeXml(self, responsexml):
  • try:
  • root = ET.fromstring(responsexml)
  • except BaseException as e:
  • raise BaseException(e.message)
  • response = {}
  • returncode = root.find('RETURNCODE').text
  • codetext = root.find('CODETEXT').text
  • if returncode != 'SUCCEED':
  • response['Success'] = '0'
  • response['ErrorMessage'] = self.__TextIsNull(codetext, '失败')
  • response['InvoiceNumber'] = ''
  • response['ChargeDate'] = ''
  • else:
  • response['Success'] = '1'
  • response['ErrorMessage'] = self.__TextIsNull(codetext, '成功')
  • response['InvoiceNumber'] = self.__TextIsNull(root.find('response/InvoiceNumber').text)
  • response['ChargeDate'] = self.__TextIsNull(root.find('response/ChargeDate').text)
  • return response
  • def __ParseSynchronizeChargingDetailXml(self, responsexml):
  • try:
  • root = ET.fromstring(responsexml)
  • except BaseException as e:
  • raise BaseException(e.message)
  • response = []
  • returncode = root.find('RETURNCODE').text
  • codetext = root.find('CODETEXT').text
  • if returncode != 'SUCCEED':
  • record = {}
  • record['Success'] = '0'
  • record['ErrorMessage'] = self.__TextIsNull(codetext, '失败')
  • response.append(record)
  • else:
  • for Item in root.findall('response'):
  • record = {}
  • record['DetailedChargesId'] = self.__TextIsNull(Item.find('DetailedChargesId').text)
  • record['HospCode'] = self.__TextIsNull(Item.find('HospCode').text)
  • record['His_Keyno'] = self.__TextIsNull(Item.find('His_Keyno').text)
  • record['ChargeCode'] = self.__TextIsNull(Item.find('ChargeCode').text)
  • record['Success'] = self.__TextIsNull(Item.find('Success').text)
  • record['ErrorMessage'] = self.__TextIsNull(Item.find('ErrorMessage').text)
  • record['InvoiceNumber'] = self.__TextIsNull(Item.find('InvoiceNumber').text)
  • record['ChargeDate'] = self.__TextIsNull(Item.find('ChargeDate').text)
  • response.append(record)
  • return response
  • def CallChargingDetailInterChange(self, sHisId, sApplyNumber, sChargeJson, sApplyJson):
  • logging.info('收费请求,HisId:%s,ApplyNumber:%s' % (sHisId, sApplyNumber))
  • doc = Document() # 创建DOM文档对象
  • root = doc.createElement('request') # 创建根元素
  • doc.appendChild(root) # 根节点插入doc树
  • self.__AddRequestChild(doc, root, 'sHisId', sHisId)
  • self.__AddRequestChild(doc, root, 'sApplyNumber', sApplyNumber)
  • self.__AddRequestChild(doc, root, 'sChargeJson', sChargeJson)
  • self.__AddRequestChild(doc, root, 'sApplyJson', sApplyJson)
  • docxml = doc.toprettyxml()
  • requestxml = docxml.replace('<?xml version="1.0" ?>', '')
  • # requestxml = '<request><sHisId>?</sHisId><sApplyNumber>?</sApplyNumber><sChargeJson>?</sChargeJson><sApplyJson>?</sApplyJson></request>'
  • responsexml = self.__CallService('PROC_PA_CENTER_CHARGE', requestxml)
  • responsejson = json.dumps(self.__ParseChargingDetailInterChangeXml(responsexml), ensure_ascii=False)
  • logging.info('收费返回,HisId:%s,ApplyNumber:%s,JsonStr:%s' % (sHisId, sApplyNumber, responsejson))
  • return responsejson
  • def CallSynchronizeChargingDetail(self, sHisId, sApplyNumber, sChargeJson):
  • logging.info('同步请求,HisId:%s,ApplyNumber:%s' % (sHisId, sApplyNumber))
  • doc = Document() # 创建DOM文档对象
  • root = doc.createElement('request') # 创建根元素
  • doc.appendChild(root) # 根节点插入doc树
  • self.__AddRequestChild(doc, root, 'sHisId', sHisId)
  • self.__AddRequestChild(doc, root, 'sApplyNumber', sApplyNumber)
  • self.__AddRequestChild(doc, root, 'sChargeJson', sChargeJson)
  • docxml = doc.toprettyxml()
  • requestxml = docxml.replace('<?xml version="1.0" ?>', '')
  • # requestxml = '<request><sHisId>?</sHisId><sApplyNumber>?</sApplyNumber><sChargeJson>?</sChargeJson></request>'
  • responsexml = self.__CallService('PROC_PA_CENTER_CHARGE_DETAIL', requestxml)
  • responsejson = json.dumps(self.__ParseSynchronizeChargingDetailXml(responsexml), ensure_ascii=False)
  • logging.info('同步返回,HisId:%s,ApplyNumber:%s,JsonStr:%s' % (sHisId, sApplyNumber, responsejson))
  • return responsejson
  • class HimapWSProvider(ServiceBase):
  • @rpc(Unicode, Unicode, Unicode, Unicode, _returns=Unicode)
  • def ChargingDetailInterChange(self, sHisId, sApplyNumber, sChargeJson, sApplyJson):
  • himapwsmanager = HimapWSManager()
  • return himapwsmanager.CallChargingDetailInterChange(sHisId, sApplyNumber, sChargeJson, sApplyJson)
  • @rpc(Unicode, Unicode, Unicode, _returns=Unicode)
  • def SynchronizeChargingDetail(self, sHisId, sApplyNumber, sChargeJson):
  • himapwsmanager = HimapWSManager()
  • return himapwsmanager.CallSynchronizeChargingDetail(sHisId, sApplyNumber, sChargeJson)
  • application = Application([HimapWSProvider], 'http://schemas.xmlsoap.org/soap/envelope', in_protocol=Soap11(validator='lxml'), out_protocol=Soap11())
  • wsgi_application = WsgiApplication(application)
  • if __name__ == '__main__':
  • # result = ParseChargingDetailInterChangeXml(xml_charge)
  • # print(result)
  • logging.basicConfig(level=logging.INFO,format='%(asctime)s %(levelname)s:%(message)s')
  • logging.getLogger('spyne.protocol.xml').setLevel(logging.INFO)
  • hostname = socket.gethostname()
  • ip = socket.gethostbyname(hostname)
  • port = 8081
  • logging.info('wsdl service url: http://%s:%d/HimapWSProvider?wsdl' % (ip, port))
  • server = make_server(ip, port, wsgi_application)
  • logging.info("service startup successfully")
  • server.serve_forever()
为方便获取更多学习、工作、生活信息,请关注本站微信公众号:城东书院(微信服务号)、城东书院(微信订阅号)。
推荐内容
相关内容
栏目更新
栏目热门