2025年2月24日 星期一 甲辰(龙)年 腊月廿四 设为首页 加入收藏
rss
您当前的位置:首页 > 计算机 > 编程开发 > Python

开心聊聊天信息爬取

时间:07-03 来源: 作者: 点击数:33

开心聊聊天信息爬取

  • # _*_ coding:UTF-8
  • import time
  • from selenium.webdriver.support.ui import WebDriverWait
  • from appium import webdriver
  • import re
  • import pymysql
  • import smtplib
  • from email.mime.text import MIMEText
  • from email.mime.multipart import MIMEMultipart
  • from lxml import etree
  • class kaixinliao(object):
  • def __init__(self):
  • # self.db = pymysql.connect(host='localhost', port=3306, database='cfda', user='root', password='root',
  • self.db = pymysql.connect(host='47.98.163.18', port=3306, database='cfda', user='root', password='root',
  • charset='utf8')
  • self.cursor = self.db.cursor()
  • # 初始化参数
  • self.desired_caps = {}
  • self.desired_caps['platformName'] = 'Android'
  • self.desired_caps['deviceName'] = '127.0.0.1:62001' # 夜神模拟器
  • # self.desired_caps['deviceName'] = 'MXF5T15C24001273' # 华为手机
  • self.desired_caps['platformVersion'] = '5.1.1' # 夜神模拟器
  • # self.desired_caps['platformVersion'] = '6.0' # 华为手机
  • self.desired_caps['appPackage'] = 'cn.rongcloud.kaixinliao'
  • self.desired_caps['appActivity'] = 'cn.rongcloud.kaixinliao.ui.activity.SplashActivity'
  • self.desired_caps['noReset'] = True
  • self.desired_caps['unicodeKeyboard'] = True
  • self.desired_caps['resetKeyboard'] = True
  • self.driver = webdriver.Remote('http://192.168.31.39:4723/wd/hub', self.desired_caps)
  • time.sleep(10)
  • # 邮箱设置
  • # 设置发件服务器地址
  • self.host = 'smtp.163.com'
  • # 设置发件服务器端口号。注意,这里有SSL和非SSL两种形式
  • self.port = 465
  • # 设置发件邮箱,一定要自己注册的邮箱
  • self.sender = '17682303516@163.com'
  • # 设置发件邮箱的密码,163邮箱的授权码,等会登陆会用到
  • self.pwd = 'yu17682303516'
  • # 设置邮件接收人,可以是扣扣邮箱
  • self.receiver0 = '1064145110@qq.com'
  • # self.receiver1 = '54400407@qq.com'
  • self.main()
  • def get_size(self):
  • x = self.driver.get_window_size()['width']
  • y = self.driver.get_window_size()['height']
  • return (x, y)
  • def login(self):
  • # 登入
  • size = self.get_size()
  • for i in range(2):
  • x1 = int(size[0] * 0.1)
  • x2 = int(size[0] * 0.9)
  • y = int(size[1] * 0.5)
  • self.driver.swipe(x2, y, x1, y, 1000) # 1000毫秒内完成滑动
  • time.sleep(3)
  • if WebDriverWait(self.driver, 3).until(lambda x: x.find_element_by_xpath(
  • "//android.widget.RelativeLayout[@resource-id='cn.rongcloud.kaixinliao:id/guide_go_home_tv']")):
  • self.driver.find_element_by_xpath(
  • "//android.widget.RelativeLayout[@resource-id='cn.rongcloud.kaixinliao:id/guide_go_home_tv']").click()
  • time.sleep(3)
  • if WebDriverWait(self.driver, 3).until(lambda x: x.find_element_by_xpath(
  • "//android.widget.EditText[@resource-id='cn.rongcloud.kaixinliao:id/de_login_phone']")):
  • self.driver.find_element_by_xpath(
  • "//android.widget.EditText[@resource-id='cn.rongcloud.kaixinliao:id/de_login_phone']").send_keys(
  • '17682303516')
  • # '18513031949')
  • time.sleep(1)
  • self.driver.tap([(108, 233), (128, 253)], 500) # 点击500毫秒
  • time.sleep(1)
  • self.driver.find_element_by_xpath(
  • "//android.widget.EditText[@resource-id='cn.rongcloud.kaixinliao:id/de_login_password']").send_keys(
  • 'yu106414511')
  • time.sleep(1)
  • self.driver.tap([(108, 233), (128, 253)], 500) # 点击500毫秒
  • time.sleep(1)
  • self.driver.find_element_by_xpath(
  • "//android.widget.Button[@resource-id='cn.rongcloud.kaixinliao:id/de_login_sign']").click()
  • time.sleep(10)
  • def permission(self):
  • # app申请手机权限
  • for i in range(2):
  • # self.driver.find_element_by_xpath(
  • # "//android.widget.CheckBox[@resource-id='com.android.packageinstaller:id/do_not_ask_checkbox']").click()
  • # time.sleep(1)
  • self.driver.find_element_by_xpath(
  • "//android.widget.LinearLayout[@resource-id='android:id/buttonPanel']").click()
  • time.sleep(5)
  • def data(self):
  • # 获取数据
  • if WebDriverWait(self.driver, 3).until(lambda x: x.find_element_by_xpath(
  • "//android.widget.ImageView[@resource-id='cn.rongcloud.kaixinliao:id/rc_left']")):
  • self.driver.find_element_by_xpath(
  • "//android.widget.ImageView[@resource-id='cn.rongcloud.kaixinliao:id/rc_left']").click()
  • time.sleep(10)
  • size = self.get_size()
  • while True:
  • try:
  • x1 = int(size[0] * 0.95)
  • y1 = int(size[0] * 0.25)
  • y2 = int(size[1] * 0.75)
  • self.driver.swipe(x1, y2, x1, y1, 1000) # 1000毫秒内完成滑动
  • time.sleep(3)
  • print('-' * 100)
  • html = self.driver.page_source
  • # print('b' * 100)
  • # print(html)
  • # print('b' * 100)
  • # info_lists = re.findall(r'<android.widget.TextView index="0" text=(.+?)"', html)
  • # info_image = re.findall(r'<android.widget.ImageView index="0" text=(.+?)"', html)
  • # name_list1 = re.findall(r'<android.widget.TextView index="1" text="(.*?)"', html)
  • # name_list2 = re.findall(r'<android.widget.TextView index="2" text="(.*?)"', html)
  • # print(info_lists)
  • # print(info_image)
  • # print(name_list1)
  • # print(name_list2)
  • # info_list = info_lists + info_image
  • # name_list = name_list1 + name_list2
  • # for i in name_list:
  • # if 'VIP508' in i:
  • # name_list.remove(i)
  • # print(len(info_list))
  • # print(len(name_list))
  • # print('*' * 100)
  • # if info_list:
  • # for i, info in enumerate(info_list):
  • biaoq_list = re.findall(r'(&#.+?;)', html)
  • for i in biaoq_list:
  • html = html.replace(i, '')
  • # html = etree.HTML(html.encode('utf-8'))
  • html = etree.fromstring(html.encode('utf-8', 'surrogatepass'))
  • for data in html.xpath(
  • '//android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.FrameLayout/android.widget.ListView/android.widget.RelativeLayout'):
  • name = data.xpath('android.widget.TextView/@text')
  • info = data.xpath(
  • 'android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.TextView/@text')
  • try:
  • info = info[0]
  • except:
  • info = ''
  • try:
  • name = name[0]
  • except:
  • name = ''
  • print('info:', info)
  • print('name:', name)
  • print('-' * 20)
  • if info:
  • datas = time.strftime("%Y-%m-%d", time.localtime())
  • datatimes = time.strftime("%Y-%m-%d %X", time.localtime())
  • num = self.cursor.execute(
  • 'select id from kaixinliao where content="{}" and datas="{}"'.format(info[:250], datas))
  • if not num:
  • # 插入数据
  • self.cursor.execute(
  • 'insert into kaixinliao(content, createtime, datas, name) values("{}", "{}", "{}", "{}")'.format(
  • info[:250], datatimes, datas, name))
  • self.db.commit()
  • # 邮箱设置---------------------------
  • if '王振东' in name:
  • body = '<h1>' + '王振东来了' + '</h1>'
  • msg = MIMEText(body, 'html')
  • message = MIMEMultipart()
  • message['subject'] = name
  • message['from'] = self.sender
  • message['to'] = self.receiver0
  • message.attach(msg)
  • try:
  • s = smtplib.SMTP_SSL(self.host, self.port)
  • s.login(self.sender, self.pwd)
  • s.sendmail(self.sender, self.receiver0, message.as_string())
  • # s.sendmail(self.sender, self.receiver1, message.as_string())
  • print('邮件发送成功')
  • except smtplib.SMTPException:
  • print('邮件发送失败')
  • if '活动专员' in name and '王振东' in info:
  • num = self.cursor.execute(
  • 'select id from huodong where content="{}" and datas="{}"'.format(info[:250], datas))
  • if not num:
  • # 插入数据
  • self.cursor.execute(
  • 'insert into huodong(content, createtime, datas, name) values("{}", "{}", "{}", "{}")'.format(
  • info[:250], datatimes, datas, name))
  • self.db.commit()
  • body = '<h1> ' + f'王振东可能要来了' + '</h1>'
  • msg = MIMEText(body, 'html')
  • message = MIMEMultipart()
  • message['subject'] = '王振东可能要来了'
  • message['from'] = self.sender
  • message['to'] = self.receiver0
  • message.attach(msg)
  • try:
  • s = smtplib.SMTP_SSL(self.host, self.port)
  • s.login(self.sender, self.pwd)
  • s.sendmail(self.sender, self.receiver0, message.as_string())
  • # s.sendmail(self.sender, self.receiver1, message.as_string())
  • print('邮件发送成功')
  • except smtplib.SMTPException:
  • print('邮件发送失败')
  • elif '五分快三' in info or '三分快三' in info or '一分快三' in info:
  • if '五分快三' in info:
  • types = 5
  • elif '三分快三' in info:
  • types = 3
  • elif '一分快三' in info:
  • types = 1
  • numbers = re.findall(r'\d+', info)[0]
  • if '大' in info:
  • sizes = '大'
  • elif '小' in info:
  • sizes = '小'
  • multiple = re.findall(r'\d+', info)[1]
  • datas = time.strftime("%Y-%m-%d", time.localtime())
  • print(numbers, sizes, multiple, datas, types)
  • nums = self.cursor.execute(
  • 'select id from fen where numbers={} and datas="{}" and class={}'.format(int(numbers), datas, int(types)))
  • if not nums:
  • # 插入数据
  • self.cursor.execute(
  • 'insert into fen(numbers, sizes, multiple, datas, class, createtime, name) values({}, "{}", {}, "{}", {}, "{}", "{}")'.format(
  • int(numbers), sizes, int(multiple), datas, int(types), datatimes, name))
  • self.db.commit()
  • # 邮箱设置---------------------------
  • if int(multiple) > 5000:
  • # 设置邮件正文,这里是支持HTML的
  • body = '<h1>' + str(types) + '分快三' + str(numbers) + '期【' + sizes + '】' + str(multiple) + '倍 </h1>'
  • # 设置正文为符合邮件格式的HTML内容
  • msg = MIMEText(body, 'html')
  • message = MIMEMultipart()
  • # 设置邮件标题
  • message['subject'] = str(types) + '分快三' + str(numbers) + '期【' + sizes + '】' + str(multiple) + '倍'
  • # 设置发送人
  • message['from'] = self.sender
  • # 设置接收人
  • message['to'] = self.receiver0
  • message.attach(msg)
  • try:
  • # 注意!如果是使用SSL端口,这里就要改为SMTP_SSL
  • s = smtplib.SMTP_SSL(self.host, self.port)
  • # 登陆邮箱
  • s.login(self.sender, self.pwd)
  • # 发送邮件!
  • s.sendmail(self.sender, self.receiver0, message.as_string())
  • # 发送第二人邮件
  • # s.sendmail(self.sender, self.receiver1, message.as_string())
  • print('邮件发送成功')
  • except smtplib.SMTPException:
  • print('邮件发送失败')
  • time.sleep(30)
  • except Exception as e:
  • print(f'====错误为{e}====')
  • print('=' * 100)
  • time.sleep(30)
  • def main(self):
  • # self.login()
  • # self.permission() # 华为手机app申请权限
  • self.data()
  • if __name__ == '__main__':
  • # try:
  • kaixinliao()
  • # except:
  • # print('*' * 100)
  • # print('***数据库抓取失败,正在重新启动***')
  • # time.sleep(10)
  • # kaixinliao()

 

方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
栏目更新
栏目热门