# 开心聊聊天信息爬取 (Kaixinliao chat message scraper)
# _*_ coding:UTF-8
import time
from selenium.webdriver.support.ui import WebDriverWait
from appium import webdriver
import re
import pymysql
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from lxml import etree
class kaixinliao(object):
    """Appium-driven scraper for the ``cn.rongcloud.kaixinliao`` Android app.

    The scraper repeatedly swipes the current screen, parses the UI hierarchy
    returned by ``driver.page_source`` and stores every visible message in
    MySQL.  Selected messages additionally trigger an e-mail notification.

    Constructing an instance connects to MySQL, opens the Appium session and
    then calls :meth:`main`, which enters the endless loop in :meth:`data`.
    """

    # XPath locators used against the Android UI hierarchy.
    _GUIDE_GO_HOME = "//android.widget.RelativeLayout[@resource-id='cn.rongcloud.kaixinliao:id/guide_go_home_tv']"
    _LOGIN_PHONE = "//android.widget.EditText[@resource-id='cn.rongcloud.kaixinliao:id/de_login_phone']"
    _LOGIN_PASSWORD = "//android.widget.EditText[@resource-id='cn.rongcloud.kaixinliao:id/de_login_password']"
    _LOGIN_SIGN = "//android.widget.Button[@resource-id='cn.rongcloud.kaixinliao:id/de_login_sign']"
    _PERMISSION_PANEL = "//android.widget.LinearLayout[@resource-id='android:id/buttonPanel']"
    _RC_LEFT = "//android.widget.ImageView[@resource-id='cn.rongcloud.kaixinliao:id/rc_left']"
    _ROW_XPATH = '//android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.FrameLayout/android.widget.ListView/android.widget.RelativeLayout'
    _NAME_XPATH = 'android.widget.TextView/@text'
    _INFO_XPATH = 'android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.TextView/@text'

    # Keyword -> value stored in the ``class`` column of the ``fen`` table.
    # Checked in this order, first match wins.
    _BET_TYPES = (('五分快三', 5), ('三分快三', 3), ('一分快三', 1))

    def __init__(self):
        """Open the DB connection and Appium session, then start :meth:`main`."""
        # NOTE(review): database, SMTP and app-account credentials are
        # hard-coded in this class; they should be moved to configuration or
        # environment variables and rotated.
        self.db = pymysql.connect(host='47.98.163.18', port=3306, database='cfda', user='root', password='root',
                                  charset='utf8')
        self.cursor = self.db.cursor()

        # Appium desired capabilities.
        self.desired_caps = {
            'platformName': 'Android',
            'deviceName': '127.0.0.1:62001',  # Nox emulator (Huawei phone: 'MXF5T15C24001273')
            'platformVersion': '5.1.1',  # Nox emulator (Huawei phone: '6.0')
            'appPackage': 'cn.rongcloud.kaixinliao',
            'appActivity': 'cn.rongcloud.kaixinliao.ui.activity.SplashActivity',
            'noReset': True,
            'unicodeKeyboard': True,
            'resetKeyboard': True,
        }
        self.driver = webdriver.Remote('http://192.168.31.39:4723/wd/hub', self.desired_caps)
        time.sleep(10)

        # Mail settings.
        self.host = 'smtp.163.com'  # outgoing SMTP server
        self.port = 465  # SSL port (SMTP_SSL is used when sending)
        self.sender = '17682303516@163.com'  # sender mailbox
        self.pwd = 'yu17682303516'  # 163 mailbox authorization code used for login
        self.receiver0 = '1064145110@qq.com'  # notification recipient

        self.main()

    def get_size(self):
        """Return the device window size as a ``(width, height)`` tuple."""
        window = self.driver.get_window_size()
        return (window['width'], window['height'])

    def _wait_for(self, xpath, timeout=3):
        """Wait up to *timeout* seconds for *xpath* and return the element.

        Raises whatever ``WebDriverWait.until`` raises when the element does
        not show up in time.
        """
        return WebDriverWait(self.driver, timeout).until(
            lambda drv: drv.find_element_by_xpath(xpath))

    def login(self):
        """Swipe through the guide pages and sign in with the stored account."""
        width, height = self.get_size()
        x_right = int(width * 0.9)
        x_left = int(width * 0.1)
        y_mid = int(height * 0.5)
        for _ in range(2):
            # Swipe right-to-left within 1000 ms.
            self.driver.swipe(x_right, y_mid, x_left, y_mid, 1000)
            time.sleep(3)

        self._wait_for(self._GUIDE_GO_HOME).click()
        time.sleep(3)

        self._wait_for(self._LOGIN_PHONE).send_keys('17682303516')
        time.sleep(1)
        # Press for 500 ms at a fixed screen position.
        # NOTE(review): presumably moves focus / dismisses the keyboard - confirm.
        self.driver.tap([(108, 233), (128, 253)], 500)
        time.sleep(1)
        self.driver.find_element_by_xpath(self._LOGIN_PASSWORD).send_keys('yu106414511')
        time.sleep(1)
        self.driver.tap([(108, 233), (128, 253)], 500)
        time.sleep(1)
        self.driver.find_element_by_xpath(self._LOGIN_SIGN).click()
        time.sleep(10)

    def permission(self):
        """Click the button panel of the permission dialog twice."""
        for _ in range(2):
            self.driver.find_element_by_xpath(self._PERMISSION_PANEL).click()
            time.sleep(5)

    @staticmethod
    def _strip_char_refs(html):
        """Return *html* with every ``&#...;`` character reference removed."""
        return re.sub(r'&#.+?;', '', html)

    @staticmethod
    def _parse_bet(info):
        """Extract the bet fields from a message text.

        Returns ``(types, numbers, sizes, multiple)`` where *types* is 5, 3 or
        1, *sizes* is ``'大'`` or ``'小'`` and *numbers* / *multiple* are the
        first and second digit runs of *info* (as strings).  Returns ``None``
        when any of these parts is missing, so that an incomplete message is
        skipped instead of raising or reusing values from an earlier message.
        """
        for keyword, value in kaixinliao._BET_TYPES:
            if keyword in info:
                types = value
                break
        else:
            return None

        if '大' in info:
            sizes = '大'
        elif '小' in info:
            sizes = '小'
        else:
            return None

        digit_runs = re.findall(r'\d+', info)
        if len(digit_runs) < 2:
            return None
        return types, digit_runs[0], sizes, digit_runs[1]

    def _send_mail(self, subject, body):
        """Send an HTML mail to ``self.receiver0``; return True on success."""
        message = MIMEMultipart()
        message['subject'] = subject
        message['from'] = self.sender
        message['to'] = self.receiver0
        message.attach(MIMEText(body, 'html'))
        try:
            # The context manager closes the connection even when login or
            # sending fails.
            with smtplib.SMTP_SSL(self.host, self.port) as server:
                server.login(self.sender, self.pwd)
                server.sendmail(self.sender, self.receiver0, message.as_string())
        except (smtplib.SMTPException, OSError):
            print('邮件发送失败')
            return False
        print('邮件发送成功')
        return True

    def _insert_if_new(self, table, content, name, datas, datatimes):
        """Insert a message row into *table* unless it already exists today.

        *table* must be a trusted, hard-coded table name; every value is
        passed as a query parameter so quotes in scraped text cannot break or
        alter the SQL.  Returns True when a row was inserted.
        """
        content = content[:250]
        found = self.cursor.execute(
            'select id from ' + table + ' where content=%s and datas=%s',
            (content, datas))
        if found:
            return False
        self.cursor.execute(
            'insert into ' + table + '(content, createtime, datas, name) values(%s, %s, %s, %s)',
            (content, datatimes, datas, name))
        self.db.commit()
        return True

    def _store_bet(self, bet, name, datas, datatimes):
        """Store a parsed bet in ``fen`` and mail it when multiple > 5000."""
        types, numbers, sizes, multiple = bet
        print(numbers, sizes, multiple, datas, types)
        found = self.cursor.execute(
            'select id from fen where numbers=%s and datas=%s and class=%s',
            (int(numbers), datas, types))
        if found:
            return
        self.cursor.execute(
            'insert into fen(numbers, sizes, multiple, datas, class, createtime, name) '
            'values(%s, %s, %s, %s, %s, %s, %s)',
            (int(numbers), sizes, int(multiple), datas, types, datatimes, name))
        self.db.commit()
        if int(multiple) > 5000:
            subject = str(types) + '分快三' + str(numbers) + '期【' + sizes + '】' + str(multiple) + '倍'
            self._send_mail(subject, '<h1>' + subject + ' </h1>')

    def _handle_message(self, name, info):
        """Persist one scraped message and send the matching notifications."""
        if not info:
            return
        now = time.localtime()
        datas = time.strftime("%Y-%m-%d", now)
        datatimes = time.strftime("%Y-%m-%d %X", now)

        # Mail only for newly stored rows so that a message that stays on
        # screen is not reported again on every pass.
        if self._insert_if_new('kaixinliao', info, name, datas, datatimes) and '王振东' in name:
            self._send_mail(name, '<h1>' + '王振东来了' + '</h1>')

        if '活动专员' in name and '王振东' in info:
            if self._insert_if_new('huodong', info, name, datas, datatimes):
                self._send_mail('王振东可能要来了', '<h1> ' + '王振东可能要来了' + '</h1>')
        elif any(keyword in info for keyword, _ in self._BET_TYPES):
            bet = self._parse_bet(info)
            if bet is not None:
                self._store_bet(bet, name, datas, datatimes)

    def data(self):
        """Scrape messages forever: swipe, parse the page, store, sleep.

        Any exception raised during a pass is printed and the loop continues
        after a 30 second pause.
        """
        self._wait_for(self._RC_LEFT).click()
        time.sleep(10)

        width, height = self.get_size()
        x_pos = int(width * 0.95)
        y_from = int(height * 0.75)
        # The end point is derived from the screen height; the original code
        # used the width here, mixing up the axes.
        y_to = int(height * 0.25)

        while True:
            try:
                # Swipe upwards within 1000 ms.
                self.driver.swipe(x_pos, y_from, x_pos, y_to, 1000)
                time.sleep(3)
                print('-' * 100)

                page = self._strip_char_refs(self.driver.page_source)
                root = etree.fromstring(page.encode('utf-8', 'surrogatepass'))
                for row in root.xpath(self._ROW_XPATH):
                    names = row.xpath(self._NAME_XPATH)
                    infos = row.xpath(self._INFO_XPATH)
                    name = names[0] if names else ''
                    info = infos[0] if infos else ''
                    print('info:', info)
                    print('name:', name)
                    print('-' * 20)
                    self._handle_message(name, info)
                time.sleep(30)
            except Exception as e:
                print(f'====错误为{e}====')
                print('=' * 100)
                time.sleep(30)

    def main(self):
        """Entry point called from ``__init__``; runs the scraping loop."""
        # self.login()
        # self.permission()  # app permission prompts on the Huawei phone
        self.data()
if __name__ == '__main__':
    # Instantiating the class starts the scraper: __init__ calls main().
    kaixinliao()