Crawling WeChat Official Account Articles

Note: some of the code and techniques in this article may be outdated by now, so please take care. This article was last updated about a year ago.

As the title says.

Each official account publishes a lot of articles, so only the most recently published ones are extracted here; the time window can be customized.
The complete code is as follows:

import datetime
import time
import json
import random
import re
import os
import requests
import urllib3
from selenium import webdriver

# Only articles published within the last 3 days are collected; adjust as needed.
earliest = (datetime.datetime.now() - datetime.timedelta(days=3)).timestamp()

BASE_DIR = os.path.abspath(".")


class Wechat_Crawl:

    def __init__(self, cookie_path):
        self.cookie_path = cookie_path
        self.url = 'https://mp.weixin.qq.com'
        self.search_url = 'https://mp.weixin.qq.com/cgi-bin/searchbiz?'
        self.appmsg_url = 'https://mp.weixin.qq.com/cgi-bin/appmsg?'
        self.header = {
            "HOST": "mp.weixin.qq.com",
            "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36"
        }

    def wechat_login(self):
        '''
        Log in to the WeChat Official Account platform: the browser opens the login page,
        you scan the QR code by hand, and the resulting cookies are saved to a local file
        so that later requests can reuse the session.
        '''
        print("The browser will open and jump to the WeChat Official Account login page...")
        time.sleep(1)
        driver = webdriver.Chrome()
        driver.get("https://mp.weixin.qq.com/")
        time.sleep(2)
        print("Please scan the QR code with your phone to log in")
        time.sleep(15)
        print("Login successful")

        cookies = driver.get_cookies()
        info = {}
        for cookie in cookies:
            info[cookie['name']] = cookie['value']
        cookie_info = json.dumps(info)
        with open(self.cookie_path, 'w+', encoding='utf-8') as f:
            f.write(cookie_info)
            f.flush()
        print("Cookies saved to cookie.txt", flush=True)
        driver.quit()

    def get_cookie(self):
        with open(self.cookie_path, 'r', encoding='utf-8') as f:
            cookie = f.read()
        self.cookies = json.loads(cookie)

    def _session(self):
        # requests wraps urllib3. Passing verify=False skips HTTPS certificate
        # verification (not safe in general, since certificates can be forged),
        # and urllib3 then prints a prominent warning on every request;
        # urllib3.disable_warnings() silences those warnings.
        urllib3.disable_warnings()
        session = requests.Session()
        session.keep_alive = False
        session.adapters.DEFAULT_RETRIES = 511
        self.session = session

    def get_token(self):
        '''
        Extract the token from the URL of the logged-in home page.
        '''
        time.sleep(1)
        response = self.session.get(url=self.url, cookies=self.cookies, verify=False)
        url = str(response.url)
        pattern = r'token=([0-9]+)'
        self.token = re.findall(pattern, url)[0]
        return self.token

    def get_fakedid(self, query):
        # Search for the official account by name and take the fakeid of the first hit.
        query_id = {
            'action': 'search_biz',
            'token': self.token,
            'lang': 'zh_CN',
            'f': 'json',
            'ajax': '1',
            'random': random.random(),
            'query': query,
            'begin': '0',
            'count': '5'
        }
        search_response = self.session.get(
            self.search_url,
            headers=self.header,
            cookies=self.cookies,
            params=query_id)
        lists = search_response.json()['list'][0]
        self.fakeid = lists['fakeid']

    def get_args(self, query):
        self.get_cookie()
        self._session()
        self.get_token()
        self.get_fakedid(query)

    def get_info(self, output_path):
        self.begin = 0
        self.flag = True

        self.data = {
            "token": self.token,
            "lang": "zh_CN",
            "f": "json",
            "ajax": "1",
            "action": "list_ex",
            "begin": self.begin,
            "count": "5",
            "query": "",
            "fakeid": self.fakeid,
            "type": "9",
        }

        while True:
            if not self.flag:
                break
            # Pause for a few random seconds so the requests are not fast enough to get flagged.
            time.sleep(random.randint(5, 10))
            self.data['begin'] = self.begin * 5
            res = requests.get(self.appmsg_url, cookies=self.cookies, headers=self.header, params=self.data)
            try:
                resp = res.json()
                count = resp['app_msg_cnt']  # total number of articles for this account

                for item in resp["app_msg_list"]:
                    # Stop once articles are older than the cutoff.
                    if item['create_time'] < earliest:
                        self.flag = False
                        break
                    create_date = time.strftime("%Y-%m-%d", time.localtime(item['create_time']))
                    title = item['title'].replace("\n", "")
                    link = item['link']
                    with open(output_path, 'a+', encoding='utf-8-sig') as fh:
                        article_info = f'{create_date},{title},{link}\n'
                        fh.write(article_info)
                        fh.flush()

                self.begin += 1
                print(f'Finished page {self.begin}')

            except Exception:
                # Typically a frequency-control error from the interface.
                print(res.json()['base_resp']['err_msg'])
                print("Try it again after two hours.")
                break

    def run(self, query, output_path):
        self.get_args(query)
        self.get_info(output_path)


if __name__ == '__main__':
    # Official accounts to crawl; customize as needed (display name -> search keyword).
    subscription_account_list = {'医麦客': 'eMedClub', 'AIDD-Pro': "AIDDPro"}

    cookie_path = BASE_DIR + os.sep + 'db' + os.sep + 'cookie.txt'
    wc = Wechat_Crawl(cookie_path)
    wc.wechat_login()
    for k, v in subscription_account_list.items():
        output_path = BASE_DIR + os.sep + 'db' + os.sep + 'data' + os.sep + k + '.csv'
        print("Start crawling official account:", k)
        wc.run(v, output_path)
        print("Finished crawling:", k)

References:
https://cloud.tencent.com/developer/article/1506163
https://zhuanlan.zhihu.com/p/379062852

The articles obtained this way also need further processing; here is one article as an example:

import requests
from bs4 import BeautifulSoup

url = 'https://mp.weixin.qq.com/s/v5NtX6xr2a2-lpDG2yvgLA'
page = requests.get(url)

# Parts of the body are hidden and images are lazy-loaded: swap data-src back
# to src and strip the inline "visibility: hidden" style.
html = page.text.replace("data-src", "src").replace('style="visibility: hidden;"', "")
soup = BeautifulSoup(html, 'lxml')

# Grab the article body, by id or by class.
# wb = soup.find('div', {'class': "rich_media_content"})
wb = soup.find('div', id='js_content')

# Remove unwanted sections: everything from the recruitment notice ("招聘信息")
# onwards, plus the foreword ("前言") blocks that appear after the first few sections.
sections = wb.find_all('section')
cut = len(sections)  # if the marker is missing, keep everything
for i, tag in enumerate(sections):
    if '招聘信息' in tag.prettify():
        cut = i
        break

for i, tag in enumerate(sections):
    if i >= cut or ('前言' in tag.prettify() and i >= 3):
        tag.decompose()

# Drop the trailing list (the references at the end of the article).
ols = wb.find_all('ol', {'class': "list-paddingleft-1"})
if ols:
    ols[-1].decompose()

# Save the cleaned body as HTML.
with open('AI药物周资讯.html', "w", encoding="utf-8-sig") as f:
    f.write(wb.prettify(formatter='html'))
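Once article links have been collected by the crawler, the same cleanup can be run over all of them in a loop rather than one URL at a time. Below is a minimal sketch, assuming the db/data/*.csv layout produced above; the save_article helper and the output directory are illustrative, not part of the original script.

import csv
import os
import time
import requests
from bs4 import BeautifulSoup

OUT_DIR = 'articles'  # illustrative output directory
os.makedirs(OUT_DIR, exist_ok=True)

def save_article(link, out_path):
    """Fetch one article, un-hide the body, and write it to out_path (hypothetical helper)."""
    page = requests.get(link, timeout=30)
    html = page.text.replace("data-src", "src").replace('style="visibility: hidden;"', "")
    body = BeautifulSoup(html, 'lxml').find('div', id='js_content')
    if body is not None:
        with open(out_path, 'w', encoding='utf-8-sig') as f:
            f.write(body.prettify(formatter='html'))

# Walk one of the CSVs written by the crawler; the date is the first field and
# the link is the last (the title in between may itself contain commas).
with open(os.path.join('db', 'data', 'AIDD-Pro.csv'), encoding='utf-8-sig') as f:
    for i, row in enumerate(csv.reader(f)):
        if len(row) < 3:
            continue
        save_article(row[-1], os.path.join(OUT_DIR, f'{row[0]}_{i}.html'))
        time.sleep(3)  # be gentle with the server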
