首先企查查这个网站不登陆也能查公司,不过坑人的就是只能查那么几次,然后就必须要登录了。我想想为了那几次不值得,就搞个登录的爬虫程序吧。
众所周知,登录最重要的参数是Cookie,这个一般在浏览器的XHR(XMLHttpRequest对象/Ajax对象等)里复制任意一个元素的Cookie就可以了,建议使用CV大法复制,右击Copy value可能会复制到中文,在此先献上不知道从哪搜到的读取谷歌浏览器Cookie的方法:
- # _*_ coding:utf-8 _*_
- # FileName: get_cookies.py
- # IDE: PyCharm
- # 菜菜代码,永无BUG!
-
- import sys
- import json
- import base64
- import getpass
- import sqlite3
- import urllib3
- import ctypes.wintypes
- from cryptography.hazmat.backends import default_backend
- from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
-
-
- urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
-
-
- class DataBlob(ctypes.Structure):
- _fields_ = [('cbData', ctypes.wintypes.DWORD), ('pbData', ctypes.POINTER(ctypes.c_char))]
-
-
- def dp_api_decrypt(encrypted):
- p = ctypes.create_string_buffer(encrypted, len(encrypted))
- blob_out = DataBlob()
- ret_val = ctypes.windll.crypt32.CryptUnprotectData(ctypes.byref(DataBlob(ctypes.sizeof(p), p)), None, None, None, None, 0, ctypes.byref(blob_out))
- if not ret_val:
- raise ctypes.WinError()
- result = ctypes.string_at(blob_out.pbData, blob_out.cbData)
- ctypes.windll.kernel32.LocalFree(blob_out.pbData)
- return result
-
-
- def aes_decrypt(encrypted_txt):
- with open(f'C:\\Users\\{getpass.getuser()}\\AppData\\Local\\Google\\Chrome\\User Data\\Local State', encoding='utf-8', mode="r") as f:
- jsn = json.loads(str(f.readline()))
- encrypted_key = base64.b64decode(jsn["os_crypt"]["encrypted_key"].encode())
- encrypted_key = encrypted_key[5:]
- cipher = Cipher(algorithms.AES(dp_api_decrypt(encrypted_key)), None, backend=default_backend())
- cipher.mode = modes.GCM(encrypted_txt[3:15])
- return cipher.decryptor().update(encrypted_txt[15:])
-
-
- def chrome_decrypt(encrypted_txt):
- if sys.platform == 'win32':
- try:
- if encrypted_txt[:4] == b'x01x00x00x00':
- return dp_api_decrypt(encrypted_txt).decode()
- elif encrypted_txt[:3] == b'v10':
- return aes_decrypt(encrypted_txt)[:-16].decode()
- except WindowsError:
- return None
- else:
- raise WindowsError
-
-
- def get_cookies_from_chrome(d):
- con = sqlite3.connect(f'C:\\Users\\{getpass.getuser()}\\AppData\\Local\\Google\\Chrome\\User Data\\Default\\Cookies')
- con.row_factory = sqlite3.Row
- cur = con.cursor()
- cur.execute(f'SELECT name, encrypted_value as value FROM cookies where host_key like "%{d}%"')
- cookie = ''
- for row in cur:
- if row['value'] is not None:
- value = chrome_decrypt(row['value'])
- if value is not None:
- cookie += row['name'] + '=' + value + ';'
- return cookie
-
有了这段代码无疑打开浏览器就不用去打开浏览器开发调试工具了,不过有时候还得手动复制Cookie,因为企查查登录后所拥有的Cookie不止来自一个域。
好了开始用Cookie进入企查查拿到数据,首先headers中有referer和user-agent能大大减少异常请求的几率,显得更“真实”,我不采用解析企查查表格,我选择解析其数据源(解析表格的搜的到,虽然我没有试现在还能不能使用),字典window.__INITIAL_STATE__,有概率会不出现在页面源码中,但只要坚持不断重新执行程序,我相信总会出现的(数据源中的数据多,虽然我并不需要那么多数据,但完美点肯定是好的)。
我写的程序对公司名匹配仅包括去括号和和去括号及括号内容来匹配公司名一致,如有需要可以稍微改动,公司名为name_变量,匹配规则在它的下面两行,下面是源码:
- # _*_ coding:utf-8 _*_
- # FileName: get_qcc_company.py
- # IDE: PyCharm
- # 菜菜代码,永无BUG!
-
- import json
- import time
- import requests
- from urllib import parse
- from bs4 import BeautifulSoup
- from get_cookies import get_cookies_from_chrome
-
- # https://www.qcc.com/
-
- str_time = lambda _: _ == 253392422400 and "9999-09-09" or _ and time.strftime("%Y-%m-%d", time.localtime(_)) or "无固定期限" # 格式化日期
-
- # 对接企查查登录查询
-
- headers = {
- "referer": "https://www.qcc.com/",
- "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.139 Safari/537.36"
- }
-
- search_url = "https://www.qcc.com/web/search?" # 搜索结果网页
-
- # 用ctrl+c复制,不然可能会有中文;用谷歌浏览器访问企查查并登录,稍微过会等待cookie数据写入本地数据库
- cookie = get_cookies_from_chrome('qcc.com') + get_cookies_from_chrome('www.qcc.com')
-
-
- def get_company(company_name):
- r = requests.get(search_url + parse.urlencode({"key": company_name}), headers=headers, cookies={"cookie": cookie})
- if r.ok:
- soup = BeautifulSoup(r.text, "html.parser")
- table = soup.find("table", attrs={"class": "ntable ntable-list"})
- if table is None:
- return f"未搜寻到公司 “{company_name}” !"
- for tr in table.find_all("tr"):
- info = tr.find_all("td")[2].find("div")
- if info.find("a").find("span") is None:
- continue
- name_ = info.find("a").find("span").text.replace('(', '(').replace(')', ')')
- url = info.find("a")["href"]
- if company_name != name_.replace(name_[name_.find('('): name_.rfind(')') + 1], '') and company_name != name_.replace('(', '').replace(')', ''):
- continue
- r = requests.get(url, headers=headers, cookies={"cookie": cookie})
- if r.ok:
- r.encoding = 'utf-8'
- soup = BeautifulSoup(r.text, "html.parser")
- script = soup.find_all('script')
- for s in script:
- if 'window.__INITIAL_STATE__' in s.text:
- script = s.text
- break
- else:
- return '请清除谷歌浏览器缓存,并重新登录企查查重新执行程序!如果多次出现此提示,请手动复制任意XHR的cookie值赋予到cookie变量!'
- information = json.loads(script[script.find('{'): script.rfind('};') + 1]) # 全部细节
- # 开始从字典中获取信息
- detail = information["company"]["companyDetail"] # 企业细节
- name = detail["Name"] # 企业名称
- update = str_time(detail["UpdatedDate"]) # 信息更新时间
- person = detail["Oper"]["Name"] # 法定代表人
- status = detail["Status"] # 登记状态
- credit_code = detail["CreditCode"] # 统一社会信用代码
- no = detail["No"] # 工商注册号
- org_code = detail["OrgNo"] # 组织机构代码
- tax_code = detail["TaxNo"] # 纳税人识别号
- tax_type = detail.get("TaxpayerType", "") # 纳税人资质
- register_capital = detail["RegistCapi"] # 注册资本
- paid_capital = detail["RecCap"] # 实缴资本
- belong_org = detail["BelongOrg"] # 登记机关
- time_start = str_time(detail["TermStart"]) # 成立日期
- check_date = str_time(detail["CheckDate"]) # 核准日期
- time_range = f'{str_time(detail["TermStart"])}至{str_time(detail["TeamEnd"])}' # 营业期限
- address = detail["Address"] # 注册地址
- scope = detail["Scope"] # 宗旨和业务范围
- kind = detail["EconKind"] # 企业类型
- industry = detail["Industry"]["SubIndustry"] # 所属行业
- area = detail["Area"]["Province"] # 所属地区
- person_num = detail["profile"]["Info"] # 人员规模
- can_bao = [_["Value"] for _ in detail["CommonList"] if _.get("KeyDesc", "") == "参保人数"] # 参保人数
- can_bao = can_bao and can_bao[0] or '' # 参保人数
- english = detail["EnglishName"] # 英文名
- if detail["OriginalName"] is None:
- organization_names = [] # 曾用名
- else:
- organization_names = [_["Name"] for _ in detail["OriginalName"]] # 曾用名
- company = {
- "企业名称": name,
- "信息更新时间": update,
- "法定代表人": person,
- "登记状态": status,
- "统一社会信用代码": credit_code,
- "工商注册号": no,
- "组织机构代码": org_code,
- "纳税人识别号": tax_code,
- "纳税人资质": tax_type,
- "注册资本": register_capital,
- "实缴资本": paid_capital,
- "登记机关": belong_org,
- "成立日期": time_start,
- "核准日期": check_date,
- "营业期限": time_range,
- "注册地址": address,
- "宗旨和业务范围": scope,
- "企业类型": kind,
- "所属行业": industry,
- "所属地区": area,
- "人员规模": person_num,
- "参保人数": can_bao,
- "英文名": english,
- "曾用名": organization_names
- }
- return company
- return f"获取公司 “{name_}” 详情信息失败!"
- return f"未搜寻到公司 “{company_name}” !"
- return "搜索失败!"
-
-
- print(get_company("需要查询的公司名"))
-
看到由短到长的import,有没有感觉到我那无处安放的强迫症?