原代码是python2的,经过简单调试后现在为python3版本。
直接上代码:
import socket
import sys
import time
import struct
from collections import OrderedDict
class Banner:
def __init__(self):
self.__banner = OrderedDict(
[('version', 0),
('length', 0),
('pid', 0),
('realWidth', 0),
('realHeight', 0),
('virtualWidth', 0),
('virtualHeight', 0),
('orientation', 0),
('quirks', 0)
])
def __setitem__(self, key, value):
self.__banner[key] = value
def __getitem__(self, key):
return self.__banner[key]
def keys(self):
return self.__banner.keys()
def __str__(self):
return str(self.__banner)
class Minicap:
def __init__(self, host, port, banner):
self.buffer_size = 4096
self.host = host
self.port = port
self.banner = banner
def connect(self):
try:
self.__socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except (socket.error) as e:
print(e)
sys.exit(1)
self.__socket.connect((self.host, self.port))
def on_image_transfered(self, data):
file_name = str(time.time()) + '.jpg'
with open(file_name, 'wb') as f:
for b in data:
f.write((b).to_bytes(1,'big'))
def consume(self):
readBannerBytes = 0
bannerLength = 24
readFrameBytes = 0
frameBodyLength = 0
data = []
while True:
try:
chunk = self.__socket.recv(self.buffer_size)
except (socket.error) as e:
print(e)
sys.exit(1)
cursor = 0
buf_len = len(chunk)
while cursor < buf_len:
if readBannerBytes < bannerLength:
map(lambda i, val: self.banner.__setitem__(self.banner.keys()[i], val),
[i for i in range(len(self.banner.keys()))], struct.unpack("<2b5ibB", chunk))
cursor = buf_len
readBannerBytes = bannerLength
print(self.banner)
elif readFrameBytes < 4:
print(struct.unpack('B', (chunk[cursor]).to_bytes(1,'big')))
# frameBodyLength += (struct.unpack('B', chunk[cursor])[0] << (readFrameBytes * 8)) >> 0
frameBodyLength += (chunk[cursor] << (readFrameBytes * 8)) >> 0
cursor += 1
readFrameBytes += 1
else:
print("frame length:{0} buf_len:{1} cursor:{2}".format(frameBodyLength, buf_len, cursor))
# pic end
if buf_len - cursor >= frameBodyLength:
data.extend(chunk[cursor:cursor + frameBodyLength])
self.on_image_transfered(data)
cursor += frameBodyLength
frameBodyLength = readFrameBytes = 0
data = []
else:
data.extend(chunk[cursor:buf_len])
frameBodyLength -= buf_len - cursor
readFrameBytes += buf_len - cursor
cursor = buf_len
if '__main__' == __name__:
mc = Minicap('localhost', 1717, Banner())
mc.connect()
mc.consume()
启动服务之前需要保证的条件:
启动后当手机端有操作时,会在当前目录下载minicap发过来的截图。再添加一个websocket服务就可以支持web端来实时查看手机界面了。