Browse Source

refactor: 获取小红书改回使用playwright

wuwenyi 6 months ago
parent
commit
643fa1106d
5 changed files with 230 additions and 31 deletions
  1. 15 1
      api/search.py
  2. 37 23
      xhs/__init__.py
  3. 59 6
      xhs/client.py
  4. 113 0
      xhs/help.py
  5. 6 1
      xhs/rotate_ident.py

+ 15 - 1
api/search.py

@@ -25,4 +25,18 @@ def search_note():
     xhs_browser = XhsBrowser(lock_util.get_idle_phone('xhs'), playwright)
     xhs_browser = XhsBrowser(lock_util.get_idle_phone('xhs'), playwright)
     xhs_browser.polish_huitun_note(result)
     xhs_browser.polish_huitun_note(result)
     playwright.stop()
     playwright.stop()
-    return api.success(result)
+    return api.success(result)
+
+
+@search_opt.route('/note-info', methods=["POST"])
+def search_note_info():
+    """
+    Fetch the detail of a single note by its id.
+
+    Reads ``noteId`` from the JSON request body, initialises an XhsBrowser
+    bound to an idle 'xhs' phone inside a Playwright session, and returns
+    the result of ``get_note`` wrapped by ``api.success``.
+    :return: the value produced by ``api.success``
+    """
+    with sync_playwright() as pw:
+        payload = request.json
+        idle_phone = lock_util.get_idle_phone('xhs')
+        browser = XhsBrowser(idle_phone, playwright=pw)
+        browser.__init_browser__()
+        note_id = payload.get('noteId')
+        return api.success(browser.get_note(note_id))

+ 37 - 23
xhs/__init__.py

@@ -1,6 +1,7 @@
 """
 """
 小红书
 小红书
 """
 """
+import json
 import logging
 import logging
 from time import sleep
 from time import sleep
 from typing import Optional
 from typing import Optional
@@ -11,7 +12,7 @@ from browser import BaseBrowser
 from tools import utils
 from tools import utils
 from util.lock_util import LockManager
 from util.lock_util import LockManager
 from util.playwright_util import is_element_present
 from util.playwright_util import is_element_present
-from .client import XiaoHongShuClient
+# from .client import XiaoHongShuClient
 from .rotate_ident import RotateIdent
 from .rotate_ident import RotateIdent
 
 
 lock_manager = LockManager()
 lock_manager = LockManager()
@@ -28,28 +29,36 @@ class XhsBrowser(BaseBrowser):
 
 
     def __init_browser__(self):
     def __init_browser__(self):
         super().__init_browser__()
         super().__init_browser__()
-        self.xhs_client = self.create_xhs_client(None)
         self.rotate_ident = RotateIdent(self.page)
         self.rotate_ident = RotateIdent(self.page)
-        self.page.goto(XHS_URL)
+        # client存在406和416异常,未解决前暂时不用client
+        # self.xhs_client = self.create_xhs_client(None)
+        # self.page.goto('https://www.xiaohongshu.com/explore/66b4c36b000000001e01cf8a',wait_until='domcontentloaded')
         self.rotate_ident.handle_rotate()
         self.rotate_ident.handle_rotate()
+        self.browser.add_cookies([{
+            'name': "webId",
+            'value': "xxx123",  # any value
+            'domain': ".xiaohongshu.com",
+            'path': "/"
+        }])
 
 
-    def create_xhs_client(self, httpx_proxy: Optional[str]) -> XiaoHongShuClient:
-        """Create xhs client"""
-        utils.logger.info("[XiaoHongShuCrawler.create_xhs_client] Begin create xiaohongshu API client ...")
-        cookie_str, cookie_dict = utils.convert_cookies(self.browser.cookies())
-        xhs_client_obj = XiaoHongShuClient(
-            proxies=httpx_proxy,
-            headers={
-                "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
-                "Cookie": cookie_str,
-                "Origin": "https://www.xiaohongshu.com",
-                "Referer": "https://www.xiaohongshu.com",
-                "Content-Type": "application/json;charset=UTF-8"
-            },
-            playwright_page=self.page,
-            cookie_dict=cookie_dict,
-        )
-        return xhs_client_obj
+    # def create_xhs_client(self, httpx_proxy: Optional[str]) -> XiaoHongShuClient:
+    #     """Create xhs client"""
+    #     utils.logger.info("[XiaoHongShuCrawler.create_xhs_client] Begin create xiaohongshu API client ...")
+    #     cookie_str, cookie_dict = utils.convert_cookies(self.browser.cookies())
+    #     xhs_client_obj = XiaoHongShuClient(
+    #         proxies=httpx_proxy,
+    #         headers={
+    #             "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
+    #             "Cookie": cookie_str,
+    #             "Origin": "https://www.xiaohongshu.com",
+    #             "Referer": "https://www.xiaohongshu.com",
+    #             "Content-Type": "application/json;charset=UTF-8"
+    #         },
+    #         playwright_page=self.page,
+    #         rotate_ident=self.rotate_ident,
+    #         cookie_dict=cookie_dict,
+    #     )
+    #     return xhs_client_obj
 
 
     def login(self):
     def login(self):
         self.__init_browser__()
         self.__init_browser__()
@@ -64,14 +73,19 @@ class XhsBrowser(BaseBrowser):
         :return:
         :return:
         """
         """
         self.__init_browser__()
         self.__init_browser__()
-        if not self.xhs_client.pong():
-            return huitun_notes
         for huitun_note in huitun_notes:
         for huitun_note in huitun_notes:
             try:
             try:
                 note_id = huitun_note.get('noteId')
                 note_id = huitun_note.get('noteId')
-                note_info = self.xhs_client.get_note_by_id(note_id=note_id)
+                # note_info = self.xhs_client.get_note_by_id(note_id=note_id)
+                note_info = self.get_note(note_id=note_id)
                 huitun_note['authorInfo'] = note_info.get('user')
                 huitun_note['authorInfo'] = note_info.get('user')
                 huitun_note['imageList'] = [img.get('url_default') for img in note_info.get('image_list')]
                 huitun_note['imageList'] = [img.get('url_default') for img in note_info.get('image_list')]
                 sleep(2)
                 sleep(2)
             except Exception as e:
             except Exception as e:
                 utils.logger.error(f"爬取小红书异常 {e}")
                 utils.logger.error(f"爬取小红书异常 {e}")
+
+    def get_note(self, note_id: str):
+        """
+        Load a note's explore page and return its detail from the page state.
+
+        Navigates ``self.page`` to ``{XHS_URL}/explore/{note_id}`` and reads
+        ``window.__INITIAL_STATE__.note.noteDetailMap[note_id].note`` inside
+        the browser, serialised to JSON there and parsed back here.
+
+        :param note_id: id of the note to open
+        :return: the parsed note object
+        :raises ValueError: if the page state has no note entry for ``note_id``
+        """
+        self.page.goto(f'{XHS_URL}/explore/{note_id}')
+        # Walk the state step by step: a missing level yields null instead of
+        # throwing in the page or handing None to json.loads below.
+        data = self.page.evaluate(
+            """noteId => {
+                const state = window.__INITIAL_STATE__;
+                const detailMap = state && state.note && state.note.noteDetailMap;
+                const entry = detailMap && detailMap[noteId];
+                return entry && entry.note ? JSON.stringify(entry.note) : null;
+            }""",
+            note_id,
+        )
+        if not data:
+            raise ValueError(f"note {note_id} not found in page state")
+        return json.loads(data)

+ 59 - 6
xhs/client.py

@@ -1,10 +1,13 @@
 import asyncio
 import asyncio
 import json
 import json
+import logging
 import re
 import re
+import time
 from typing import Any, Callable, Dict, List, Optional, Union
 from typing import Any, Callable, Dict, List, Optional, Union
 from urllib.parse import urlencode
 from urllib.parse import urlencode
 
 
 import httpx
 import httpx
+import requests
 from playwright.async_api import BrowserContext, Page
 from playwright.async_api import BrowserContext, Page
 
 
 # import config
 # import config
@@ -12,7 +15,8 @@ from tools import utils
 
 
 # from .exception import DataFetchError, IPBlockError
 # from .exception import DataFetchError, IPBlockError
 from .field import SearchNoteType, SearchSortType
 from .field import SearchNoteType, SearchSortType
-from .help import get_search_id, sign
+from .help import get_search_id, sign, Des, GenerateCurve
+from .rotate_ident import invoke_ident_api
 
 
 
 
 class XiaoHongShuClient:
 class XiaoHongShuClient:
@@ -23,6 +27,7 @@ class XiaoHongShuClient:
             *,
             *,
             headers: Dict[str, str],
             headers: Dict[str, str],
             playwright_page: Page,
             playwright_page: Page,
+            rotate_ident,
             cookie_dict: Dict[str, str],
             cookie_dict: Dict[str, str],
     ):
     ):
         self.xsec_token = None
         self.xsec_token = None
@@ -37,6 +42,8 @@ class XiaoHongShuClient:
         self.NOTE_ABNORMAL_CODE = -510001
         self.NOTE_ABNORMAL_CODE = -510001
         self.playwright_page = playwright_page
         self.playwright_page = playwright_page
         self.cookie_dict = cookie_dict
         self.cookie_dict = cookie_dict
+        self.des = Des()
+        self.rotate_ident = rotate_ident
 
 
     def _pre_headers(self, url: str, data=None) -> Dict:
     def _pre_headers(self, url: str, data=None) -> Dict:
         """
         """
@@ -66,12 +73,13 @@ class XiaoHongShuClient:
         self.headers.update(headers)
         self.headers.update(headers)
         return self.headers
         return self.headers
 
 
-    def request(self, method, url, **kwargs) -> Union[str, Any]:
+    def request(self, method, url, need_check=True, **kwargs) -> Union[str, Any]:
         """
         """
         封装httpx的公共请求方法,对请求响应做一些处理
         封装httpx的公共请求方法,对请求响应做一些处理
         Args:
         Args:
             method: 请求方法
             method: 请求方法
             url: 请求的URL
             url: 请求的URL
+            need_check: need check 461
             **kwargs: 其他请求参数,例如请求头、请求体等
             **kwargs: 其他请求参数,例如请求头、请求体等
 
 
         Returns:
         Returns:
@@ -89,8 +97,8 @@ class XiaoHongShuClient:
         if return_response:
         if return_response:
             return response.text
             return response.text
 
 
-        if response.status_code == 461:
-            self.update_xsec_token()
+        if response.status_code == 461 and need_check:
+            self.verify()
         data: Dict = response.json()
         data: Dict = response.json()
         if data["success"]:
         if data["success"]:
             return data.get("data", data.get("success", {}))
             return data.get("data", data.get("success", {}))
@@ -116,7 +124,7 @@ class XiaoHongShuClient:
         headers = self._pre_headers(final_uri)
         headers = self._pre_headers(final_uri)
         return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=headers)
         return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=headers)
 
 
-    def post(self, uri: str, data: dict) -> Dict:
+    def post(self, uri: str, data: dict, need_check=True) -> Dict:
         """
         """
         POST请求,对请求头签名
         POST请求,对请求头签名
         Args:
         Args:
@@ -128,7 +136,7 @@ class XiaoHongShuClient:
         """
         """
         headers = self._pre_headers(uri, data)
         headers = self._pre_headers(uri, data)
         json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False)
         json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False)
-        return self.request(method="POST", url=f"{self._host}{uri}",
+        return self.request(method="POST", url=f"{self._host}{uri}", need_check=need_check,
                             data=json_str, headers=headers)
                             data=json_str, headers=headers)
 
 
     def update_xsec_token(self):
     def update_xsec_token(self):
@@ -437,3 +445,48 @@ class XiaoHongShuClient:
             await asyncio.sleep(crawl_interval)
             await asyncio.sleep(crawl_interval)
             result.extend(notes)
             result.extend(notes)
         return result
         return result
+
+    def verify(self):
+        """Fetch the captcha image URL via ``get_image`` and pass it to ``check``."""
+        self.check(self.get_image())
+
+    def get_image(self):
+        """
+        Register a captcha challenge and return the URL of its image.
+
+        Posts to the captcha register endpoint with ``need_check=False``,
+        stores the returned ``rid`` on ``self.rid``, decrypts the returned
+        ``captchaInfo`` with the "captchaInfo" key and reads ``captchaUrl``
+        from the decrypted JSON.
+
+        :return: the captcha image URL
+        """
+        payload = {
+            'secretId': '000',
+            'verifyType': '102',
+            'verifyUuid': '',
+            'verifyBiz': '461',
+            'sourceSite': '',
+        }
+        register_resp = self.post('/api/redcaptcha/v2/captcha/register',
+                                  need_check=False, data=payload)
+        logging.info(f"get image:{register_resp}")
+        encrypted_info = register_resp["captchaInfo"]
+        # Kept on the instance because check() sends it back with the answer.
+        self.rid = register_resp["rid"]
+        decrypted_info = self.des.decrypt("captchaInfo", encrypted_info)
+        captcha_url = json.loads(decrypted_info)["captchaUrl"]
+        logging.info('captchaUrl:' + captcha_url)
+        return captcha_url
+
+    def check(self, img_url:str):
+        """
+        Solve the rotation captcha at ``img_url`` and submit the answer.
+
+        Downloads the image, asks ``invoke_ident_api`` for the rotation,
+        turns the angle into a slider distance, generates a mouse track with
+        ``GenerateCurve``, encrypts the track / end position / elapsed time
+        and posts them to the captcha check endpoint together with
+        ``self.rid``.
+
+        :param img_url: URL of the captcha image
+        """
+        image = self.rotate_ident.do_download_img(img_url)
+        ident_result = invoke_ident_api(image)
+        # Presumably res_str looks like "顺时针旋转<N>度"; removing both
+        # wrapper tokens leaves the integer angle.
+        angle_text = str(ident_result['data']['res_str'])
+        for token in ('顺时针旋转', '度'):
+            angle_text = angle_text.replace(token, '')
+        rate = int(angle_text) / 360
+        # NOTE(review): 227, 150 and 199 are taken over unchanged; their
+        # origin is not visible in this file.
+        distance = int(rate * 227)
+        points = GenerateCurve([0, 0], [distance, 2], [], int(rate * 150)).main()
+        track = self.des.encrypt("track", json.dumps(points, separators=(",", ":")))
+        mouse_end = self.des.encrypt("mouseEnd", str(distance))
+        elapsed = self.des.encrypt("time", str(points[-1][-1] + 199))
+        captcha_info = (
+            '{"mouseEnd":"%s","time":"%s","track":"%s","width":"w1qZrykOUIU="}'
+            % (mouse_end, elapsed, track)
+        )
+        payload = {
+            'rid': self.rid,
+            'verifyType': 102,
+            'verifyBiz': '461',
+            'verifyUuid': '',
+            'sourceSite': '',
+            'captchaInfo': captcha_info,
+        }
+        check_resp = self.post('/api/redcaptcha/v2/captcha/check', need_check=False,
+                               data=payload)
+        logging.info(f'check:{check_resp}')

+ 113 - 0
xhs/help.py

@@ -1,9 +1,14 @@
+import base64
 import ctypes
 import ctypes
 import json
 import json
+import math
 import random
 import random
 import time
 import time
 import urllib.parse
 import urllib.parse
 
 
+from Crypto.Cipher import DES
+from matplotlib import pyplot as plt
+
 
 
 def sign(a1="", b1="", x_s="", x_t=""):
 def sign(a1="", b1="", x_s="", x_t=""):
     """
     """
@@ -277,6 +282,114 @@ def get_trace_id(img_url: str):
     return f"spectrum/{img_url.split('/')[-1]}" if img_url.find("spectrum") != -1 else img_url.split("/")[-1]
     return f"spectrum/{img_url.split('/')[-1]}" if img_url.find("spectrum") != -1 else img_url.split("/")[-1]
 
 
 
 
+BS = DES.block_size
+
+
+class Des:
+    """
+    DES (ECB mode) helper that uses a fixed key per field name.
+
+    ``encrypt`` returns base64 text; ``decrypt`` takes base64 text and
+    returns the plaintext with its padding removed.
+    """
+    key_map = {"mouseEnd": 'WquqhEkd', "track": 'PYrm8rMk', "time": 'vPMvCY4K', "captchaInfo": '76a2171c'}
+
+    @staticmethod
+    def pad(s):
+        """Append 1..BS padding characters, each ``chr(pad_length)``, so ``len(s)`` is a multiple of ``BS``."""
+        fill = BS - len(s) % BS
+        return s + fill * chr(fill)
+
+    @staticmethod
+    def unpad(s):
+        """
+        Strip the padding added by ``pad``.
+
+        Returns ``s`` unchanged when it is empty or when its last character
+        is not a valid pad length (1..BS), instead of raising or cutting
+        off real content.
+        """
+        if not s:
+            return s
+        fill = ord(s[-1])
+        if not 1 <= fill <= BS:
+            return s
+        return s[:-fill]
+
+    def encrypt(self, key, text):
+        """
+        Encrypt ``text`` with the key registered for ``key``.
+
+        :param key: a name from ``key_map``
+        :param text: plaintext string
+        :return: base64-encoded ciphertext as ``str``
+        :raises KeyError: if ``key`` is not in ``key_map``
+        """
+        cryptor = DES.new(self.key_map[key].encode(), DES.MODE_ECB)
+        raw = text.encode()
+        # Pad the encoded bytes, not the characters: multi-byte UTF-8 text
+        # would otherwise produce input that is not block-aligned.
+        fill = BS - len(raw) % BS
+        ciphertext = cryptor.encrypt(raw + bytes([fill]) * fill)
+        return base64.standard_b64encode(ciphertext).decode("utf-8")
+
+    def decrypt(self, key, text):
+        """
+        Decrypt base64 ``text`` with the key registered for ``key``.
+
+        :param key: a name from ``key_map``
+        :param text: base64-encoded ciphertext
+        :return: the plaintext with trailing NULs and padding removed
+        :raises KeyError: if ``key`` is not in ``key_map``
+        """
+        cryptor = DES.new(self.key_map[key].encode(), DES.MODE_ECB)
+        plain_text = cryptor.decrypt(base64.standard_b64decode(text))
+        return self.unpad(plain_text.decode("utf-8").rstrip('\0'))
+
+
+class GenerateCurve:
+    '''Build a quadratic Bezier curve between two points, with a timestamp per point.'''
+
+    def __init__(self, point0, point1, control_point=None, point_nums=None, debug=False):
+        '''
+        :param point0: start point ``[x, y]``
+        :param point1: end point ``[x, y]``
+        :param control_point: control point ``[x, y]``; when empty or
+            omitted, a random one is generated by ``getControlPoint``
+        :param point_nums: number of curve segments (``point_nums + 1``
+            points are produced); more points give a bumpier curve, fewer a
+            smoother one. When omitted, ``random.randint(0, 3)`` is drawn
+            per instance; a falsy value is replaced by
+            ``random.randint(1, 6)`` in ``getBezierPoints``
+        :param debug: when truthy, ``main`` plots the curve
+        '''
+        self.point0 = point0
+        self.point1 = point1
+        # Always own a private list: a shared default (or the caller's list)
+        # must not be mutated by getControlPoint.
+        self.control_point = list(control_point) if control_point else []
+        # Draw the default per instance, not once at function-definition time.
+        self.point_nums = random.randint(0, 3) if point_nums is None else point_nums
+        self.debug = debug
+
+    def getBezierPoints(self):
+        '''
+        :return: list of ``[x, y, t]`` with integer coordinates and a
+            cumulative timestamp that grows by 15..30 per point
+        '''
+        if not self.point_nums:
+            self.point_nums = random.randint(1, 6)
+        if not self.control_point:
+            # Allow calling this method directly, without main().
+            self.getControlPoint()
+        pointList = []
+        x1, y1 = int(self.point0[0]), int(self.point0[1])
+        x2, y2 = int(self.point1[0]), int(self.point1[1])
+        cx, cy = int(self.control_point[0]), int(self.control_point[1])
+        tm = 0
+        for i in range(self.point_nums + 1):
+            t = i / self.point_nums
+            x = math.pow(1 - t, 2) * x1 + 2 * t * (1 - t) * cx + math.pow(t, 2) * x2
+            y = math.pow(1 - t, 2) * y1 + 2 * t * (1 - t) * cy + math.pow(t, 2) * y2
+            tm += random.randint(15, 30)
+            pointList.append([int(x), int(y), int(tm)])
+        return pointList
+
+    def getControlPoint(self):
+        '''
+        :return: the control point, generating a random one around the
+            midpoint of the two end points if none was supplied
+        '''
+        if self.control_point:
+            return self.control_point
+
+        x0, y0 = int(self.point0[0]), int(self.point0[1])
+        x1, y1 = int(self.point1[0]), int(self.point1[1])
+
+        half_dx = abs(x0 - x1) / 2  # half of the horizontal distance
+        half_dy = abs(y0 - y1) / 2  # half of the vertical distance
+        ran_x = random.randint(0, int(half_dx))  # random x offset
+        ran_y = random.randint(0, int(half_dy))  # random y offset
+
+        self.control_point = [
+            (x0 + x1) / 2 + random.choice([-ran_x, +ran_x]),
+            (y0 + y1) / 2 + random.choice([-ran_y, +ran_y]),
+        ]
+        return self.control_point
+
+    def showRoute(self, pointList):
+        '''
+        Plot the curve's x/y path.
+        :return:
+        '''
+        _xx = [p[0] for p in pointList]
+        _yy = [p[1] for p in pointList]
+        plt.plot(_xx, _yy, 'b-')
+        plt.show()
+
+    def main(self):
+        '''
+        :return: the generated list of ``[x, y, t]`` points
+        '''
+        self.getControlPoint()
+        pointList = self.getBezierPoints()
+        if self.debug:
+            self.showRoute(pointList)
+        return pointList
+
 if __name__ == '__main__':
 if __name__ == '__main__':
     _img_url = "https://sns-img-bd.xhscdn.com/7a3abfaf-90c1-a828-5de7-022c80b92aa3"
     _img_url = "https://sns-img-bd.xhscdn.com/7a3abfaf-90c1-a828-5de7-022c80b92aa3"
     # 获取一个图片地址在多个cdn下的url地址
     # 获取一个图片地址在多个cdn下的url地址

+ 6 - 1
xhs/rotate_ident.py

@@ -64,6 +64,7 @@ def invoke_ident_api(img):
     return response.json()
     return response.json()
 
 
 
 
+
 class RotateIdent:
 class RotateIdent:
     """
     """
     旋转验证码处理类
     旋转验证码处理类
@@ -137,7 +138,11 @@ class RotateIdent:
             "TE": "Trailers",
             "TE": "Trailers",
         }
         }
         # 下载图片
         # 下载图片
-        response = requests.get(url=img_url, headers=header, timeout=20)
+        return self.do_download_img(img_url)
+
+    def do_download_img(self, img_url):
+        # 下载图片
+        response = requests.get(url=img_url, timeout=20)
         img = Image.open(BytesIO(response.content))
         img = Image.open(BytesIO(response.content))
         img_folder = os.path.join(root_dir, 'train_img')
         img_folder = os.path.join(root_dir, 'train_img')
         # 如果目标文件夹不存在,则创建
         # 如果目标文件夹不存在,则创建