123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492 |
- import asyncio
- import json
- import logging
- import re
- import time
- from typing import Any, Callable, Dict, List, Optional, Union
- from urllib.parse import urlencode
- import httpx
- import requests
- from playwright.async_api import BrowserContext, Page
- # import config
- from tools import utils
- # from .exception import DataFetchError, IPBlockError
- from .field import SearchNoteType, SearchSortType
- from .help import get_search_id, sign, Des, GenerateCurve
- from .rotate_ident import invoke_ident_api
class XiaoHongShuClient:
    """Signed HTTP client for the Xiaohongshu (xhs) web API.

    Requests are signed with values computed inside a Playwright page
    (``window._webmsxyw`` and ``localStorage``) and sent with ``httpx``.
    A 461 response triggers the rotate-captcha flow (``verify``).

    The HTTP layer (``request`` / ``post``) is synchronous. The ``async``
    methods are kept as coroutines so existing ``await`` call sites keep
    working, but the network call itself blocks while it is in flight.
    """

    def __init__(
        self,
        timeout=10,
        proxies=None,
        *,
        headers: Dict[str, str],
        playwright_page: Page,
        rotate_ident,
        cookie_dict: Dict[str, str],
    ):
        """Store connection settings, signing inputs and captcha helpers.

        Args:
            timeout: Per-request timeout passed to httpx.
            proxies: Proxy configuration passed to the httpx client.
            headers: Base request headers; updated in place on every signed call.
            playwright_page: Page used to evaluate the in-browser signing function.
            rotate_ident: Helper exposing ``do_download_img`` for the captcha image.
            cookie_dict: Current cookies; the ``a1`` entry is used for signing.
        """
        # Token taken from search results and sent with note-detail requests.
        self.xsec_token = None
        # Id of the pending captcha challenge; set by get_image(), read by check().
        self.rid = None
        self.proxies = proxies
        self.timeout = timeout
        self.headers = headers
        self._host = "https://edith.xiaohongshu.com"
        self._domain = "https://www.xiaohongshu.com"
        self.IP_ERROR_STR = "网络连接异常,请检查网络设置或重启试试"
        self.IP_ERROR_CODE = 300012
        self.NOTE_ABNORMAL_STR = "笔记状态异常,请稍后查看"
        self.NOTE_ABNORMAL_CODE = -510001
        self.playwright_page = playwright_page
        self.cookie_dict = cookie_dict
        self.des = Des()
        self.rotate_ident = rotate_ident

    def _pre_headers(self, url: str, data=None) -> Dict:
        """Compute the signature headers for a request and merge them in.

        Args:
            url: Request path (including the query string for GET requests).
            data: Request body for POST requests, otherwise ``None``.

        Returns:
            ``self.headers`` itself, updated in place with the signature
            headers (the same dict object is shared across calls).
        """
        # NOTE(review): evaluate() is used without ``await``, so the page is
        # presumably a sync-API Page even though the annotation comes from
        # playwright.async_api — confirm against the caller.
        encrypt_params = self.playwright_page.evaluate(
            "([url, data]) => window._webmsxyw(url,data)", [url, data]
        )
        local_storage = self.playwright_page.evaluate("() => window.localStorage")
        signs = sign(
            a1=self.cookie_dict.get("a1", ""),
            b1=local_storage.get("b1", ""),
            x_s=encrypt_params.get("X-s", ""),
            x_t=str(encrypt_params.get("X-t", "")),
        )
        self.headers.update({
            "X-S": signs["x-s"],
            "X-T": signs["x-t"],
            "x-S-Common": signs["x-s-common"],
            "X-B3-Traceid": signs["x-b3-traceid"],
        })
        return self.headers

    def request(self, method, url, need_check=True, **kwargs) -> Union[str, Any]:
        """Send one HTTP request through httpx and unwrap the API envelope.

        Args:
            method: HTTP method.
            url: Absolute request URL.
            need_check: When true, a 461 status starts the captcha flow.
            **kwargs: Extra arguments forwarded to ``httpx.Client.request``.
                The special key ``return_response`` (popped here) makes the
                method return the raw response text instead of parsed JSON.

        Returns:
            The response text when ``return_response`` is set; otherwise the
            ``data`` field of the JSON body, falling back to the ``success``
            flag when there is no ``data`` field.

        Raises:
            Exception: With ``IP_ERROR_STR`` when the body's ``code`` equals
                ``IP_ERROR_CODE``; otherwise with the body's ``msg``.
        """
        return_response = kwargs.pop('return_response', False)
        with httpx.Client(proxies=self.proxies) as client:
            response = client.request(method, url, timeout=self.timeout, **kwargs)

        if return_response:
            return response.text

        # The captcha is solved here, but the 461 response itself is still
        # parsed below; the request is not retried automatically.
        if response.status_code == 461 and need_check:
            self.verify()

        data: Dict = response.json()
        # .get() so an unexpected payload surfaces the server message below
        # instead of a bare KeyError.
        if data.get("success"):
            return data.get("data", data.get("success", {}))
        if data.get("code") == self.IP_ERROR_CODE:
            raise Exception(self.IP_ERROR_STR)
        raise Exception(data.get("msg", None))

    async def get(self, uri: str, params=None) -> Dict:
        """Send a signed GET request.

        Args:
            uri: Request path.
            params: Optional query parameters; appended URL-encoded when a dict.

        Returns:
            The unwrapped payload from ``request``.
        """
        final_uri = uri
        if isinstance(params, dict):
            final_uri = f"{uri}?{urlencode(params)}"
        headers = self._pre_headers(final_uri)
        # request() is synchronous: awaiting its return value would raise
        # TypeError, so the result is returned directly.
        return self.request(method="GET", url=f"{self._host}{final_uri}", headers=headers)

    def post(self, uri: str, data: dict, need_check=True) -> Dict:
        """Send a signed POST request with a compact JSON body.

        Args:
            uri: Request path.
            data: Request body; serialised without whitespace and without
                ASCII escaping.
            need_check: Forwarded to ``request`` (controls 461 handling).

        Returns:
            The unwrapped payload from ``request``.
        """
        headers = self._pre_headers(uri, data)
        json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False)
        return self.request(
            method="POST",
            url=f"{self._host}{uri}",
            need_check=need_check,
            data=json_str,
            headers=headers,
        )

    def update_xsec_token(self):
        """Refresh ``self.xsec_token`` from the first hit of a keyword search.

        When the search returns no items the token is left unchanged and an
        error is logged, rather than failing on an empty result.
        """
        res = self.get_note_by_keyword('小红书')
        items = res.get('items') if isinstance(res, dict) else None
        if not items:
            utils.logger.error(
                f"[XiaoHongShuClient.update_xsec_token] search returned no items, res:{res}")
            return
        self.xsec_token = items[0].get('xsec_token')

    async def get_note_media(self, url: str) -> Union[bytes, None]:
        """Download a media resource.

        Args:
            url: Absolute URL of the media file.

        Returns:
            The response body, or ``None`` (after logging) when the response
            reason phrase is not ``OK``.
        """
        async with httpx.AsyncClient(proxies=self.proxies) as client:
            response = await client.request("GET", url, timeout=self.timeout)
            if response.reason_phrase != "OK":
                utils.logger.error(f"[XiaoHongShuClient.get_note_media] request {url} err, res:{response.text}")
                return None
            return response.content

    def pong(self) -> bool:
        """Check whether the login state is still valid.

        Runs a keyword search; a non-empty result counts as logged in and
        also refreshes ``self.xsec_token``.

        Returns:
            ``True`` when the search returned items, ``False`` when it
            returned none or raised.
        """
        utils.logger.info("[XiaoHongShuClient.pong] Begin to pong xhs...")
        ping_flag = False
        try:
            note_card: Dict = self.get_note_by_keyword(keyword="小红书")
            items = note_card.get("items")
            if items:
                # Read the token first so a failure here still reports False.
                self.xsec_token = items[0].get('xsec_token')
                ping_flag = True
        except Exception as e:
            # Any failure is treated as "not logged in"; the caller decides
            # whether to log in again.
            utils.logger.error(f"[XiaoHongShuClient.pong] Ping xhs failed: {e}, and try to login again...")
        return ping_flag

    async def update_cookies(self, browser_context: BrowserContext):
        """Reload cookies from the browser context (typically after login).

        Args:
            browser_context: Browser context whose cookies are copied into
                the ``Cookie`` header and ``self.cookie_dict``.
        """
        cookie_str, cookie_dict = utils.convert_cookies(await browser_context.cookies())
        self.headers["Cookie"] = cookie_str
        self.cookie_dict = cookie_dict

    def get_note_by_keyword(
        self, keyword: str,
        page: int = 1, page_size: int = 20,
        sort: SearchSortType = SearchSortType.GENERAL,
        note_type: SearchNoteType = SearchNoteType.ALL
    ) -> Dict:
        """Search notes by keyword.

        Args:
            keyword: Search keyword.
            page: Page number.
            page_size: Number of results per page.
            sort: Result ordering.
            note_type: Kind of notes to search for.

        Returns:
            The unwrapped search payload.
        """
        uri = "/api/sns/web/v1/search/notes"
        data = {
            "keyword": keyword,
            "page": page,
            "page_size": page_size,
            "search_id": get_search_id(),
            "sort": sort.value,
            "note_type": note_type.value,
        }
        return self.post(uri, data)

    def get_note_by_id(self, note_id: str) -> Dict:
        """Fetch the detail card of a single note.

        ``self.xsec_token`` is sent with the request and is refreshed first
        when it has not been set yet.

        Args:
            note_id: Note id.

        Returns:
            The ``note_card`` of the first returned item, or an empty dict
            when the response carries no items.
        """
        if self.xsec_token is None:
            self.update_xsec_token()
        data = {
            "source_note_id": note_id,
            "image_formats": ["jpg", "webp", "avif"],
            "extra": {"need_body_topic": 1},
            "xsec_source": "pc_search",
            "xsec_token": self.xsec_token,
        }
        uri = "/api/sns/web/v1/feed"
        res = self.post(uri, data)
        if res and res.get("items"):
            res_dict: Dict = res["items"][0]["note_card"]
            return res_dict
        # When crawling too frequently, some notes return results while
        # others come back empty.
        utils.logger.error(f"[XiaoHongShuClient.get_note_by_id] get note id:{note_id} empty and res:{res}")
        return dict()

    async def get_note_comments(self, note_id: str, cursor: str = "") -> Dict:
        """Fetch one page of top-level comments for a note.

        Args:
            note_id: Note id.
            cursor: Pagination cursor; empty for the first page.

        Returns:
            The unwrapped comment-page payload.
        """
        uri = "/api/sns/web/v2/comment/page"
        params = {
            "note_id": note_id,
            "cursor": cursor,
            "top_comment_id": "",
            "image_formats": "jpg,webp,avif",
        }
        return await self.get(uri, params)

    async def get_note_sub_comments(self, note_id: str, root_comment_id: str, num: int = 10, cursor: str = ""):
        """Fetch one page of replies under a top-level comment.

        Args:
            note_id: Id of the note the comment belongs to.
            root_comment_id: Id of the top-level comment.
            num: Page size.
            cursor: Pagination cursor; empty for the first page.

        Returns:
            The unwrapped sub-comment-page payload.
        """
        uri = "/api/sns/web/v2/comment/sub/page"
        params = {
            "note_id": note_id,
            "root_comment_id": root_comment_id,
            "num": num,
            "cursor": cursor,
        }
        return await self.get(uri, params)

    async def get_note_all_comments(self, note_id: str, crawl_interval: float = 1.0,
                                    callback: Optional[Callable] = None) -> List[Dict]:
        """Collect every top-level comment of a note, page by page.

        Args:
            note_id: Note id.
            crawl_interval: Seconds to sleep after each page.
            callback: Optional coroutine function awaited as
                ``callback(note_id, comments)`` after each page.

        Returns:
            All collected comments, followed per page by whatever
            ``get_comments_all_sub_comments`` returns for that page.
        """
        result = []
        comments_has_more = True
        comments_cursor = ""
        while comments_has_more:
            comments_res = await self.get_note_comments(note_id, comments_cursor)
            comments_has_more = comments_res.get("has_more", False)
            comments_cursor = comments_res.get("cursor", "")
            if "comments" not in comments_res:
                utils.logger.info(
                    f"[XiaoHongShuClient.get_note_all_comments] No 'comments' key found in response: {comments_res}")
                break
            comments = comments_res["comments"]
            if callback:
                await callback(note_id, comments)
            await asyncio.sleep(crawl_interval)
            result.extend(comments)
            sub_comments = await self.get_comments_all_sub_comments(comments, crawl_interval, callback)
            result.extend(sub_comments)
        return result

    async def get_comments_all_sub_comments(self, comments: List[Dict], crawl_interval: float = 1.0,
                                            callback: Optional[Callable] = None) -> List[Dict]:
        """Collect all replies under the given top-level comments.

        Sub-comment crawling is currently disabled: the method ignores its
        arguments and always returns an empty list. ``get_note_sub_comments``
        is the paginated endpoint an implementation would iterate.

        Args:
            comments: Top-level comments (unused).
            crawl_interval: Seconds to sleep after each page (unused).
            callback: Optional per-page callback (unused).

        Returns:
            An empty list.
        """
        return []

    async def get_creator_info(self, user_id: str) -> Dict:
        """Read a creator's profile summary from the web profile page.

        The profile HTML embeds its state in ``window.__INITIAL_STATE__``;
        that JSON is extracted and its ``user.userPageData`` entry returned.
        eg: https://www.xiaohongshu.com/user/profile/59d8cb33de5fb4696bf17217

        Args:
            user_id: Creator id.

        Returns:
            The ``userPageData`` value, or an empty dict when the state
            script is missing, parses to null, or has no ``user`` entry.
        """
        uri = f"/user/profile/{user_id}"
        # request() is synchronous; it must not be awaited.
        html_content = self.request("GET", self._domain + uri, return_response=True, headers=self.headers)
        match = re.search(r'<script>window.__INITIAL_STATE__=(.+)<\/script>', html_content, re.M)
        if match is None:
            return {}
        # The embedded state is JavaScript, not strict JSON: ``undefined``
        # values are mapped to null before parsing.
        info = json.loads(match.group(1).replace(':undefined', ':null'), strict=False)
        if info is None:
            return {}
        user = info.get('user')
        if not user:
            return {}
        return user.get('userPageData')

    async def get_notes_by_creator(
        self, creator: str,
        cursor: str,
        page_size: int = 30
    ) -> Dict:
        """Fetch one page of notes posted by a creator.

        Args:
            creator: Creator id.
            cursor: Id of the last note of the previous page.
            page_size: Number of notes per page.

        Returns:
            The unwrapped posted-notes payload.
        """
        uri = "/api/sns/web/v1/user_posted"
        data = {
            "user_id": creator,
            "cursor": cursor,
            "num": page_size,
            "image_formats": "jpg,webp,avif",
        }
        return await self.get(uri, data)

    async def get_all_notes_by_creator(self, user_id: str, crawl_interval: float = 1.0,
                                       callback: Optional[Callable] = None) -> List[Dict]:
        """Collect every note posted by a creator, page by page.

        Args:
            user_id: Creator id.
            crawl_interval: Seconds to sleep after each page.
            callback: Optional coroutine function awaited as
                ``callback(notes)`` after each page.

        Returns:
            All collected notes.
        """
        result = []
        notes_has_more = True
        notes_cursor = ""
        while notes_has_more:
            notes_res = await self.get_notes_by_creator(user_id, notes_cursor)
            if not notes_res:
                utils.logger.error(
                    "[XiaoHongShuClient.get_notes_by_creator] The current creator may have been banned by xhs, so they cannot access the data.")
                break
            notes_has_more = notes_res.get("has_more", False)
            notes_cursor = notes_res.get("cursor", "")
            if "notes" not in notes_res:
                utils.logger.info(
                    f"[XiaoHongShuClient.get_all_notes_by_creator] No 'notes' key found in response: {notes_res}")
                break
            notes = notes_res["notes"]
            utils.logger.info(
                f"[XiaoHongShuClient.get_all_notes_by_creator] got user_id:{user_id} notes len : {len(notes)}")
            if callback:
                await callback(notes)
            await asyncio.sleep(crawl_interval)
            result.extend(notes)
        return result

    def verify(self):
        """Run the captcha flow: register a challenge, then submit a solution."""
        image = self.get_image()
        self.check(image)

    def get_image(self):
        """Register a captcha challenge and return its image URL.

        Stores the challenge id in ``self.rid`` for the follow-up ``check``.

        Returns:
            The ``captchaUrl`` decoded from the encrypted ``captchaInfo``.
        """
        json_data = {
            'secretId': '000',
            'verifyType': '102',
            'verifyUuid': '',
            'verifyBiz': '461',
            'sourceSite': '',
        }
        # need_check=False: a 461 on the captcha endpoint itself must not
        # start another verify() round.
        response = self.post('/api/redcaptcha/v2/captcha/register',
                             need_check=False, data=json_data)
        logging.info(f"get image:{response}")
        captchaInfo = response["captchaInfo"]
        self.rid = response["rid"]
        image_Info = self.des.decrypt("captchaInfo", captchaInfo)
        captchaUrl = json.loads(image_Info)["captchaUrl"]
        logging.info('captchaUrl:' + captchaUrl)
        return captchaUrl

    def check(self, img_url: str):
        """Solve the rotate captcha for ``img_url`` and submit the answer.

        The image is downloaded, sent to the recognition API to obtain a
        clockwise rotation angle, converted into a slider distance and a
        generated mouse track, encrypted, and posted to the check endpoint.

        Args:
            img_url: Captcha image URL returned by ``get_image``.
        """
        img = self.rotate_ident.do_download_img(img_url)
        response = invoke_ident_api(img)
        # The recognition result is text wrapping the angle in a fixed
        # prefix/suffix; strip both to get the number.
        angle = int(str(response['data']['res_str']).replace('顺时针旋转', '').replace('度', ''))
        rate = angle / 360
        # NOTE(review): 227, 150 and 199 are undocumented tuning constants —
        # presumably slider width / track length / time offset; confirm.
        distance = int(rate * 227)
        gen_track = GenerateCurve([0, 0], [distance, 2], [], int(rate * 150)).main()
        track = self.des.encrypt("track", json.dumps(gen_track, separators=(",", ":")))
        mouseEnd = self.des.encrypt("mouseEnd", str(distance))
        time_ = self.des.encrypt("time", str(gen_track[-1][-1] + 199))
        # Serialise with json.dumps (compact separators) instead of string
        # concatenation so the payload is always valid JSON.
        captcha_info = json.dumps(
            {
                "mouseEnd": mouseEnd,
                "time": time_,
                "track": track,
                "width": "w1qZrykOUIU=",
            },
            separators=(",", ":"),
        )
        json_data = {
            'rid': self.rid,
            'verifyType': 102,
            'verifyBiz': '461',
            'verifyUuid': '',
            'sourceSite': '',
            'captchaInfo': captcha_info,
        }
        response = self.post('/api/redcaptcha/v2/captcha/check', need_check=False,
                             data=json_data)
        logging.info(f'check:{response}')
|