每次在一个临时测试点此测试一大片服务器的延迟和丢包, 一个个去跑太蛋疼, 于是用PyQt做了这么个小工具来测试各种线路的延迟丢包等信息.
截图:
这个工具我已经初步实现了跨平台(在以上截图环境下运行正常), 在编写过程中, 我有如下的收获:
注: 代码中已经内置了一份测试IP列表, 可以根据需要添加/删除. 第一次运行会生成 ips.conf 文件, 以后需要修改IP列表, 只需要编辑此文件.
再注: 这个程序写的确实很丑, 欢迎各种拍砖 = =
下面贴上全部的代码:
# -*- coding: utf-8 -*-
import sys
import time
import subprocess
from threading import Thread
import re
from PyQt4.QtGui import QMainWindow, QApplication, QStandardItemModel, QStandardItem, QWidget, QVBoxLayout, QTableView
from PyQt4.QtCore import pyqtSignature, Qt, QTimer, SIGNAL, QString, QMetaObject
from Queue import Queue
#from Ui_main import Ui_MainWindow
try:
_fromUtf8 = QString.fromUtf8
except AttributeError:
_fromUtf8 = lambda s: s
class Ui_MainWindow(object):
def setupUi(self, MainWindow):
MainWindow.setObjectName(_fromUtf8("MainWindow"))
MainWindow.resize(500, 435)
self.centralWidget = QWidget(MainWindow)
self.centralWidget.setObjectName(_fromUtf8("centralWidget"))
self.verticalLayout = QVBoxLayout(self.centralWidget)
self.verticalLayout.setObjectName(_fromUtf8("verticalLayout"))
self.tableView = QTableView(self.centralWidget)
self.tableView.setObjectName(_fromUtf8("tableView"))
self.verticalLayout.addWidget(self.tableView)
MainWindow.setCentralWidget(self.centralWidget)
self.retranslateUi(MainWindow)
QMetaObject.connectSlotsByName(MainWindow)
def retranslateUi(self, MainWindow):
MainWindow.setWindowTitle(QApplication.translate("MainWindow", "Ping Tester", None, QApplication.UnicodeUTF8))
if sys.platform.startswith('linux'):
getdata = re.compile(r"icmp_req=(\d+) ttl=(\d+) time=([\d\.]+)\sms")
pingstr = ["ping", "-n", "-i 0.2"]
filtered = "Packet filtered"
delaytime = 200
else:
getdata = re.compile(r"=([\d\.]+)ms TTL=(\d+)")
pingstr = ["ping.exe", "-t"]
timeout = "Request timed out."
delaytime = 500
try:
with open("ips.conf", "r") as f:
t_node = f.read().decode('utf-8')
if not t_node:
raise IOError
except IOError:
with open("ips.conf", "w") as f:
t_node = u"""
210.51.176.182-北京联通BGP
202.97.224.68-黑龙江网通
202.99.96.68-天津网通
202.99.160.68-河北网通
202.96.69.38-大连网通
221.208.172.1-哈尔滨网通
202.99.192.68-山西网通
202.99.160.68-河北网通
210.52.149.2-湖北网通
218.30.64.121-湖南电信
202.100.4.15-陕西电信
202.96.199.133-上海电信
219.150.150.150-河南电信
219.146.0.130-黑龙江电信
202.98.96.68-四川电信
59.66.4.50-北京教育网
202.112.26.246-上海教育网
202.114.0.254-武汉教育网
211.161.159.9-武汉长城宽带
143.90.12.170-日本东京odn
202.160.123.5-新加坡Host1Plus
60.199.17.138-台湾固网
220.128.152.1-台湾HINET
168.95.1.1-台湾中华电信
202.65.207.1-香港DYX
202.181.171.1-香港HKCIX
203.169.186.1-香港NTT.HKNET
59.148.193.38-香港HKCTI
220.232.214.1-香港Supernet
59.152.208.1-香港WTT
210.56.48.1-香港HKSUN
202.76.56.1-香港CPCNET
218.213.250.130-香港SNL
69.162.132.1-美国芝加哥Level3
205.185.112.131-美国佛里蒙特HE
204.152.221.1-美国洛杉矶nLayer
178.63.62.208-德国Nuremburg
94.23.202.83-法国OVH
69.163.43.73-美国波特兰HE
216.189.1.14-美国RockHill
69.172.231.1-美国洛杉矶Peer1
184.22.112.34-美国迈阿密HE
199.48.146.37-美国圣琼斯
216.245.208.1-美国达拉斯
109.74.207.9-英国伦敦
"""
f.write(t_node.encode('utf-8'))
node = []
for line in t_node.split('\n'):
try:
ip, desc = line.strip().split("-")
node.append((ip, desc))
except ValueError:
pass
nodecount = len(node)
class MainWindow(QMainWindow, Ui_MainWindow):
"""
Class documentation goes here.
"""
def __init__(self, parent = None):
"""
Constructor
"""
QMainWindow.__init__(self, parent)
self.setupUi(self)
self.model = QStandardItemModel()
self.model.setColumnCount(6)
self.model.setRowCount(nodecount)
self.model.setHorizontalHeaderLabels(["IP", "Description", "Loss%", "CurPing", "AvgPing", "TTL"])
for i, (ip, desc) in enumerate(node):
self.setitem(i, 0, ip)
self.setitem(i, 1, desc)
self.setitem(i, 2, "")
self.setitem(i, 3, "")
self.setitem(i, 4, "")
self.setitem(i, 5, "")
self.tableView.setModel(self.model)
for i in range(len(node)):
self.tableView.setRowHeight(i, 18)
self.resizetable()
self.timer = QTimer(self)
self.connect(self.timer,
SIGNAL("timeout()"),
self.checkitems)
self.timer.start(delaytime)
def checkitems(self):
while not q.empty():
item = q.get()
self.chgtxt(*item)
q.task_done()
self.resizetable()
def resizetable(self):
self.tableView.resizeColumnsToContents()
def chgtxt(self, x, y, value):
self.model.item(x, y).setText(value)
def setitem(self, x, y, value):
self.model.setItem(x, y, QStandardItem(value))
app = QApplication(sys.argv)
ui = MainWindow()
ui.show()
q = Queue()
def pinger(i, ip, desc):
s = ""
avgping = 0
count = 0
timeoutcount = 0
ret = subprocess.Popen(pingstr + [ip],
stdout=subprocess.PIPE)
while True:
try:
s += ret.stdout.read(1)
tryfind = getdata.findall(s)
if sys.platform.startswith('linux'):
if len(tryfind) > 0:
req, ttl, crtping = tryfind[-1]
avgping += float(crtping)
count += 1
q.put((i, 3, crtping + "ms"))
q.put((i, 4, "%.2f" % (avgping * 1.0 / count) + "ms"))
q.put((i, 5, ttl))
q.put((i, 2, "%.2f" % ((int(req) - count) * 100.0 / int(req))))
s = ""
elif filtered in s:
q.put((i, 2, "Failed"))
q.put((i, 3, "Failed"))
q.put((i, 4, "Failed"))
q.put((i, 5, "Failed"))
ret.kill()
s = ""
else:
if len(tryfind) > 0:
crtping, ttl = tryfind[-1]
avgping += float(crtping)
count += 1
q.put((i, 3, crtping + "ms"))
q.put((i, 4, "%.2f" % (avgping * 1.0 / count) + "ms"))
q.put((i, 5, ttl))
q.put((i, 2, "%.2f" % (timeoutcount * 100.0 / (count + timeoutcount))))
elif timeout in s:
timeoutcount += 1
q.put((i, 2, "-"))
q.put((i, 3, "-"))
if count:
q.put((i, 5, "%.2f" % (timeoutcount * 100.0 / (count + timeoutcount))))
else:
q.put((i, 5, "-"))
s = ""
except IOError:
print s
break
def startworkers():
for i, (ip, desc) in enumerate(node):
worker = Thread(target=pinger, args=(i, ip, desc))
worker.setDaemon(True)
worker.start()
time.sleep(delaytime / 10000.0)
startthread = Thread(target=startworkers)
startthread.setDaemon(True)
startthread.start()
sys.exit(app.exec_())