import datetime

# Cutoff timestamp: articles published before this moment (3 days ago) are
# skipped by the crawler.  The original code round-tripped the datetime
# through strftime/strptime, which only truncated the microseconds —
# replace(microsecond=0) does the same in one step.
earliest = (datetime.datetime.now() - datetime.timedelta(days=3)).replace(microsecond=0).timestamp()
import json
import math
import os
import random
import re
import sys
import time

import requests
import urllib3
from selenium import webdriver
from selenium.webdriver.common.by import By
# Directory the script is launched from; the db/ folder (cookies, CSV
# output) is resolved relative to it.
BASE_DIR = os.path.abspath(".")
class Wechat_Crawl:
    """Crawl article metadata (date, title, link) from the WeChat Official
    Account platform (mp.weixin.qq.com).

    Workflow: ``wechat_login`` opens a browser for a manual QR-code login and
    saves the session cookies to ``cookie_path``; ``run`` then reuses those
    cookies to search for an account and page through its article list,
    appending one CSV row per article to an output file.
    """

    def __init__(self, cookie_path):
        """
        :param cookie_path: path of the file where login cookies are stored
                            as a JSON object (written by ``wechat_login``).
        """
        # BUG FIX: the original also did `self.qurey = subscription_account`,
        # which referenced an undefined global and raised NameError on every
        # construction; the attribute was never used, so it is removed.
        self.cookie_path = cookie_path
        self.url = 'https://mp.weixin.qq.com'
        self.search_url = 'https://mp.weixin.qq.com/cgi-bin/searchbiz?'
        self.appmsg_url = 'https://mp.weixin.qq.com/cgi-bin/appmsg?'
        self.header = {
            "HOST": "mp.weixin.qq.com",
            "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36"
        }

    def wechat_login(self):
        """Open a Chrome window on the WeChat MP login page, wait for the
        user to scan the QR code manually, then persist the session cookies
        as JSON to ``self.cookie_path`` for later requests-based crawling.
        """
        print("浏览器将自动打开并跳转至微信公众号登录页面……")
        time.sleep(1)
        driver = webdriver.Chrome()
        driver.get("https://mp.weixin.qq.com/")
        time.sleep(2)
        print("请拿手机扫码二维码登录公众号")
        # NOTE(review): a fixed 15 s wait assumes the user scans in time —
        # consider polling the URL for the post-login token instead.
        time.sleep(15)
        print("登录成功")

        info = {cookie['name']: cookie['value'] for cookie in driver.get_cookies()}
        cookie_info = json.dumps(info)

        # BUG FIX: originally opened the global `cookie_path` (only defined
        # under __main__) instead of the instance attribute.
        with open(self.cookie_path, 'w+', encoding='utf-8') as f:
            f.write(cookie_info)
            f.flush()
        print("cookies已存入cookie.txt", flush=True)
        driver.quit()

    def get_cookie(self):
        """Load the cookies saved by ``wechat_login`` into ``self.cookies``."""
        with open(self.cookie_path, 'r', encoding='utf-8') as f:
            self.cookies = json.loads(f.read())

    def _session(self):
        """Build the requests Session used for all subsequent HTTP calls."""
        urllib3.disable_warnings()
        session = requests.Session()
        session.keep_alive = False
        session.adapters.DEFAULT_RETRIES = 511
        self.session = session

    def get_token(self):
        """Fetch the logged-in MP homepage and extract the `token` query
        parameter from the redirected URL.

        :return: the numeric token string.
        :raises IndexError: if no token is present (i.e. cookies expired).
        """
        time.sleep(1)
        response = self.session.get(url=self.url, cookies=self.cookies, verify=False)
        matches = re.findall(r'token=([0-9]+)', str(response.url))
        self.token = matches[0]
        return self.token

    def get_fakedid(self, query):
        """Search for an official account by name/ID and store the `fakeid`
        of the first result on ``self.fakeid``.

        :param query: account name or WeChat ID to search for.
        """
        query_id = {
            'action': 'search_biz',
            'token': self.token,
            'lang': 'zh_CN',
            'f': 'json',
            'ajax': '1',
            'random': random.random(),
            'query': query,
            'begin': '0',
            'count': '5',
        }
        search_response = self.session.get(
            self.search_url, headers=self.header,
            cookies=self.cookies, params=query_id)
        self.fakeid = search_response.json()['list'][0]['fakeid']

    def get_args(self, query):
        """Prepare everything ``get_info`` needs: cookies, session, token
        and the target account's fakeid."""
        self.get_cookie()
        self._session()
        self.get_token()
        self.get_fakedid(query)

    def get_info(self, output_path):
        """Page through the account's article list, appending
        ``date,title,link`` CSV rows to ``output_path`` until an article
        older than the module-level ``earliest`` cutoff (3 days) is seen or
        all pages are exhausted.

        :param output_path: CSV file opened in append mode per page.
        """
        self.begin = 0
        self.flag = True
        self.data = {
            "token": self.token,
            "lang": "zh_CN",
            "f": "json",
            "ajax": "1",
            "action": "list_ex",
            "begin": self.begin,
            "count": "5",
            "query": "",
            "fakeid": self.fakeid,
            "type": "9",
        }
        while self.flag:
            # Randomized delay to avoid triggering the platform's rate limit.
            time.sleep(random.randint(5, 10))
            self.data['begin'] = self.begin * 5
            # CONSISTENCY FIX: use the session built by _session() instead of
            # a fresh module-level requests.get call.
            res = self.session.get(self.appmsg_url, cookies=self.cookies,
                                   headers=self.header, params=self.data)
            try:
                # BUG FIX: the local was named `json`, shadowing the module.
                payload = res.json()
                items = payload["app_msg_list"]
                total = payload['app_msg_cnt']
                for item in items:
                    if item['create_time'] < earliest:
                        self.flag = False
                        break
                    create_date = time.strftime(
                        "%Y-%m-%d", time.localtime(item['create_time']))
                    title = item['title'].replace("\n", "")
                    link = item['link']
                    with open(output_path, 'a+', encoding='utf-8-sig') as fh:
                        fh.write(f'{create_date},{title},{link}\n')
                        fh.flush()
                self.begin += 1
                print(f'完成第{self.begin}页')
                # BUG FIX: the original never stopped when pages ran out and
                # fetched app_msg_cnt without using it — an account with no
                # article older than the cutoff looped forever.
                if not items or self.begin * 5 >= total:
                    self.flag = False
            except Exception:
                # Rate-limited or malformed response: report and back off.
                print(res.json()['base_resp']['err_msg'])
                print("Try it again after two hours.")
                break

    def run(self, query, output_path):
        """Crawl one account: resolve its fakeid, then dump its recent
        articles to ``output_path``."""
        self.get_args(query)
        self.get_info(output_path)
if __name__ == '__main__':
    # Display name -> account ID used as the search query.
    subscription_account_list = {'医麦客': 'eMedClub', 'AIDD-Pro': "AIDDPro"}

    db_dir = os.path.join(BASE_DIR, 'db')
    data_dir = os.path.join(db_dir, 'data')
    # BUG FIX: the db/ and db/data/ directories were never created, so the
    # cookie and CSV open() calls failed on a fresh checkout.
    os.makedirs(data_dir, exist_ok=True)

    cookie_path = os.path.join(db_dir, 'cookie.txt')
    wc = Wechat_Crawl(cookie_path)
    wc.wechat_login()

    for account_name, account_id in subscription_account_list.items():
        output_path = os.path.join(data_dir, account_name + '.csv')
        print("开始爬取公众号:", account_name)
        wc.run(account_id, output_path)
        print("爬取完成:", account_name)