2025年3月29日 星期六 甲辰(龙)年 月廿八 设为首页 加入收藏
rss
您当前的位置:首页 > 计算机 > 编程开发 > Python

PyQt作品 – PingTester – 多点Ping测试工具

时间:11-14来源:作者:点击数:41

每次在一个临时测试点此测试一大片服务器的延迟和丢包, 一个个去跑太蛋疼, 于是用PyQt做了这么个小工具来测试各种线路的延迟丢包等信息.

截图:


(Archlinux / KDE4 环境下)

(Windows XP)

这个工具我已经初步实现了跨平台(在以上截图环境下运行正常), 在编写过程中, 我有如下的收获:

  • 不可以在子线程中直接操作UI, 以免引起资源冲突导致Segmentation Fault
  • 使用Queue类可以初步实现子线程与UI线程更新界面的通信. Signal方面, 我实例了一个QTimer, 每隔一定时间处理一次消息队列.
  • QTableView比QTableWidget效率高得多, 尤其是在Win32平台下. 因此应尽量采用 QTableView + QStandardItemModel 的搭配来做Table

注: 代码中已经内置了一份测试IP列表, 可以根据需要添加/删除. 第一次运行会生成 ips.conf 文件, 以后需要修改IP列表, 只需要编辑此文件.

再注: 这个程序写的确实很丑, 欢迎各种拍砖 = =

下面贴上全部的代码:

  • # -*- coding: utf-8 -*-
  • import sys
  • import time
  • import subprocess
  • from threading import Thread
  • import re
  • from PyQt4.QtGui import QMainWindow, QApplication, QStandardItemModel, QStandardItem, QWidget, QVBoxLayout, QTableView
  • from PyQt4.QtCore import pyqtSignature, Qt, QTimer, SIGNAL, QString, QMetaObject
  • from Queue import Queue
  • #from Ui_main import Ui_MainWindow
  • try:
  • _fromUtf8 = QString.fromUtf8
  • except AttributeError:
  • _fromUtf8 = lambda s: s
  • class Ui_MainWindow(object):
  • def setupUi(self, MainWindow):
  • MainWindow.setObjectName(_fromUtf8("MainWindow"))
  • MainWindow.resize(500, 435)
  • self.centralWidget = QWidget(MainWindow)
  • self.centralWidget.setObjectName(_fromUtf8("centralWidget"))
  • self.verticalLayout = QVBoxLayout(self.centralWidget)
  • self.verticalLayout.setObjectName(_fromUtf8("verticalLayout"))
  • self.tableView = QTableView(self.centralWidget)
  • self.tableView.setObjectName(_fromUtf8("tableView"))
  • self.verticalLayout.addWidget(self.tableView)
  • MainWindow.setCentralWidget(self.centralWidget)
  • self.retranslateUi(MainWindow)
  • QMetaObject.connectSlotsByName(MainWindow)
  • def retranslateUi(self, MainWindow):
  • MainWindow.setWindowTitle(QApplication.translate("MainWindow", "Ping Tester", None, QApplication.UnicodeUTF8))
  • if sys.platform.startswith('linux'):
  • getdata = re.compile(r"icmp_req=(\d+) ttl=(\d+) time=([\d\.]+)\sms")
  • pingstr = ["ping", "-n", "-i 0.2"]
  • filtered = "Packet filtered"
  • delaytime = 200
  • else:
  • getdata = re.compile(r"=([\d\.]+)ms TTL=(\d+)")
  • pingstr = ["ping.exe", "-t"]
  • timeout = "Request timed out."
  • delaytime = 500
  • try:
  • with open("ips.conf", "r") as f:
  • t_node = f.read().decode('utf-8')
  • if not t_node:
  • raise IOError
  • except IOError:
  • with open("ips.conf", "w") as f:
  • t_node = u"""
  • 210.51.176.182-北京联通BGP
  • 202.97.224.68-黑龙江网通
  • 202.99.96.68-天津网通
  • 202.99.160.68-河北网通
  • 202.96.69.38-大连网通
  • 221.208.172.1-哈尔滨网通
  • 202.99.192.68-山西网通
  • 202.99.160.68-河北网通
  • 210.52.149.2-湖北网通
  • 218.30.64.121-湖南电信
  • 202.100.4.15-陕西电信
  • 202.96.199.133-上海电信
  • 219.150.150.150-河南电信
  • 219.146.0.130-黑龙江电信
  • 202.98.96.68-四川电信
  • 59.66.4.50-北京教育网
  • 202.112.26.246-上海教育网
  • 202.114.0.254-武汉教育网
  • 211.161.159.9-武汉长城宽带
  • 143.90.12.170-日本东京odn
  • 202.160.123.5-新加坡Host1Plus
  • 60.199.17.138-台湾固网
  • 220.128.152.1-台湾HINET
  • 168.95.1.1-台湾中华电信
  • 202.65.207.1-香港DYX
  • 202.181.171.1-香港HKCIX
  • 203.169.186.1-香港NTT.HKNET
  • 59.148.193.38-香港HKCTI
  • 220.232.214.1-香港Supernet
  • 59.152.208.1-香港WTT
  • 210.56.48.1-香港HKSUN
  • 202.76.56.1-香港CPCNET
  • 218.213.250.130-香港SNL
  • 69.162.132.1-美国芝加哥Level3
  • 205.185.112.131-美国佛里蒙特HE
  • 204.152.221.1-美国洛杉矶nLayer
  • 178.63.62.208-德国Nuremburg
  • 94.23.202.83-法国OVH
  • 69.163.43.73-美国波特兰HE
  • 216.189.1.14-美国RockHill
  • 69.172.231.1-美国洛杉矶Peer1
  • 184.22.112.34-美国迈阿密HE
  • 199.48.146.37-美国圣琼斯
  • 216.245.208.1-美国达拉斯
  • 109.74.207.9-英国伦敦
  • """
  • f.write(t_node.encode('utf-8'))
  • node = []
  • for line in t_node.split('\n'):
  • try:
  • ip, desc = line.strip().split("-")
  • node.append((ip, desc))
  • except ValueError:
  • pass
  • nodecount = len(node)
  • class MainWindow(QMainWindow, Ui_MainWindow):
  • """
  • Class documentation goes here.
  • """
  • def __init__(self, parent = None):
  • """
  • Constructor
  • """
  • QMainWindow.__init__(self, parent)
  • self.setupUi(self)
  • self.model = QStandardItemModel()
  • self.model.setColumnCount(6)
  • self.model.setRowCount(nodecount)
  • self.model.setHorizontalHeaderLabels(["IP", "Description", "Loss%", "CurPing", "AvgPing", "TTL"])
  • for i, (ip, desc) in enumerate(node):
  • self.setitem(i, 0, ip)
  • self.setitem(i, 1, desc)
  • self.setitem(i, 2, "")
  • self.setitem(i, 3, "")
  • self.setitem(i, 4, "")
  • self.setitem(i, 5, "")
  • self.tableView.setModel(self.model)
  • for i in range(len(node)):
  • self.tableView.setRowHeight(i, 18)
  • self.resizetable()
  • self.timer = QTimer(self)
  • self.connect(self.timer,
  • SIGNAL("timeout()"),
  • self.checkitems)
  • self.timer.start(delaytime)
  • def checkitems(self):
  • while not q.empty():
  • item = q.get()
  • self.chgtxt(*item)
  • q.task_done()
  • self.resizetable()
  • def resizetable(self):
  • self.tableView.resizeColumnsToContents()
  • def chgtxt(self, x, y, value):
  • self.model.item(x, y).setText(value)
  • def setitem(self, x, y, value):
  • self.model.setItem(x, y, QStandardItem(value))
  • app = QApplication(sys.argv)
  • ui = MainWindow()
  • ui.show()
  • q = Queue()
  • def pinger(i, ip, desc):
  • s = ""
  • avgping = 0
  • count = 0
  • timeoutcount = 0
  • ret = subprocess.Popen(pingstr + [ip],
  • stdout=subprocess.PIPE)
  • while True:
  • try:
  • s += ret.stdout.read(1)
  • tryfind = getdata.findall(s)
  • if sys.platform.startswith('linux'):
  • if len(tryfind) > 0:
  • req, ttl, crtping = tryfind[-1]
  • avgping += float(crtping)
  • count += 1
  • q.put((i, 3, crtping + "ms"))
  • q.put((i, 4, "%.2f" % (avgping * 1.0 / count) + "ms"))
  • q.put((i, 5, ttl))
  • q.put((i, 2, "%.2f" % ((int(req) - count) * 100.0 / int(req))))
  • s = ""
  • elif filtered in s:
  • q.put((i, 2, "Failed"))
  • q.put((i, 3, "Failed"))
  • q.put((i, 4, "Failed"))
  • q.put((i, 5, "Failed"))
  • ret.kill()
  • s = ""
  • else:
  • if len(tryfind) > 0:
  • crtping, ttl = tryfind[-1]
  • avgping += float(crtping)
  • count += 1
  • q.put((i, 3, crtping + "ms"))
  • q.put((i, 4, "%.2f" % (avgping * 1.0 / count) + "ms"))
  • q.put((i, 5, ttl))
  • q.put((i, 2, "%.2f" % (timeoutcount * 100.0 / (count + timeoutcount))))
  • elif timeout in s:
  • timeoutcount += 1
  • q.put((i, 2, "-"))
  • q.put((i, 3, "-"))
  • if count:
  • q.put((i, 5, "%.2f" % (timeoutcount * 100.0 / (count + timeoutcount))))
  • else:
  • q.put((i, 5, "-"))
  • s = ""
  • except IOError:
  • print s
  • break
  • def startworkers():
  • for i, (ip, desc) in enumerate(node):
  • worker = Thread(target=pinger, args=(i, ip, desc))
  • worker.setDaemon(True)
  • worker.start()
  • time.sleep(delaytime / 10000.0)
  • startthread = Thread(target=startworkers)
  • startthread.setDaemon(True)
  • startthread.start()
  • sys.exit(app.exec_())
方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
栏目更新
栏目热门