# client.py
  1. import asyncio
  2. import json
  3. import re
  4. from typing import Any, Callable, Dict, List, Optional, Union
  5. from urllib.parse import urlencode
  6. import httpx
  7. from playwright.async_api import BrowserContext, Page
  8. # import config
  9. from tools import utils
  10. # from .exception import DataFetchError, IPBlockError
  11. from .field import SearchNoteType, SearchSortType
  12. from .help import get_search_id, sign
  13. class XiaoHongShuClient:
  14. def __init__(
  15. self,
  16. timeout=10,
  17. proxies=None,
  18. *,
  19. headers: Dict[str, str],
  20. playwright_page: Page,
  21. cookie_dict: Dict[str, str],
  22. ):
  23. self.xsec_token = None
  24. self.proxies = proxies
  25. self.timeout = timeout
  26. self.headers = headers
  27. self._host = "https://edith.xiaohongshu.com"
  28. self._domain = "https://www.xiaohongshu.com"
  29. self.IP_ERROR_STR = "网络连接异常,请检查网络设置或重启试试"
  30. self.IP_ERROR_CODE = 300012
  31. self.NOTE_ABNORMAL_STR = "笔记状态异常,请稍后查看"
  32. self.NOTE_ABNORMAL_CODE = -510001
  33. self.playwright_page = playwright_page
  34. self.cookie_dict = cookie_dict
    def _pre_headers(self, url: str, data=None) -> Dict:
        """
        Sign the request headers for a XiaoHongShu API call.

        Runs the site's in-page `window._webmsxyw` JS to obtain the encrypted
        X-s / X-t values, combines them with the `a1` cookie and the `b1`
        localStorage entry via `sign`, and merges the resulting signature
        headers into the shared `self.headers`.

        Args:
            url: request URI (path plus query string) to sign
            data: optional POST body included in the signature
        Returns:
            the updated shared headers dict (also mutated in place)
        """
        # NOTE(review): `playwright.async_api.Page.evaluate` is a coroutine and is
        # not awaited here; this only works if `playwright_page` is actually a
        # sync-API page — confirm against the caller.
        encrypt_params = self.playwright_page.evaluate("([url, data]) => window._webmsxyw(url,data)", [url, data])
        local_storage = self.playwright_page.evaluate("() => window.localStorage")
        signs = sign(
            a1=self.cookie_dict.get("a1", ""),
            b1=local_storage.get("b1", ""),
            x_s=encrypt_params.get("X-s", ""),
            x_t=str(encrypt_params.get("X-t", ""))
        )
        headers = {
            "X-S": signs["x-s"],
            "X-T": signs["x-t"],
            "x-S-Common": signs["x-s-common"],
            "X-B3-Traceid": signs["x-b3-traceid"]
        }
        # The signature headers are merged into (not replacing) the base headers.
        self.headers.update(headers)
        return self.headers
  59. def request(self, method, url, **kwargs) -> Union[str, Any]:
  60. """
  61. 封装httpx的公共请求方法,对请求响应做一些处理
  62. Args:
  63. method: 请求方法
  64. url: 请求的URL
  65. **kwargs: 其他请求参数,例如请求头、请求体等
  66. Returns:
  67. """
  68. # return response.text
  69. return_response = kwargs.pop('return_response', False)
  70. with httpx.Client(proxies=self.proxies) as client:
  71. response = client.request(
  72. method, url, timeout=self.timeout,
  73. **kwargs
  74. )
  75. if return_response:
  76. return response.text
  77. data: Dict = response.json()
  78. if data["success"]:
  79. return data.get("data", data.get("success", {}))
  80. elif data["code"] == self.IP_ERROR_CODE:
  81. raise Exception(self.IP_ERROR_STR)
  82. else:
  83. raise Exception(data.get("msg", None))
  84. async def get(self, uri: str, params=None) -> Dict:
  85. """
  86. GET请求,对请求头签名
  87. Args:
  88. uri: 请求路由
  89. params: 请求参数
  90. Returns:
  91. """
  92. final_uri = uri
  93. if isinstance(params, dict):
  94. final_uri = (f"{uri}?"
  95. f"{urlencode(params)}")
  96. headers = self._pre_headers(final_uri)
  97. return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=headers)
  98. def post(self, uri: str, data: dict) -> Dict:
  99. """
  100. POST请求,对请求头签名
  101. Args:
  102. uri: 请求路由
  103. data: 请求体参数
  104. Returns:
  105. """
  106. headers = self._pre_headers(uri, data)
  107. json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False)
  108. return self.request(method="POST", url=f"{self._host}{uri}",
  109. data=json_str, headers=headers)
  110. def update_xsec_token(self):
  111. """
  112. 更新token
  113. :return:
  114. """
  115. res = self.get_note_by_keyword('车')
  116. self.xsec_token = res.get('items')[0].get('xsec_token')
  117. async def get_note_media(self, url: str) -> Union[bytes, None]:
  118. async with httpx.AsyncClient(proxies=self.proxies) as client:
  119. response = await client.request("GET", url, timeout=self.timeout)
  120. if not response.reason_phrase == "OK":
  121. utils.logger.error(f"[XiaoHongShuClient.get_note_media] request {url} err, res:{response.text}")
  122. return None
  123. else:
  124. return response.content
  125. def pong(self) -> bool:
  126. """
  127. 用于检查登录态是否失效了
  128. Returns:
  129. """
  130. """get a note to check if login state is ok"""
  131. utils.logger.info("[XiaoHongShuClient.pong] Begin to pong xhs...")
  132. ping_flag = False
  133. try:
  134. note_card: Dict = self.get_note_by_keyword(keyword="小红书")
  135. if note_card.get("items"):
  136. ping_flag = True
  137. self.xsec_token = note_card.get('items')[0].get('xsec_token')
  138. except Exception as e:
  139. utils.logger.error(f"[XiaoHongShuClient.pong] Ping xhs failed: {e}, and try to login again...")
  140. ping_flag = False
  141. return ping_flag
  142. async def update_cookies(self, browser_context: BrowserContext):
  143. """
  144. API客户端提供的更新cookies方法,一般情况下登录成功后会调用此方法
  145. Args:
  146. browser_context: 浏览器上下文对象
  147. Returns:
  148. """
  149. cookie_str, cookie_dict = utils.convert_cookies(await browser_context.cookies())
  150. self.headers["Cookie"] = cookie_str
  151. self.cookie_dict = cookie_dict
  152. def get_note_by_keyword(
  153. self, keyword: str,
  154. page: int = 1, page_size: int = 20,
  155. sort: SearchSortType = SearchSortType.GENERAL,
  156. note_type: SearchNoteType = SearchNoteType.ALL
  157. ) -> Dict:
  158. """
  159. 根据关键词搜索笔记
  160. Args:
  161. keyword: 关键词参数
  162. page: 分页第几页
  163. page_size: 分页数据长度
  164. sort: 搜索结果排序指定
  165. note_type: 搜索的笔记类型
  166. Returns:
  167. """
  168. uri = "/api/sns/web/v1/search/notes"
  169. data = {
  170. "keyword": keyword,
  171. "page": page,
  172. "page_size": page_size,
  173. "search_id": get_search_id(),
  174. "sort": sort.value,
  175. "note_type": note_type.value
  176. }
  177. return self.post(uri, data)
  178. def get_note_by_id(self, note_id: str) -> Dict:
  179. """
  180. 获取笔记详情API
  181. Args:
  182. note_id:笔记ID
  183. xsec_source: 渠道来源
  184. xsec_token: 搜索关键字之后返回的比较列表中返回的token
  185. Returns:
  186. """
  187. if self.xsec_token == None:
  188. self.update_xsec_token()
  189. data = {
  190. "source_note_id": note_id,
  191. "image_formats": ["jpg", "webp", "avif"],
  192. "extra": {"need_body_topic": 1},
  193. "xsec_source": "pc_search",
  194. "xsec_token": self.xsec_token
  195. }
  196. uri = "/api/sns/web/v1/feed"
  197. res = self.post(uri, data)
  198. if res and res.get("items"):
  199. res_dict: Dict = res["items"][0]["note_card"]
  200. return res_dict
  201. # 爬取频繁了可能会出现有的笔记能有结果有的没有
  202. utils.logger.error(f"[XiaoHongShuClient.get_note_by_id] get note id:{note_id} empty and res:{res}")
  203. return dict()
  204. async def get_note_comments(self, note_id: str, cursor: str = "") -> Dict:
  205. """
  206. 获取一级评论的API
  207. Args:
  208. note_id: 笔记ID
  209. cursor: 分页游标
  210. Returns:
  211. """
  212. uri = "/api/sns/web/v2/comment/page"
  213. params = {
  214. "note_id": note_id,
  215. "cursor": cursor,
  216. "top_comment_id": "",
  217. "image_formats": "jpg,webp,avif"
  218. }
  219. return await self.get(uri, params)
  220. async def get_note_sub_comments(self, note_id: str, root_comment_id: str, num: int = 10, cursor: str = ""):
  221. """
  222. 获取指定父评论下的子评论的API
  223. Args:
  224. note_id: 子评论的帖子ID
  225. root_comment_id: 根评论ID
  226. num: 分页数量
  227. cursor: 分页游标
  228. Returns:
  229. """
  230. uri = "/api/sns/web/v2/comment/sub/page"
  231. params = {
  232. "note_id": note_id,
  233. "root_comment_id": root_comment_id,
  234. "num": num,
  235. "cursor": cursor,
  236. }
  237. return await self.get(uri, params)
  238. async def get_note_all_comments(self, note_id: str, crawl_interval: float = 1.0,
  239. callback: Optional[Callable] = None) -> List[Dict]:
  240. """
  241. 获取指定笔记下的所有一级评论,该方法会一直查找一个帖子下的所有评论信息
  242. Args:
  243. note_id: 笔记ID
  244. crawl_interval: 爬取一次笔记的延迟单位(秒)
  245. callback: 一次笔记爬取结束后
  246. Returns:
  247. """
  248. result = []
  249. comments_has_more = True
  250. comments_cursor = ""
  251. while comments_has_more:
  252. comments_res = await self.get_note_comments(note_id, comments_cursor)
  253. comments_has_more = comments_res.get("has_more", False)
  254. comments_cursor = comments_res.get("cursor", "")
  255. if "comments" not in comments_res:
  256. utils.logger.info(
  257. f"[XiaoHongShuClient.get_note_all_comments] No 'comments' key found in response: {comments_res}")
  258. break
  259. comments = comments_res["comments"]
  260. if callback:
  261. await callback(note_id, comments)
  262. await asyncio.sleep(crawl_interval)
  263. result.extend(comments)
  264. sub_comments = await self.get_comments_all_sub_comments(comments, crawl_interval, callback)
  265. result.extend(sub_comments)
  266. return result
  267. async def get_comments_all_sub_comments(self, comments: List[Dict], crawl_interval: float = 1.0,
  268. callback: Optional[Callable] = None) -> List[Dict]:
  269. """
  270. 获取指定一级评论下的所有二级评论, 该方法会一直查找一级评论下的所有二级评论信息
  271. Args:
  272. comments: 评论列表
  273. crawl_interval: 爬取一次评论的延迟单位(秒)
  274. callback: 一次评论爬取结束后
  275. Returns:
  276. """
  277. return []
  278. # if True:
  279. # utils.logger.info(
  280. # f"[XiaoHongShuCrawler.get_comments_all_sub_comments] Crawling sub_comment mode is not enabled")
  281. # return []
  282. #
  283. # result = []
  284. # for comment in comments:
  285. # note_id = comment.get("note_id")
  286. # sub_comments = comment.get("sub_comments")
  287. # if sub_comments and callback:
  288. # await callback(note_id, sub_comments)
  289. #
  290. # sub_comment_has_more = comment.get("sub_comment_has_more")
  291. # if not sub_comment_has_more:
  292. # continue
  293. #
  294. # root_comment_id = comment.get("id")
  295. # sub_comment_cursor = comment.get("sub_comment_cursor")
  296. #
  297. # while sub_comment_has_more:
  298. # comments_res = await self.get_note_sub_comments(note_id, root_comment_id, 10, sub_comment_cursor)
  299. # sub_comment_has_more = comments_res.get("has_more", False)
  300. # sub_comment_cursor = comments_res.get("cursor", "")
  301. # if "comments" not in comments_res:
  302. # utils.logger.info(
  303. # f"[XiaoHongShuClient.get_comments_all_sub_comments] No 'comments' key found in response: {comments_res}")
  304. # break
  305. # comments = comments_res["comments"]
  306. # if callback:
  307. # await callback(note_id, comments)
  308. # await asyncio.sleep(crawl_interval)
  309. # result.extend(comments)
  310. # return result
  311. async def get_creator_info(self, user_id: str) -> Dict:
  312. """
  313. 通过解析网页版的用户主页HTML,获取用户个人简要信息
  314. PC端用户主页的网页存在window.__INITIAL_STATE__这个变量上的,解析它即可
  315. eg: https://www.xiaohongshu.com/user/profile/59d8cb33de5fb4696bf17217
  316. """
  317. uri = f"/user/profile/{user_id}"
  318. html_content = await self.request("GET", self._domain + uri, return_response=True, headers=self.headers)
  319. match = re.search(r'<script>window.__INITIAL_STATE__=(.+)<\/script>', html_content, re.M)
  320. if match is None:
  321. return {}
  322. info = json.loads(match.group(1).replace(':undefined', ':null'), strict=False)
  323. if info is None:
  324. return {}
  325. return info.get('user').get('userPageData')
  326. async def get_notes_by_creator(
  327. self, creator: str,
  328. cursor: str,
  329. page_size: int = 30
  330. ) -> Dict:
  331. """
  332. 获取博主的笔记
  333. Args:
  334. creator: 博主ID
  335. cursor: 上一页最后一条笔记的ID
  336. page_size: 分页数据长度
  337. Returns:
  338. """
  339. uri = "/api/sns/web/v1/user_posted"
  340. data = {
  341. "user_id": creator,
  342. "cursor": cursor,
  343. "num": page_size,
  344. "image_formats": "jpg,webp,avif"
  345. }
  346. return await self.get(uri, data)
  347. async def get_all_notes_by_creator(self, user_id: str, crawl_interval: float = 1.0,
  348. callback: Optional[Callable] = None) -> List[Dict]:
  349. """
  350. 获取指定用户下的所有发过的帖子,该方法会一直查找一个用户下的所有帖子信息
  351. Args:
  352. user_id: 用户ID
  353. crawl_interval: 爬取一次的延迟单位(秒)
  354. callback: 一次分页爬取结束后的更新回调函数
  355. Returns:
  356. """
  357. result = []
  358. notes_has_more = True
  359. notes_cursor = ""
  360. while notes_has_more:
  361. notes_res = await self.get_notes_by_creator(user_id, notes_cursor)
  362. if not notes_res:
  363. utils.logger.error(
  364. f"[XiaoHongShuClient.get_notes_by_creator] The current creator may have been banned by xhs, so they cannot access the data.")
  365. break
  366. notes_has_more = notes_res.get("has_more", False)
  367. notes_cursor = notes_res.get("cursor", "")
  368. if "notes" not in notes_res:
  369. utils.logger.info(
  370. f"[XiaoHongShuClient.get_all_notes_by_creator] No 'notes' key found in response: {notes_res}")
  371. break
  372. notes = notes_res["notes"]
  373. utils.logger.info(
  374. f"[XiaoHongShuClient.get_all_notes_by_creator] got user_id:{user_id} notes len : {len(notes)}")
  375. if callback:
  376. await callback(notes)
  377. await asyncio.sleep(crawl_interval)
  378. result.extend(notes)
  379. return result