您当前的位置:首页 > 计算机 > 编程开发 > Python

Python 爬取企查查

时间:04-23来源:作者:点击数:

首先企查查这个网站不登陆也能查公司,不过坑人的就是只能查那么几次,然后就必须要登录了。我想想为了那几次不值得,就搞个登录的爬虫程序吧。

众所周知,登录最重要的参数是Cookie,这个一般在浏览器的XHR(XMLHttpRequest对象/Ajax对象等)里复制任意一个元素的Cookie就可以了,建议使用CV大法复制,右击Copy value可能会复制到中文,在此先献上不知道从哪搜到的读取谷歌浏览器Cookie的方法:

# _*_ coding:utf-8 _*_
# FileName: get_cookies.py
# IDE: PyCharm
# 菜菜代码,永无BUG!

import sys
import json
import base64
import getpass
import sqlite3
import urllib3
import ctypes.wintypes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes


urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


class DataBlob(ctypes.Structure):
    _fields_ = [('cbData', ctypes.wintypes.DWORD), ('pbData', ctypes.POINTER(ctypes.c_char))]


def dp_api_decrypt(encrypted):
    p = ctypes.create_string_buffer(encrypted, len(encrypted))
    blob_out = DataBlob()
    ret_val = ctypes.windll.crypt32.CryptUnprotectData(ctypes.byref(DataBlob(ctypes.sizeof(p), p)), None, None, None, None, 0, ctypes.byref(blob_out))
    if not ret_val:
        raise ctypes.WinError()
    result = ctypes.string_at(blob_out.pbData, blob_out.cbData)
    ctypes.windll.kernel32.LocalFree(blob_out.pbData)
    return result


def aes_decrypt(encrypted_txt):
    with open(f'C:\\Users\\{getpass.getuser()}\\AppData\\Local\\Google\\Chrome\\User Data\\Local State', encoding='utf-8', mode="r") as f:
        jsn = json.loads(str(f.readline()))
    encrypted_key = base64.b64decode(jsn["os_crypt"]["encrypted_key"].encode())
    encrypted_key = encrypted_key[5:]
    cipher = Cipher(algorithms.AES(dp_api_decrypt(encrypted_key)), None, backend=default_backend())
    cipher.mode = modes.GCM(encrypted_txt[3:15])
    return cipher.decryptor().update(encrypted_txt[15:])


def chrome_decrypt(encrypted_txt):
    if sys.platform == 'win32':
        try:
            if encrypted_txt[:4] == b'x01x00x00x00':
                return dp_api_decrypt(encrypted_txt).decode()
            elif encrypted_txt[:3] == b'v10':
                return aes_decrypt(encrypted_txt)[:-16].decode()
        except WindowsError:
            return None
    else:
        raise WindowsError


def get_cookies_from_chrome(d):
    con = sqlite3.connect(f'C:\\Users\\{getpass.getuser()}\\AppData\\Local\\Google\\Chrome\\User Data\\Default\\Cookies')
    con.row_factory = sqlite3.Row
    cur = con.cursor()
    cur.execute(f'SELECT name, encrypted_value as value FROM cookies where host_key like "%{d}%"')
    cookie = ''
    for row in cur:
        if row['value'] is not None:
            value = chrome_decrypt(row['value'])
            if value is not None:
                cookie += row['name'] + '=' + value + ';'
    return cookie

有了这段代码无疑打开浏览器就不用去打开浏览器开发调试工具了,不过有时候还得手动复制Cookie,因为企查查登录后所拥有的Cookie不止来自一个域。

好了开始用Cookie进入企查查拿到数据,首先headers中有referer和user-agent能大大减少异常请求的几率,显得更“真实”,我不采用解析企查查表格,我选择解析其数据源(解析表格的搜的到,虽然我没有试现在还能不能使用),字典window.__INITIAL_STATE__,有概率会不出现在页面源码中,但只要坚持不断重新执行程序,我相信总会出现的(数据源中的数据多,虽然我并不需要那么多数据,但完美点肯定是好的)。

我写的程序对公司名匹配仅包括去括号和和去括号及括号内容来匹配公司名一致,如有需要可以稍微改动,公司名为name_变量,匹配规则在它的下面两行,下面是源码:

# _*_ coding:utf-8 _*_
# FileName: get_qcc_company.py
# IDE: PyCharm
# 菜菜代码,永无BUG!

import json
import time
import requests
from urllib import parse
from bs4 import BeautifulSoup
from get_cookies import get_cookies_from_chrome

# https://www.qcc.com/

str_time = lambda _: _ == 253392422400 and "9999-09-09" or _ and time.strftime("%Y-%m-%d", time.localtime(_)) or "无固定期限"  # 格式化日期

# 对接企查查登录查询

headers = {
    "referer": "https://www.qcc.com/",
    "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.139 Safari/537.36"
}

search_url = "https://www.qcc.com/web/search?"  # 搜索结果网页

# 用ctrl+c复制,不然可能会有中文;用谷歌浏览器访问企查查并登录,稍微过会等待cookie数据写入本地数据库
cookie = get_cookies_from_chrome('qcc.com') + get_cookies_from_chrome('www.qcc.com')


def get_company(company_name):
    r = requests.get(search_url + parse.urlencode({"key": company_name}), headers=headers, cookies={"cookie": cookie})
    if r.ok:
        soup = BeautifulSoup(r.text, "html.parser")
        table = soup.find("table", attrs={"class": "ntable ntable-list"})
        if table is None:
            return f"未搜寻到公司 “{company_name}” !"
        for tr in table.find_all("tr"):
            info = tr.find_all("td")[2].find("div")
            if info.find("a").find("span") is None:
                continue
            name_ = info.find("a").find("span").text.replace('(', '(').replace(')', ')')
            url = info.find("a")["href"]
            if company_name != name_.replace(name_[name_.find('('): name_.rfind(')') + 1], '') and company_name != name_.replace('(', '').replace(')', ''):
                continue
            r = requests.get(url, headers=headers, cookies={"cookie": cookie})
            if r.ok:
                r.encoding = 'utf-8'
                soup = BeautifulSoup(r.text, "html.parser")
                script = soup.find_all('script')
                for s in script:
                    if 'window.__INITIAL_STATE__' in s.text:
                        script = s.text
                        break
                else:
                    return '请清除谷歌浏览器缓存,并重新登录企查查重新执行程序!如果多次出现此提示,请手动复制任意XHR的cookie值赋予到cookie变量!'
                information = json.loads(script[script.find('{'): script.rfind('};') + 1])  # 全部细节
                # 开始从字典中获取信息
                detail = information["company"]["companyDetail"]  # 企业细节
                name = detail["Name"]  # 企业名称
                update = str_time(detail["UpdatedDate"])  # 信息更新时间
                person = detail["Oper"]["Name"]  # 法定代表人
                status = detail["Status"]  # 登记状态
                credit_code = detail["CreditCode"]  # 统一社会信用代码
                no = detail["No"]  # 工商注册号
                org_code = detail["OrgNo"]  # 组织机构代码
                tax_code = detail["TaxNo"]  # 纳税人识别号
                tax_type = detail.get("TaxpayerType", "")  # 纳税人资质
                register_capital = detail["RegistCapi"]  # 注册资本
                paid_capital = detail["RecCap"]  # 实缴资本
                belong_org = detail["BelongOrg"]  # 登记机关
                time_start = str_time(detail["TermStart"])  # 成立日期
                check_date = str_time(detail["CheckDate"])  # 核准日期
                time_range = f'{str_time(detail["TermStart"])}至{str_time(detail["TeamEnd"])}'  # 营业期限
                address = detail["Address"]  # 注册地址
                scope = detail["Scope"]  # 宗旨和业务范围
                kind = detail["EconKind"]  # 企业类型
                industry = detail["Industry"]["SubIndustry"]  # 所属行业
                area = detail["Area"]["Province"]  # 所属地区
                person_num = detail["profile"]["Info"]  # 人员规模
                can_bao = [_["Value"] for _ in detail["CommonList"] if _.get("KeyDesc", "") == "参保人数"]  # 参保人数
                can_bao = can_bao and can_bao[0] or ''  # 参保人数
                english = detail["EnglishName"]  # 英文名
                if detail["OriginalName"] is None:
                    organization_names = []  # 曾用名
                else:
                    organization_names = [_["Name"] for _ in detail["OriginalName"]]  # 曾用名
                company = {
                    "企业名称": name,
                    "信息更新时间": update,
                    "法定代表人": person,
                    "登记状态": status,
                    "统一社会信用代码": credit_code,
                    "工商注册号": no,
                    "组织机构代码": org_code,
                    "纳税人识别号": tax_code,
                    "纳税人资质": tax_type,
                    "注册资本": register_capital,
                    "实缴资本": paid_capital,
                    "登记机关": belong_org,
                    "成立日期": time_start,
                    "核准日期": check_date,
                    "营业期限": time_range,
                    "注册地址": address,
                    "宗旨和业务范围": scope,
                    "企业类型": kind,
                    "所属行业": industry,
                    "所属地区": area,
                    "人员规模": person_num,
                    "参保人数": can_bao,
                    "英文名": english,
                    "曾用名": organization_names
                }
                return company
            return f"获取公司 “{name_}” 详情信息失败!"
        return f"未搜寻到公司 “{company_name}” !"
    return "搜索失败!"


print(get_company("需要查询的公司名"))

看到由短到长的import,有没有感觉到我那无处安放的强迫症?

方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
栏目更新
栏目热门