client.py

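"""XiaoHongShu (xhs) web API client.

Wraps signed GET/POST requests, keyword search, note detail, comments,
creator notes, and rotate-captcha verification.
"""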
import asyncio
import json
import logging
import re
import time
from typing import Any, Callable, Dict, List, Optional, Union
from urllib.parse import urlencode

import httpx
import requests
from playwright.async_api import BrowserContext, Page

# import config
from tools import utils

# from .exception import DataFetchError, IPBlockError
from .field import SearchNoteType, SearchSortType
from .help import get_search_id, sign, Des, GenerateCurve
from .rotate_ident import invoke_ident_api


class XiaoHongShuClient:
    def __init__(
        self,
        timeout=10,
        proxies=None,
        *,
        headers: Dict[str, str],
        playwright_page: Page,
        rotate_ident,
        cookie_dict: Dict[str, str],
    ):
        self.xsec_token = None
        self.proxies = proxies
        self.timeout = timeout
        self.headers = headers
        self._host = "https://edith.xiaohongshu.com"
        self._domain = "https://www.xiaohongshu.com"
        self.IP_ERROR_STR = "网络连接异常,请检查网络设置或重启试试"
        self.IP_ERROR_CODE = 300012
        self.NOTE_ABNORMAL_STR = "笔记状态异常,请稍后查看"
        self.NOTE_ABNORMAL_CODE = -510001
        self.playwright_page = playwright_page
        self.cookie_dict = cookie_dict
        self.des = Des()
        self.rotate_ident = rotate_ident

    def _pre_headers(self, url: str, data=None) -> Dict:
        """
        Sign the request headers.
        Args:
            url: request URI (including the query string for GET requests)
            data: request body for POST requests
        Returns:
            the updated headers dict, including the signature fields
        """
        # window._webmsxyw is the site's own signing function, evaluated in the page context
        encrypt_params = self.playwright_page.evaluate("([url, data]) => window._webmsxyw(url,data)", [url, data])
        local_storage = self.playwright_page.evaluate("() => window.localStorage")
        signs = sign(
            a1=self.cookie_dict.get("a1", ""),
            b1=local_storage.get("b1", ""),
            x_s=encrypt_params.get("X-s", ""),
            x_t=str(encrypt_params.get("X-t", ""))
        )
        headers = {
            "X-S": signs["x-s"],
            "X-T": signs["x-t"],
            "x-S-Common": signs["x-s-common"],
            "X-B3-Traceid": signs["x-b3-traceid"]
        }
        self.headers.update(headers)
        return self.headers

    def request(self, method, url, need_check=True, **kwargs) -> Union[str, Any]:
        """
        Common request helper built on httpx; post-processes the response.
        Args:
            method: HTTP method
            url: request URL
            need_check: whether to run captcha verification on a 461 response
            **kwargs: extra request arguments, e.g. headers or body
        Returns:
        """
        return_response = kwargs.pop('return_response', False)
        with httpx.Client(proxies=self.proxies) as client:
            response = client.request(
                method, url, timeout=self.timeout,
                **kwargs
            )
        if return_response:
            return response.text
        # 461 indicates the rotate-captcha challenge; solve it before parsing
        if response.status_code == 461 and need_check:
            self.verify()
        data: Dict = response.json()
        if data["success"]:
            return data.get("data", data.get("success", {}))
        elif data["code"] == self.IP_ERROR_CODE:
            raise Exception(self.IP_ERROR_STR)
        else:
            raise Exception(data.get("msg", None))

    async def get(self, uri: str, params=None) -> Dict:
        """
        GET request with signed headers.
        Args:
            uri: request path
            params: query parameters
        Returns:
        """
        final_uri = uri
        if isinstance(params, dict):
            final_uri = (f"{uri}?"
                         f"{urlencode(params)}")
        headers = self._pre_headers(final_uri)
        # self.request is synchronous, so it is not awaited here
        return self.request(method="GET", url=f"{self._host}{final_uri}", headers=headers)

    def post(self, uri: str, data: dict, need_check=True) -> Dict:
        """
        POST request with signed headers.
        Args:
            uri: request path
            data: request body
            need_check: whether to run captcha verification on a 461 response
        Returns:
        """
        headers = self._pre_headers(uri, data)
        json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False)
        return self.request(method="POST", url=f"{self._host}{uri}", need_check=need_check,
                            data=json_str, headers=headers)

    def update_xsec_token(self):
        """
        Refresh the cached xsec_token by running a keyword search.
        :return:
        """
        res = self.get_note_by_keyword('小红书')
        self.xsec_token = res.get('items')[0].get('xsec_token')

    async def get_note_media(self, url: str) -> Union[bytes, None]:
        """Download note media (image/video) bytes; returns None on failure."""
        async with httpx.AsyncClient(proxies=self.proxies) as client:
            response = await client.request("GET", url, timeout=self.timeout)
            if response.status_code != 200:
                utils.logger.error(f"[XiaoHongShuClient.get_note_media] request {url} err, res:{response.text}")
                return None
            else:
                return response.content

    def pong(self) -> bool:
        """
        Check whether the login state is still valid by fetching a note via keyword search.
        Returns:
        """
        utils.logger.info("[XiaoHongShuClient.pong] Begin to pong xhs...")
        ping_flag = False
        try:
            note_card: Dict = self.get_note_by_keyword(keyword="小红书")
            if note_card.get("items"):
                ping_flag = True
                self.xsec_token = note_card.get('items')[0].get('xsec_token')
        except Exception as e:
            utils.logger.error(f"[XiaoHongShuClient.pong] Ping xhs failed: {e}, and try to login again...")
            ping_flag = False
        return ping_flag

    async def update_cookies(self, browser_context: BrowserContext):
        """
        Update the client's cookies; usually called right after a successful login.
        Args:
            browser_context: browser context object
        Returns:
        """
        cookie_str, cookie_dict = utils.convert_cookies(await browser_context.cookies())
        self.headers["Cookie"] = cookie_str
        self.cookie_dict = cookie_dict

    def get_note_by_keyword(
            self, keyword: str,
            page: int = 1, page_size: int = 20,
            sort: SearchSortType = SearchSortType.GENERAL,
            note_type: SearchNoteType = SearchNoteType.ALL
    ) -> Dict:
        """
        Search notes by keyword.
        Args:
            keyword: search keyword
            page: page number
            page_size: page size
            sort: sort order of the search results
            note_type: type of notes to search for
        Returns:
        """
        uri = "/api/sns/web/v1/search/notes"
        data = {
            "keyword": keyword,
            "page": page,
            "page_size": page_size,
            "search_id": get_search_id(),
            "sort": sort.value,
            "note_type": note_type.value
        }
        return self.post(uri, data)

    def get_note_by_id(self, note_id: str) -> Dict:
        """
        Fetch note detail; refreshes the cached xsec_token from a search result if needed.
        Args:
            note_id: note ID
        Returns:
        """
        if self.xsec_token is None:
            self.update_xsec_token()
        data = {
            "source_note_id": note_id,
            "image_formats": ["jpg", "webp", "avif"],
            "extra": {"need_body_topic": 1},
            "xsec_source": "pc_search",
            "xsec_token": self.xsec_token
        }
        uri = "/api/sns/web/v1/feed"
        res = self.post(uri, data)
        if res and res.get("items"):
            res_dict: Dict = res["items"][0]["note_card"]
            return res_dict
        # When crawling too frequently, some notes may return results while others come back empty
        utils.logger.error(f"[XiaoHongShuClient.get_note_by_id] get note id:{note_id} empty and res:{res}")
        return dict()

    async def get_note_comments(self, note_id: str, cursor: str = "") -> Dict:
        """
        Fetch one page of top-level comments.
        Args:
            note_id: note ID
            cursor: pagination cursor
        Returns:
        """
        uri = "/api/sns/web/v2/comment/page"
        params = {
            "note_id": note_id,
            "cursor": cursor,
            "top_comment_id": "",
            "image_formats": "jpg,webp,avif"
        }
        return await self.get(uri, params)

    async def get_note_sub_comments(self, note_id: str, root_comment_id: str, num: int = 10, cursor: str = ""):
        """
        Fetch one page of replies under a given parent comment.
        Args:
            note_id: ID of the note the replies belong to
            root_comment_id: root comment ID
            num: page size
            cursor: pagination cursor
        Returns:
        """
        uri = "/api/sns/web/v2/comment/sub/page"
        params = {
            "note_id": note_id,
            "root_comment_id": root_comment_id,
            "num": num,
            "cursor": cursor,
        }
        return await self.get(uri, params)

    async def get_note_all_comments(self, note_id: str, crawl_interval: float = 1.0,
                                    callback: Optional[Callable] = None) -> List[Dict]:
        """
        Fetch all top-level comments of a note, paging until no more remain.
        Args:
            note_id: note ID
            crawl_interval: delay between requests, in seconds
            callback: called after each page of comments is fetched
        Returns:
        """
        result = []
        comments_has_more = True
        comments_cursor = ""
        while comments_has_more:
            comments_res = await self.get_note_comments(note_id, comments_cursor)
            comments_has_more = comments_res.get("has_more", False)
            comments_cursor = comments_res.get("cursor", "")
            if "comments" not in comments_res:
                utils.logger.info(
                    f"[XiaoHongShuClient.get_note_all_comments] No 'comments' key found in response: {comments_res}")
                break
            comments = comments_res["comments"]
            if callback:
                await callback(note_id, comments)
            await asyncio.sleep(crawl_interval)
            result.extend(comments)
            sub_comments = await self.get_comments_all_sub_comments(comments, crawl_interval, callback)
            result.extend(sub_comments)
        return result

    async def get_comments_all_sub_comments(self, comments: List[Dict], crawl_interval: float = 1.0,
                                            callback: Optional[Callable] = None) -> List[Dict]:
        """
        Fetch all replies under the given top-level comments.
        Currently disabled: the method returns an empty list; the original paging logic is kept below.
        Args:
            comments: list of top-level comments
            crawl_interval: delay between requests, in seconds
            callback: called after each page of replies is fetched
        Returns:
        """
        return []
        # if True:
        #     utils.logger.info(
        #         f"[XiaoHongShuCrawler.get_comments_all_sub_comments] Crawling sub_comment mode is not enabled")
        #     return []
        #
        # result = []
        # for comment in comments:
        #     note_id = comment.get("note_id")
        #     sub_comments = comment.get("sub_comments")
        #     if sub_comments and callback:
        #         await callback(note_id, sub_comments)
        #
        #     sub_comment_has_more = comment.get("sub_comment_has_more")
        #     if not sub_comment_has_more:
        #         continue
        #
        #     root_comment_id = comment.get("id")
        #     sub_comment_cursor = comment.get("sub_comment_cursor")
        #
        #     while sub_comment_has_more:
        #         comments_res = await self.get_note_sub_comments(note_id, root_comment_id, 10, sub_comment_cursor)
        #         sub_comment_has_more = comments_res.get("has_more", False)
        #         sub_comment_cursor = comments_res.get("cursor", "")
        #         if "comments" not in comments_res:
        #             utils.logger.info(
        #                 f"[XiaoHongShuClient.get_comments_all_sub_comments] No 'comments' key found in response: {comments_res}")
        #             break
        #         comments = comments_res["comments"]
        #         if callback:
        #             await callback(note_id, comments)
        #         await asyncio.sleep(crawl_interval)
        #         result.extend(comments)
        # return result

    async def get_creator_info(self, user_id: str) -> Dict:
        """
        Get brief creator info by parsing the web profile page HTML.
        The PC profile page embeds the data in window.__INITIAL_STATE__, so parsing that variable is enough.
        eg: https://www.xiaohongshu.com/user/profile/59d8cb33de5fb4696bf17217
        """
        uri = f"/user/profile/{user_id}"
        html_content = self.request("GET", self._domain + uri, return_response=True, headers=self.headers)
        match = re.search(r'<script>window.__INITIAL_STATE__=(.+)</script>', html_content, re.M)
        if match is None:
            return {}
        info = json.loads(match.group(1).replace(':undefined', ':null'), strict=False)
        if info is None:
            return {}
        return info.get('user').get('userPageData')

    async def get_notes_by_creator(
            self, creator: str,
            cursor: str,
            page_size: int = 30
    ) -> Dict:
        """
        Fetch one page of a creator's notes.
        Args:
            creator: creator (user) ID
            cursor: ID of the last note on the previous page
            page_size: page size
        Returns:
        """
        uri = "/api/sns/web/v1/user_posted"
        data = {
            "user_id": creator,
            "cursor": cursor,
            "num": page_size,
            "image_formats": "jpg,webp,avif"
        }
        return await self.get(uri, data)

    async def get_all_notes_by_creator(self, user_id: str, crawl_interval: float = 1.0,
                                       callback: Optional[Callable] = None) -> List[Dict]:
        """
        Fetch all notes posted by a user, paging until no more remain.
        Args:
            user_id: user ID
            crawl_interval: delay between requests, in seconds
            callback: called after each page of notes is fetched
        Returns:
        """
        result = []
        notes_has_more = True
        notes_cursor = ""
        while notes_has_more:
            notes_res = await self.get_notes_by_creator(user_id, notes_cursor)
            if not notes_res:
                utils.logger.error(
                    "[XiaoHongShuClient.get_notes_by_creator] The current creator may have been banned by xhs, so they cannot access the data.")
                break
            notes_has_more = notes_res.get("has_more", False)
            notes_cursor = notes_res.get("cursor", "")
            if "notes" not in notes_res:
                utils.logger.info(
                    f"[XiaoHongShuClient.get_all_notes_by_creator] No 'notes' key found in response: {notes_res}")
                break
            notes = notes_res["notes"]
            utils.logger.info(
                f"[XiaoHongShuClient.get_all_notes_by_creator] got user_id:{user_id} notes len : {len(notes)}")
            if callback:
                await callback(notes)
            await asyncio.sleep(crawl_interval)
            result.extend(notes)
        return result

    def verify(self):
        """Solve the rotate-captcha challenge: fetch the captcha image, then submit the check."""
        image = self.get_image()
        self.check(image)

    def get_image(self):
        """Register a captcha session and return the URL of the captcha image."""
        json_data = {
            'secretId': '000',
            'verifyType': '102',
            'verifyUuid': '',
            'verifyBiz': '461',
            'sourceSite': '',
        }
        response = self.post('/api/redcaptcha/v2/captcha/register',
                             need_check=False, data=json_data)
        logging.info(f"get image:{response}")
        captchaInfo = response["captchaInfo"]
        self.rid = response["rid"]
        image_Info = self.des.decrypt("captchaInfo", captchaInfo)
        captchaUrl = json.loads(image_Info)["captchaUrl"]
        logging.info('captchaUrl:' + captchaUrl)
        return captchaUrl

    def check(self, img_url: str):
        """Solve the rotate captcha: identify the rotation angle, synthesize a drag track, and submit it."""
        img = self.rotate_ident.do_download_img(img_url)
        response = invoke_ident_api(img)
        # The ident API answers with a clockwise rotation in degrees; strip the surrounding text
        angle = int(str(response['data']['res_str']).replace('顺时针旋转', '').replace('度', ''))
        rate = angle / 360
        distance = int(rate * 227)
        gen_track = GenerateCurve([0, 0], [distance, 2], [], int(rate * 150)).main()
        track = self.des.encrypt("track", json.dumps(gen_track, separators=(",", ":")))
        mouseEnd = self.des.encrypt("mouseEnd", str(distance))
        time_ = self.des.encrypt("time", str(gen_track[-1][-1] + 199))
        json_data = {
            'rid': self.rid,
            'verifyType': 102,
            'verifyBiz': '461',
            'verifyUuid': '',
            'sourceSite': '',
            'captchaInfo': '{"mouseEnd":"' + mouseEnd + '","time":"' + time_ + '","track":"' + track + '","width":"w1qZrykOUIU="}'
        }
        response = self.post('/api/redcaptcha/v2/captcha/check', need_check=False,
                             data=json_data)
        logging.info(f'check:{response}')
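

# ---------------------------------------------------------------------------
# Usage sketch (not part of the original module): a minimal, hypothetical
# example of wiring the client to a Playwright page. The login flow, the
# User-Agent value and the rotate_ident helper are assumptions here; in the
# real crawler they are supplied by the surrounding project.
# ---------------------------------------------------------------------------
async def _demo() -> None:
    from playwright.async_api import async_playwright

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context()
        page = await context.new_page()
        # The site must be open (and logged in) so that window._webmsxyw
        # is available in the page context for request signing.
        await page.goto("https://www.xiaohongshu.com")
        cookie_str, cookie_dict = utils.convert_cookies(await context.cookies())
        client = XiaoHongShuClient(
            headers={"User-Agent": "Mozilla/5.0", "Cookie": cookie_str},
            playwright_page=page,
            rotate_ident=None,  # hypothetical placeholder; a real rotate-captcha helper is expected
            cookie_dict=cookie_dict,
        )
        notes = client.get_note_by_keyword("小红书")
        # Assumption: each search item carries an "id" field.
        first_note_id = notes.get("items", [{}])[0].get("id", "")
        comments = await client.get_note_all_comments(first_note_id)
        print(f"fetched {len(comments)} comments")
        await browser.close()


if __name__ == "__main__":
    asyncio.run(_demo())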