Browse Source

feat: 小红书rpa

wuwenyi, 7 months ago
parent
commit
249d23486d
18 changed files with 1674 additions and 147 deletions
  1. api/__init__.py (+5 -38)
  2. api/login.py (+16 -3)
  3. api/search.py (+8 -1)
  4. app.py (+2 -2)
  5. browser/__init__.py (+57 -0)
  6. huitun/__init__.py (+39 -103)
  7. tools/__init__.py (+0 -0)
  8. tools/crawler_util.py (+135 -0)
  9. tools/slider_util.py (+164 -0)
  10. tools/time_util.py (+106 -0)
  11. tools/utils.py (+31 -0)
  12. util/lock_util.py (+77 -0)
  13. util/playwright_util.py (+12 -0)
  14. xhs/__init__.py (+77 -0)
  15. xhs/client.py (+437 -0)
  16. xhs/field.py (+72 -0)
  17. xhs/help.py (+287 -0)
  18. xhs/rotate_ident.py (+149 -0)

+ 5 - 38
api/__init__.py

@@ -1,10 +1,8 @@
 """
 通用api
 """
-import os
 import json
-import logging
-import time
+from flask import jsonify
 
 SUCCESS_RESPONSE = json.dumps({
     "code": 1,
@@ -12,37 +10,6 @@ SUCCESS_RESPONSE = json.dumps({
     "success": True,
 }, ensure_ascii=False)
 
-phones = set()
-directory = "./.data/huitun"
-if not os.path.exists(directory):
-    os.makedirs(directory)
-for entry in os.listdir(directory):
-    # 构建完整的路径
-    full_path = os.path.join(directory, entry)
-    # 检查是否是文件夹
-    if os.path.isdir(full_path):
-        # 如果是文件夹,将文件夹名称添加到集合中
-        phones.add(entry)
-
-print("已存在的账号:", phones)
-
-
-def contain_browser(phone):
-    return phone in phones
-
-
-def get_idle_phone():
-    from huitun import lock_manager
-    while True:
-        for phone in phones:
-            if not lock_manager.is_locked(phone):
-                return phone
-        time.sleep(1)
-
-
-def add_phone(phone):
-    phones.add(phone)
-
 
 class BusinessException(Exception):
     """
@@ -63,11 +30,11 @@ def fail_response(msg: str):
     """
     请求失败
     """
-    return json.dumps({
+    return jsonify({
         "code": 0,
         "msg": msg,
         "success": False,
-    }, ensure_ascii=False)
+    })
 
 
 def assert_not_none(data, msg):
@@ -81,9 +48,9 @@ def assert_not_none(data, msg):
 def success(data=None):
     if data is None:
         return SUCCESS_RESPONSE
-    return json.dumps({
+    return jsonify({
         "code": 1,
         "msg": "请求成功",
         "data": data,
         "success": True,
-    }, ensure_ascii=False)
+    })
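
Note: fail_response and success now go through flask.jsonify instead of json.dumps, so they must run inside an application/request context. A minimal sketch of wiring them up together with BusinessException (the /ping route and the error handler below are illustrative, not part of this commit):

    from flask import Flask

    import api

    app = Flask(__name__)


    @app.errorhandler(api.BusinessException)
    def handle_business_exception(e):
        # jsonify needs the active request context Flask provides here
        return api.fail_response(str(e))


    @app.route('/ping')
    def ping():
        # success(None) still returns the pre-built SUCCESS_RESPONSE string;
        # with data it goes through jsonify
        return api.success({"pong": True})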

+ 16 - 3
api/login.py

@@ -7,17 +7,30 @@ from flask import request
 
 import api
 import huitun
+import xhs
 
 login_opt = Blueprint('login', __name__)
 
-@login_opt.route('/login', methods=["POST"])
-def login():
+
+@login_opt.route('/huitun', methods=["POST"])
+def huitun_login():
     """
     登录接口
-    :return: 1-登录成功 2-需要验证码
     """
     request_body = request.json
     phone = request_body.get('phone')
     browser = huitun.HuiTunBrowser(phone)
     login_result = browser.login(request_body.get('password'))
     return api.success(login_result)
+
+
+@login_opt.route('/xhs', methods=["POST"])
+def xhs_login():
+    """
+    登录接口
+    """
+    request_body = request.json
+    phone = request_body.get('phone')
+    browser = xhs.XhsBrowser(phone)
+    browser.login()
+    return api.success()
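
A hedged sketch of exercising the two new login routes from a client; the /login URL prefix, host and port are assumptions about how the blueprint is registered, and the phone/password values are placeholders:

    import requests

    BASE = "http://127.0.0.1:5000"  # host/port assumed

    # 灰豚 password login
    resp = requests.post(f"{BASE}/login/huitun",
                         json={"phone": "13800000000", "password": "secret"})
    print(resp.json())

    # 小红书 login is currently manual: the route opens a browser window and waits ~60s
    resp = requests.post(f"{BASE}/login/xhs", json={"phone": "13800000000"})
    print(resp.json())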

+ 8 - 1
api/search.py

@@ -6,6 +6,9 @@ from flask import request
 
 import api
 from huitun import HuiTunBrowser
+from util import lock_util
+from xhs import XhsBrowser
+from playwright.sync_api import Playwright, sync_playwright
 
 search_opt = Blueprint('search', __name__)
 
@@ -16,6 +19,10 @@ def search_note():
     :return:
     """
     request_body = request.json
-    browser = HuiTunBrowser(api.get_idle_phone())
+    playwright = sync_playwright().start()
+    browser = HuiTunBrowser(lock_util.get_idle_phone('huitun'), playwright)
     result = browser.search_note(request_body.get('tagName'), request_body.get('searchLimit'))
+    xhs_browser = XhsBrowser(lock_util.get_idle_phone('xhs'), playwright)
+    xhs_browser.polish_huitun_note(result)
+    playwright.stop()
     return api.success(result)
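
The search route now chains both browsers: a Huitun account collects the raw note list and an XHS account fills in author and image data for each note. A client-side sketch (the /search/note URL is an assumption about how the blueprint is registered; tag and limit are placeholders):

    import requests

    payload = {"tagName": "美食", "searchLimit": 20}
    resp = requests.post("http://127.0.0.1:5000/search/note", json=payload)
    notes = resp.json().get("data", [])
    print(len(notes), notes[0].get("noteId") if notes else None)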

+ 2 - 2
app.py

@@ -61,8 +61,8 @@ def log_response(response):
     打印返回
     """
     data = response.get_data(as_text=True)
-    if len(data) > 1000:
-        logging.info('Response Body: %s', data[:1000] + "...")
+    if len(data) > 300:
+        logging.info('Response Body: %s', data[:300] + "...")
     else:
         logging.info('Response Body: %s', data)
     return response

+ 57 - 0
browser/__init__.py

@@ -0,0 +1,57 @@
+"""
+
+"""
+from abc import abstractmethod
+
+import api
+from playwright.sync_api import Playwright, sync_playwright
+
+
+class BaseBrowser:
+    def __init__(self, phone: str, playwright=None):
+        api.assert_not_none(phone, "手机号不能为空")
+        self.phone = phone
+        self.browser = None
+        self.page = None
+        self.result = None
+        self.list_result = []
+        self.has_more = False
+        self.playwright = playwright
+
+    def __init_browser__(self):
+        self.sure_playwright()
+        self.browser = self.playwright.chromium.launch_persistent_context(
+            proxy=None,
+            user_data_dir=f'./.data/{self.__get_name__()}/{self.phone}',
+            headless=False,
+            slow_mo=1000,
+            channel="chrome",
+            ignore_https_errors=True,
+            user_agent='Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36',
+            args=[
+                '--disable-blink-features=AutomationControlled',
+                '--incognito',
+                '--ignore-certificate-errors-spki-list',
+                '--disable-web-security',  # 禁用 Web 安全性,类似于 ChromeOptions 中的 --ignore-certificate-errors-spki-list
+                '--no-sandbox',  # 禁用沙盒模式
+                '--disable-dev-shm-usage',  # 禁用/dev/shm使用
+                '--disable-features=site-per-process',  # 禁用每个站点的进程,类似于 ChromeOptions 中的 --no-sandbox
+                '--ignore-certificate-errors',  # 忽略证书错误
+                '--disable-features=AutomationControlled'  # 禁用与自动化相关的特性
+            ])
+        self.browser.add_init_script(path="./stealth.min.js")
+        self.page = self.browser.new_page()
+
+    def sure_playwright(self):
+        if self.playwright is None:
+            self.playwright = sync_playwright().start()
+
+    def close(self):
+        if self.page is not None:
+            self.page.close()
+        if self.browser is not None:
+            self.browser.close()
+
+    @abstractmethod
+    def __get_name__(self):
+        pass
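
BaseBrowser centralises the persistent-context launch; a subclass only has to supply the profile-directory name. A sketch assuming stealth.min.js sits in the working directory and the Chrome channel is installed (DemoBrowser, the phone number and the URL are illustrative):

    from browser import BaseBrowser


    class DemoBrowser(BaseBrowser):
        def __get_name__(self):
            # the persistent Chrome profile ends up under ./.data/demo/<phone>
            return 'demo'


    browser = DemoBrowser('13800000000')
    browser.__init_browser__()   # starts its own Playwright when none was injected
    browser.page.goto('https://example.com')
    browser.close()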

+ 39 - 103
huitun/__init__.py

@@ -2,130 +2,63 @@
 
 """
 import logging
-import threading
 
 from playwright.sync_api import sync_playwright, Page, Playwright
 
 import api
+from browser import BaseBrowser
+from util.lock_util import LockManager
+from util.playwright_util import is_element_present
 
 HUITUN_URL = 'https://xhs.huitun.com/'
-
-
-def is_element_present(page, selector):
-    try:
-        page.wait_for_selector(selector, timeout=2000)
-        return True
-    except Exception:
-        return False
-
-
-class LockManager():
-    """
-    全局锁管理,每个手机号只能打开一个上下文相同的浏览器
-    """
-
-    def __init__(self):
-        self.locks = {}
-
-    def acquire_lock(self, key):
-        if key not in self.locks:
-            self.locks[key] = threading.Lock()
-        acquire = self.locks[key].acquire(timeout=300)
-        if acquire:
-            logging.info(f"{key} 获取锁成功")
-
-    def release_lock(self, key):
-        if key in self.locks:
-            self.locks[key].release()
-            logging.info(f"{key} 释放锁成功")
-
-    def is_locked(self, key):
-        """
-        检查给定的键是否处于锁定状态
-        """
-        if key in self.locks:
-            return self.locks[key].locked()
-        else:
-            return False
-
-
 lock_manager = LockManager()
 
 
-class HuiTunBrowser:
-    def __init__(self, phone: str):
-        api.assert_not_none(phone, "手机号不能为空")
-        self.phone = phone
-        self.browser = None
-        self.page = None
-        self.result = None
-        self.list_result = []
-        self.has_more = False
-
-    def __init_browser__(self, playwright: Playwright):
-        self.browser = playwright.chromium.launch_persistent_context(
-            user_data_dir=f'./.data/huitun/{self.phone}',
-            headless=False,
-            slow_mo=1000,
-            channel="chrome",
-            ignore_https_errors=True,
-            args=[
-                '--disable-blink-features=AutomationControlled',
-                '--incognito',
-                '--ignore-certificate-errors-spki-list',
-                '--disable-web-security',  # 禁用 Web 安全性,类似于 ChromeOptions 中的 --ignore-certificate-errors-spki-list
-                '--no-sandbox',  # 禁用沙盒模式
-                '--disable-dev-shm-usage',  # 禁用/dev/shm使用
-                '--disable-features=site-per-process',  # 禁用每个站点的进程,类似于 ChromeOptions 中的 --no-sandbox
-                '--ignore-certificate-errors',  # 忽略证书错误
-                '--disable-features=AutomationControlled'  # 禁用与自动化相关的特性
-            ])
-        self.browser.add_init_script(path="./stealth.min.js")
-        self.page = self.browser.new_page()
-
-    def close(self):
-        if self.browser is not None:
-            self.browser.close()
-        if self.page is not None:
-            self.page.close()
+class HuiTunBrowser(BaseBrowser):
+    def __get_name__(self):
+        return 'huitun'
 
     def login(self, password: str):
         """
         登录抖音,一个登录之后,全部的页面都有了登录状态
         :return: 2- 需要验证码 1-登录成功
         """
-        with sync_playwright() as playwright:
-            self.__init_browser__(playwright)
-            self.page.goto(HUITUN_URL)
-            if is_element_present(self.page, '.ant-modal-body'):
-                if not is_element_present(self.page, 'text=密码登录'):
-                    pwd_login = self.page.query_selector('.b9dOaTo9gfF3wLAi7jlXTg\=\=')
-                    if pwd_login is not None:
-                        pwd_login.click()
-                self.page.get_by_placeholder('请输入手机号').type(self.phone)
-                self.page.get_by_placeholder('6-15位数字与字母组合').type(password)
-                self.page.get_by_text('登 录', exact=True).click()
-                self.page.wait_for_timeout(30_000)
+        self.__init_browser__()
+        self.page.goto(HUITUN_URL)
+        if is_element_present(self.page, '.ant-modal-body'):
+            if not is_element_present(self.page, 'text=密码登录'):
+                pwd_login = self.page.query_selector('.b9dOaTo9gfF3wLAi7jlXTg\=\=')
+                if pwd_login is not None:
+                    pwd_login.click()
+            self.page.get_by_placeholder('请输入手机号').type(self.phone)
+            self.page.get_by_placeholder('6-15位数字与字母组合').type(password)
+            self.page.get_by_text('登 录', exact=True).click()
+            self.page.wait_for_timeout(30_000)
+        self.close()
 
     def search_note(self, tag_name: str, size: int):
         lock_manager.acquire_lock(self.phone)
         try:
-            with sync_playwright() as playwright:
-                self.__init_browser__(playwright)
-                self.list_result = []
-                api.assert_not_none(tag_name, "标签不能为空")
-                self.page.on('response', self.search_note_handler)
-                self.page.goto('https://xhs.huitun.com/#/note/note_search')
-                self.page.wait_for_timeout(3000)
-                while size is None or len(self.list_result) < size:
-                    logging.info('继续搜索用户主页')
-                    self.page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
-                    self.page.wait_for_timeout(2000)
-                    logging.info('搜索用户主页图文结果数:%s', len(self.list_result))
-                self.close()
-                return self.list_result
+            self.__init_browser__()
+            self.list_result = []
+            api.assert_not_none(tag_name, "标签不能为空")
+            self.page.goto('https://xhs.huitun.com/#/note/note_search')
+            # 展开全部标签
+            self.page.query_selector('.RaWdmGo9iaS1-bQ6mK5K4w\=\=').click()
+            self.page.get_by_text(tag_name, exact=True).click()
+            self.page.get_by_text('图文笔记', exact=True).click()
+            self.page.wait_for_timeout(500)
+            self.page.on('response', self.search_note_handler)
+            self.page.get_by_text('近3天', exact=True).click()
+            while size is None or len(self.list_result) < size:
+                logging.info('继续搜索用户主页')
+                self.page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
+                self.page.wait_for_timeout(2000)
+                logging.info('搜索用户主页图文结果数:%s', len(self.list_result))
+            return self.list_result
         finally:
             lock_manager.release_lock(self.phone)
+            self.close()
 
     def search_note_handler(self, response):
         """
@@ -137,7 +70,10 @@ class HuiTunBrowser:
             response_body = response.json()
             if response_body.get('status') == 0:
                 note_list = response_body.get('extData').get('list')
+                self.has_more = len(note_list) > 0
                 if len(self.list_result) == 0:
                     self.list_result = note_list
                 else:
                     self.list_result.extend(note_list)
+            else:
+                self.has_more = False
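
HuiTunBrowser can now share an externally managed Playwright instance (as api/search.py does) instead of creating one per call. A direct-usage sketch with a placeholder phone number and tag:

    from playwright.sync_api import sync_playwright

    from huitun import HuiTunBrowser

    playwright = sync_playwright().start()
    try:
        browser = HuiTunBrowser('13800000000', playwright)
        # keeps scrolling every 2s until at least 20 notes were captured from the responses
        notes = browser.search_note('美食', 20)
        print(len(notes))
    finally:
        playwright.stop()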

+ 0 - 0
tools/__init__.py


+ 135 - 0
tools/crawler_util.py

@@ -0,0 +1,135 @@
+# -*- coding: utf-8 -*-
+# @Author  : relakkes@gmail.com
+# @Time    : 2023/12/2 12:53
+# @Desc    : 爬虫相关的工具函数
+
+import base64
+import random
+import re
+from io import BytesIO
+from typing import Dict, List, Optional, Tuple
+
+import httpx
+from PIL import Image, ImageDraw
+from playwright.async_api import Cookie, Page
+
+from . import utils
+
+
+async def find_login_qrcode(page: Page, selector: str) -> str:
+    """find login qrcode image from target selector"""
+    try:
+        elements = await page.wait_for_selector(
+            selector=selector,
+        )
+        login_qrcode_img = str(await elements.get_property("src"))  # type: ignore
+        if "http://" in login_qrcode_img or "https://" in login_qrcode_img:
+            async with httpx.AsyncClient(follow_redirects=True) as client:
+                utils.logger.info(f"[find_login_qrcode] get qrcode by url:{login_qrcode_img}")
+                resp = await client.get(login_qrcode_img, headers={"User-Agent": get_user_agent()})
+                if resp.status_code == 200:
+                    image_data = resp.content
+                    base64_image = base64.b64encode(image_data).decode('utf-8')
+                    return base64_image
+                raise Exception(f"fetch login image url failed, response message:{resp.text}")
+        return login_qrcode_img
+
+    except Exception as e:
+        print(e)
+        return ""
+
+
+def show_qrcode(qr_code) -> None:  # type: ignore
+    """parse base64 encode qrcode image and show it"""
+    if "," in qr_code:
+        qr_code = qr_code.split(",")[1]
+    qr_code = base64.b64decode(qr_code)
+    image = Image.open(BytesIO(qr_code))
+
+    # Add a square border around the QR code and display it within the border to improve scanning accuracy.
+    width, height = image.size
+    new_image = Image.new('RGB', (width + 20, height + 20), color=(255, 255, 255))
+    new_image.paste(image, (10, 10))
+    draw = ImageDraw.Draw(new_image)
+    draw.rectangle((0, 0, width + 19, height + 19), outline=(0, 0, 0), width=1)
+    new_image.show()
+
+
+def get_user_agent() -> str:
+    ua_list = [
+        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
+        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.79 Safari/537.36",
+        "Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36",
+        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36",
+        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.53 Safari/537.36",
+        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36",
+        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
+        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.5112.79 Safari/537.36",
+        "Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
+        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
+        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.5060.53 Safari/537.36",
+        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.4844.84 Safari/537.36",
+        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36",
+        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.5112.79 Safari/537.36",
+        "Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
+        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36",
+        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.5060.53 Safari/537.36",
+        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.4844.84 Safari/537.36",
+        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36",
+        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.5112.79 Safari/537.36"
+    ]
+    return random.choice(ua_list)
+
+
+def get_mobile_user_agent() -> str:
+    ua_list = [
+        "Mozilla/5.0 (iPhone; CPU iPhone OS 16_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5 Mobile/15E148 Safari/604.1",
+        "Mozilla/5.0 (iPad; CPU OS 16_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5 Mobile/15E148 Safari/604.1",
+        "Mozilla/5.0 (iPhone; CPU iPhone OS 16_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/114.0.5735.99 Mobile/15E148 Safari/604.1",
+        "Mozilla/5.0 (iPad; CPU OS 16_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/114.0.5735.124 Mobile/15E148 Safari/604.1",
+        "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Mobile Safari/537.36",
+        "Mozilla/5.0 (Linux; Android 13; SAMSUNG SM-S918B) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/21.0 Chrome/110.0.5481.154 Mobile Safari/537.36",
+        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36 OPR/99.0.0.0",
+        "Mozilla/5.0 (Linux; Android 10; JNY-LX1; HMSCore 6.11.0.302) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.88 HuaweiBrowser/13.0.5.303 Mobile Safari/537.36"
+    ]
+    return random.choice(ua_list)
+
+
+def convert_cookies(cookies: Optional[List[Cookie]]) -> Tuple[str, Dict]:
+    if not cookies:
+        return "", {}
+    cookies_str = ";".join([f"{cookie.get('name')}={cookie.get('value')}" for cookie in cookies])
+    cookie_dict = dict()
+    for cookie in cookies:
+        cookie_dict[cookie.get('name')] = cookie.get('value')
+    return cookies_str, cookie_dict
+
+
+def convert_str_cookie_to_dict(cookie_str: str) -> Dict:
+    cookie_dict: Dict[str, str] = dict()
+    if not cookie_str:
+        return cookie_dict
+    for cookie in cookie_str.split(";"):
+        cookie = cookie.strip()
+        if not cookie:
+            continue
+        cookie_list = cookie.split("=")
+        if len(cookie_list) != 2:
+            continue
+        cookie_value = cookie_list[1]
+        if isinstance(cookie_value, list):
+            cookie_value = "".join(cookie_value)
+        cookie_dict[cookie_list[0]] = cookie_value
+    return cookie_dict
+
+
+def match_interact_info_count(count_str: str) -> int:
+    if not count_str:
+        return 0
+
+    match = re.search(r'\d+', count_str)
+    if match:
+        number = match.group()
+        return int(number)
+    else:
+        return 0
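
A small sketch of the cookie and counter helpers; the cookie string is made up:

    from tools import crawler_util

    cookie_str = "a1=abc; webId=xyz; web_session=123"
    cookie_dict = crawler_util.convert_str_cookie_to_dict(cookie_str)
    print(cookie_dict)   # {'a1': 'abc', 'webId': 'xyz', 'web_session': '123'}

    headers = {"User-Agent": crawler_util.get_user_agent(), "Cookie": cookie_str}
    print(crawler_util.match_interact_info_count("1.2万"))  # 1 -- re.search(r'\d+') only grabs the first digit run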

+ 164 - 0
tools/slider_util.py

@@ -0,0 +1,164 @@
+# -*- coding: utf-8 -*-
+# @Author  : relakkes@gmail.com
+# @Time    : 2023/12/2 12:55
+# @Desc    : 滑块相关的工具包
+import os
+from typing import List
+from urllib.parse import urlparse
+
+import cv2
+import httpx
+import numpy as np
+
+
+class Slide:
+    """
+    copy from https://blog.csdn.net/weixin_43582101 thanks for author
+    update: relakkes
+    """
+    def __init__(self, gap, bg, gap_size=None, bg_size=None, out=None):
+        """
+        :param gap: 缺口图片链接或者url
+        :param bg: 带缺口的图片链接或者url
+        """
+        self.img_dir = os.path.join(os.getcwd(), 'temp_image')
+        if not os.path.exists(self.img_dir):
+            os.makedirs(self.img_dir)
+
+        bg_resize = bg_size if bg_size else (340, 212)
+        gap_size = gap_size if gap_size else (68, 68)
+        self.bg = self.check_is_img_path(bg, 'bg', resize=bg_resize)
+        self.gap = self.check_is_img_path(gap, 'gap', resize=gap_size)
+        self.out = out if out else os.path.join(self.img_dir, 'out.jpg')
+
+    @staticmethod
+    def check_is_img_path(img, img_type, resize):
+        if img.startswith('http'):
+            headers = {
+                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;"
+                          "q=0.8,application/signed-exchange;v=b3;q=0.9",
+                "Accept-Encoding": "gzip, deflate, br",
+                "Accept-Language": "zh-CN,zh;q=0.9,en-GB;q=0.8,en;q=0.7,ja;q=0.6",
+                "AbstractCache-Control": "max-age=0",
+                "Connection": "keep-alive",
+                "Host": urlparse(img).hostname,
+                "Upgrade-Insecure-Requests": "1",
+                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
+                              "Chrome/91.0.4472.164 Safari/537.36",
+            }
+            img_res = httpx.get(img, headers=headers)
+            if img_res.status_code == 200:
+                img_path = f'./temp_image/{img_type}.jpg'
+                image = np.asarray(bytearray(img_res.content), dtype="uint8")
+                image = cv2.imdecode(image, cv2.IMREAD_COLOR)
+                if resize:
+                    image = cv2.resize(image, dsize=resize)
+                cv2.imwrite(img_path, image)
+                return img_path
+            else:
+                raise Exception(f"保存{img_type}图片失败")
+        else:
+            return img
+
+    @staticmethod
+    def clear_white(img):
+        """清除图片的空白区域,这里主要清除滑块的空白"""
+        img = cv2.imread(img)
+        rows, cols, channel = img.shape
+        min_x = 255
+        min_y = 255
+        max_x = 0
+        max_y = 0
+        for x in range(1, rows):
+            for y in range(1, cols):
+                t = set(img[x, y])
+                if len(t) >= 2:
+                    if x <= min_x:
+                        min_x = x
+                    elif x >= max_x:
+                        max_x = x
+
+                    if y <= min_y:
+                        min_y = y
+                    elif y >= max_y:
+                        max_y = y
+        img1 = img[min_x:max_x, min_y: max_y]
+        return img1
+
+    def template_match(self, tpl, target):
+        th, tw = tpl.shape[:2]
+        result = cv2.matchTemplate(target, tpl, cv2.TM_CCOEFF_NORMED)
+        # 寻找矩阵(一维数组当作向量,用Mat定义) 中最小值和最大值的位置
+        min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
+        tl = max_loc
+        br = (tl[0] + tw, tl[1] + th)
+        # 绘制矩形边框,将匹配区域标注出来
+        # target:目标图像
+        # tl:矩形定点
+        # br:矩形的宽高
+        # (0,0,255):矩形边框颜色
+        # 1:矩形边框大小
+        cv2.rectangle(target, tl, br, (0, 0, 255), 2)
+        cv2.imwrite(self.out, target)
+        return tl[0]
+
+    @staticmethod
+    def image_edge_detection(img):
+        edges = cv2.Canny(img, 100, 200)
+        return edges
+
+    def discern(self):
+        img1 = self.clear_white(self.gap)
+        img1 = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY)
+        slide = self.image_edge_detection(img1)
+
+        back = cv2.imread(self.bg, cv2.COLOR_RGB2GRAY)
+        back = self.image_edge_detection(back)
+
+        slide_pic = cv2.cvtColor(slide, cv2.COLOR_GRAY2RGB)
+        back_pic = cv2.cvtColor(back, cv2.COLOR_GRAY2RGB)
+        x = self.template_match(slide_pic, back_pic)
+        # 输出横坐标, 即 滑块在图片上的位置
+        return x
+
+
+def get_track_simple(distance) -> List[int]:
+    # 有的检测移动速度的 如果匀速移动会被识别出来,来个简单点的 渐进
+    # distance为传入的总距离
+    # 移动轨迹
+    track: List[int] = []
+    # 当前位移
+    current = 0
+    # 减速阈值
+    mid = distance * 4 / 5
+    # 计算间隔
+    t = 0.2
+    # 初速度
+    v = 1
+
+    while current < distance:
+        if current < mid:
+            # 加速度为4
+            a = 4
+        else:
+            # 加速度为-3
+            a = -3
+        v0 = v
+        # 当前速度
+        v = v0 + a * t  # type: ignore
+        # 移动距离
+        move = v0 * t + 1 / 2 * a * t * t
+        # 当前位移
+        current += move  # type: ignore
+        # 加入轨迹
+        track.append(round(move))
+    return track
+
+
+def get_tracks(distance: int, level: str = "easy") -> List[int]:
+    if level == "easy":
+        return get_track_simple(distance)
+    else:
+        from . import easing
+        _, tricks = easing.get_tracks(distance, seconds=2, ease_func="ease_out_expo")
+        return tricks
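
A hedged sketch of combining the slider helpers; the two image URLs are placeholders, and level="easy" avoids the optional easing module that get_tracks would otherwise import:

    from tools.slider_util import Slide, get_tracks

    # gap = the small puzzle piece, bg = the background image with the notch
    slide = Slide(gap="https://example.com/gap.png", bg="https://example.com/bg.png")
    distance = slide.discern()                 # x offset of the notch inside the background
    track = get_tracks(distance, level="easy")
    print(distance, sum(track))                # the track increments roughly add up to the distance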

+ 106 - 0
tools/time_util.py

@@ -0,0 +1,106 @@
+# -*- coding: utf-8 -*-
+# @Author  : relakkes@gmail.com
+# @Time    : 2023/12/2 12:52
+# @Desc    : 时间相关的工具函数
+
+import time
+from datetime import datetime, timedelta, timezone
+
+
+def get_current_timestamp() -> int:
+    """
+    获取当前的时间戳(13 位):1701493264496
+    :return:
+    """
+    return int(time.time() * 1000)
+
+
+def get_current_time() -> str:
+    """
+    获取当前的时间:'2023-12-02 13:01:23'
+    :return:
+    """
+    return time.strftime('%Y-%m-%d %X', time.localtime())
+
+
+def get_current_date() -> str:
+    """
+    获取当前的日期:'2023-12-02'
+    :return:
+    """
+    return time.strftime('%Y-%m-%d', time.localtime())
+
+
+def get_time_str_from_unix_time(unixtime):
+    """
+    unix 整数类型时间戳  ==> 字符串日期时间
+    :param unixtime:
+    :return:
+    """
+    if int(unixtime) > 1000000000000:
+        unixtime = int(unixtime) / 1000
+    return time.strftime('%Y-%m-%d %X', time.localtime(unixtime))
+
+
+def get_date_str_from_unix_time(unixtime):
+    """
+    unix 整数类型时间戳  ==> 字符串日期
+    :param unixtime:
+    :return:
+    """
+    if int(unixtime) > 1000000000000:
+        unixtime = int(unixtime) / 1000
+    return time.strftime('%Y-%m-%d', time.localtime(unixtime))
+
+
+def get_unix_time_from_time_str(time_str):
+    """
+    字符串时间 ==> unix 整数类型时间戳,精确到秒
+    :param time_str:
+    :return:
+    """
+    try:
+        format_str = "%Y-%m-%d %H:%M:%S"
+        tm_object = time.strptime(str(time_str), format_str)
+        return int(time.mktime(tm_object))
+    except Exception as e:
+        return 0
+    pass
+
+
+def get_unix_timestamp():
+    return int(time.time())
+
+
+def rfc2822_to_china_datetime(rfc2822_time):
+    # 定义RFC 2822格式
+    rfc2822_format = "%a %b %d %H:%M:%S %z %Y"
+
+    # 将RFC 2822时间字符串转换为datetime对象
+    dt_object = datetime.strptime(rfc2822_time, rfc2822_format)
+
+    # 将datetime对象的时区转换为中国时区
+    dt_object_china = dt_object.astimezone(timezone(timedelta(hours=8)))
+    return dt_object_china
+
+
+def rfc2822_to_timestamp(rfc2822_time):
+    # 定义RFC 2822格式
+    rfc2822_format = "%a %b %d %H:%M:%S %z %Y"
+
+    # 将RFC 2822时间字符串转换为datetime对象
+    dt_object = datetime.strptime(rfc2822_time, rfc2822_format)
+
+    # 将datetime对象转换为UTC时间
+    dt_utc = dt_object.replace(tzinfo=timezone.utc)
+
+    # 计算UTC时间对应的Unix时间戳
+    timestamp = int(dt_utc.timestamp())
+
+    return timestamp
+
+
+if __name__ == '__main__':
+    # 示例用法
+    _rfc2822_time = "Sat Dec 23 17:12:54 +0800 2023"
+    print(rfc2822_to_china_datetime(_rfc2822_time))
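
The unix-time helpers accept both second and millisecond timestamps (anything above 1e12 is divided by 1000 first); a quick sketch:

    from tools import time_util

    print(time_util.get_time_str_from_unix_time(1701493264))       # 10-digit, seconds
    print(time_util.get_time_str_from_unix_time(1701493264496))    # 13-digit, auto-converted
    print(time_util.get_unix_time_from_time_str("2023-12-02 13:01:23"))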

+ 31 - 0
tools/utils.py

@@ -0,0 +1,31 @@
+import argparse
+import logging
+
+from .crawler_util import *
+from .slider_util import *
+from .time_util import *
+
+
+def init_logging_config():
+    level = logging.INFO
+    logging.basicConfig(
+        level=level,
+        format="%(asctime)s [%(threadName)s] %(name)s %(levelname)s (%(filename)s:%(lineno)d) - %(message)s",
+        datefmt='%Y-%m-%d %H:%M:%S'
+    )
+    _logger = logging.getLogger("MediaCrawler")
+    _logger.setLevel(level)
+    return _logger
+
+
+logger = init_logging_config()
+
+def str2bool(v):
+    if isinstance(v, bool):
+        return v
+    if v.lower() in ('yes', 'true', 't', 'y', '1'):
+        return True
+    elif v.lower() in ('no', 'false', 'f', 'n', '0'):
+        return False
+    else:
+        raise argparse.ArgumentTypeError('Boolean value expected.')
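
str2bool is intended as an argparse type so boolean flags accept yes/no style strings; a short sketch (the --headless flag itself is illustrative):

    import argparse

    from tools.utils import str2bool

    parser = argparse.ArgumentParser()
    parser.add_argument("--headless", type=str2bool, default=False)
    print(parser.parse_args(["--headless", "yes"]).headless)   # True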

+ 77 - 0
util/lock_util.py

@@ -0,0 +1,77 @@
+"""
+
+"""
+import logging
+import os
+import threading
+import time
+
+import api
+
+
+class LockManager:
+    """
+    全局锁管理,每个手机号只能打开一个上下文相同的浏览器
+    """
+
+    def __init__(self):
+        self.locks = {}
+
+    def acquire_lock(self, key):
+        if key not in self.locks:
+            self.locks[key] = threading.Lock()
+        acquire = self.locks[key].acquire(timeout=300)
+        if acquire:
+            logging.info(f"{key} 获取锁成功")
+
+    def release_lock(self, key):
+        if key in self.locks:
+            self.locks[key].release()
+            logging.info(f"{key} 释放锁成功")
+
+    def is_locked(self, key):
+        """
+        检查给定的键是否处于锁定状态
+        """
+        if key in self.locks:
+            return self.locks[key].locked()
+        else:
+            return False
+
+
+def add_phone(lock_key: str, phones: set):
+    directory = f"./.data/{lock_key}"
+    if not os.path.exists(directory):
+        os.makedirs(directory)
+    for entry in os.listdir(directory):
+        # 构建完整的路径
+        full_path = os.path.join(directory, entry)
+        # 检查是否是文件夹
+        if os.path.isdir(full_path):
+            # 如果是文件夹,将文件夹名称添加到集合中
+            phones.add(entry)
+    print(f"已存在的{lock_key}账号:", phones)
+
+
+lock_manager_dict = {
+    "huitun": LockManager(),
+    "xhs": LockManager()
+}
+
+lock_phone_dict = {
+    "huitun": set(),
+    "xhs": set()
+}
+
+for key in lock_phone_dict.keys():
+    add_phone(key, lock_phone_dict[key])
+
+
+def get_idle_phone(key: str):
+    lock_manager = lock_manager_dict[key]
+    api.assert_not_none(lock_manager, "lock_manager is None")
+    while True:
+        for phone in lock_phone_dict[key]:
+            if not lock_manager.is_locked(phone):
+                return phone
+        time.sleep(1)
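
get_idle_phone polls the per-key account set until one phone is not locked; the lock itself still has to be taken and released by the caller. A sketch for the 'huitun' key (the work inside the try block is a placeholder):

    from util import lock_util

    phone = lock_util.get_idle_phone('huitun')       # blocks until an idle account turns up
    manager = lock_util.lock_manager_dict['huitun']
    manager.acquire_lock(phone)                      # waits up to 300s for the per-phone lock
    try:
        pass  # drive a HuiTunBrowser(phone) here
    finally:
        manager.release_lock(phone)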

+ 12 - 0
util/playwright_util.py

@@ -0,0 +1,12 @@
+"""
+
+"""
+
+
+
+def is_element_present(page, selector):
+    try:
+        page.wait_for_selector(selector, timeout=2000)
+        return True
+    except Exception:
+        return False

+ 77 - 0
xhs/__init__.py

@@ -0,0 +1,77 @@
+"""
+小红书
+"""
+import logging
+from time import sleep
+from typing import Optional
+
+from playwright.sync_api import sync_playwright, Playwright
+
+from browser import BaseBrowser
+from tools import utils
+from util.lock_util import LockManager
+from util.playwright_util import is_element_present
+from .client import XiaoHongShuClient
+from .rotate_ident import RotateIdent
+
+lock_manager = LockManager()
+XHS_URL = 'https://www.xiaohongshu.com'
+
+
+class XhsBrowser(BaseBrowser):
+
+    def __init__(self, phone: str, playwright=None):
+        super().__init__(phone, playwright)
+
+    def __get_name__(self):
+        return 'xhs'
+
+    def __init_browser__(self):
+        super().__init_browser__()
+        self.xhs_client = self.create_xhs_client(None)
+        self.rotate_ident = RotateIdent(self.page)
+        self.page.goto(XHS_URL)
+        self.rotate_ident.handle_rotate()
+
+    def create_xhs_client(self, httpx_proxy: Optional[str]) -> XiaoHongShuClient:
+        """Create xhs client"""
+        utils.logger.info("[XiaoHongShuCrawler.create_xhs_client] Begin create xiaohongshu API client ...")
+        cookie_str, cookie_dict = utils.convert_cookies(self.browser.cookies())
+        xhs_client_obj = XiaoHongShuClient(
+            proxies=httpx_proxy,
+            headers={
+                "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
+                "Cookie": cookie_str,
+                "Origin": "https://www.xiaohongshu.com",
+                "Referer": "https://www.xiaohongshu.com",
+                "Content-Type": "application/json;charset=UTF-8"
+            },
+            playwright_page=self.page,
+            cookie_dict=cookie_dict,
+        )
+        return xhs_client_obj
+
+    def login(self):
+        with sync_playwright() as playwright:
+            self.__init_browser__()
+            # 暂时采用手动登录
+            self.page.wait_for_timeout(60_000)
+
+    def polish_huitun_note(self, huitun_notes: []):
+        """
+        补齐灰豚文章数据
+        :param huitun_notes:
+        :return:
+        """
+        self.__init_browser__()
+        if not self.xhs_client.pong():
+            return huitun_notes
+        for huitun_note in huitun_notes:
+            try:
+                note_id = huitun_note.get('noteId')
+                note_info = self.xhs_client.get_note_by_id(note_id=note_id)
+                huitun_note['authorInfo'] = note_info.get('user')
+                huitun_note['imageList'] = [img.get('url_default') for img in note_info.get('image_list')]
+                sleep(1)
+            except Exception as e:
+                utils.logger.error(f"爬取小红书异常 {e}")
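
polish_huitun_note enriches each Huitun record in place with the author info and image URLs fetched through the web API. A hedged sketch with a placeholder noteId and phone number (the dict shape mirrors what search_note collects):

    from playwright.sync_api import sync_playwright

    from xhs import XhsBrowser

    notes = [{"noteId": "64f1c0a2000000001e02xxxx"}]
    playwright = sync_playwright().start()
    browser = XhsBrowser('13800000000', playwright)
    try:
        browser.polish_huitun_note(notes)   # adds authorInfo / imageList when the login is still valid
        print(notes[0].keys())
    finally:
        browser.close()
        playwright.stop()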

+ 437 - 0
xhs/client.py

@@ -0,0 +1,437 @@
+import asyncio
+import json
+import re
+from typing import Any, Callable, Dict, List, Optional, Union
+from urllib.parse import urlencode
+
+import httpx
+from playwright.async_api import BrowserContext, Page
+
+# import config
+from tools import utils
+
+# from .exception import DataFetchError, IPBlockError
+from .field import SearchNoteType, SearchSortType
+from .help import get_search_id, sign
+
+
+class XiaoHongShuClient:
+    def __init__(
+            self,
+            timeout=10,
+            proxies=None,
+            *,
+            headers: Dict[str, str],
+            playwright_page: Page,
+            cookie_dict: Dict[str, str],
+    ):
+        self.xsec_token = None
+        self.proxies = proxies
+        self.timeout = timeout
+        self.headers = headers
+        self._host = "https://edith.xiaohongshu.com"
+        self._domain = "https://www.xiaohongshu.com"
+        self.IP_ERROR_STR = "网络连接异常,请检查网络设置或重启试试"
+        self.IP_ERROR_CODE = 300012
+        self.NOTE_ABNORMAL_STR = "笔记状态异常,请稍后查看"
+        self.NOTE_ABNORMAL_CODE = -510001
+        self.playwright_page = playwright_page
+        self.cookie_dict = cookie_dict
+
+    def _pre_headers(self, url: str, data=None) -> Dict:
+        """
+        请求头参数签名
+        Args:
+            url:
+            data:
+
+        Returns:
+
+        """
+        encrypt_params = self.playwright_page.evaluate("([url, data]) => window._webmsxyw(url,data)", [url, data])
+        local_storage = self.playwright_page.evaluate("() => window.localStorage")
+        signs = sign(
+            a1=self.cookie_dict.get("a1", ""),
+            b1=local_storage.get("b1", ""),
+            x_s=encrypt_params.get("X-s", ""),
+            x_t=str(encrypt_params.get("X-t", ""))
+        )
+
+        headers = {
+            "X-S": signs["x-s"],
+            "X-T": signs["x-t"],
+            "x-S-Common": signs["x-s-common"],
+            "X-B3-Traceid": signs["x-b3-traceid"]
+        }
+        self.headers.update(headers)
+        return self.headers
+
+    def request(self, method, url, **kwargs) -> Union[str, Any]:
+        """
+        封装httpx的公共请求方法,对请求响应做一些处理
+        Args:
+            method: 请求方法
+            url: 请求的URL
+            **kwargs: 其他请求参数,例如请求头、请求体等
+
+        Returns:
+
+        """
+        # return response.text
+        return_response = kwargs.pop('return_response', False)
+
+        with httpx.Client(proxies=self.proxies) as client:
+            response = client.request(
+                method, url, timeout=self.timeout,
+                **kwargs
+            )
+
+        if return_response:
+            return response.text
+
+        data: Dict = response.json()
+        if data["success"]:
+            return data.get("data", data.get("success", {}))
+        elif data["code"] == self.IP_ERROR_CODE:
+            raise Exception(self.IP_ERROR_STR)
+        else:
+            raise Exception(data.get("msg", None))
+
+    async def get(self, uri: str, params=None) -> Dict:
+        """
+        GET请求,对请求头签名
+        Args:
+            uri: 请求路由
+            params: 请求参数
+
+        Returns:
+
+        """
+        final_uri = uri
+        if isinstance(params, dict):
+            final_uri = (f"{uri}?"
+                         f"{urlencode(params)}")
+        headers = self._pre_headers(final_uri)
+        return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=headers)
+
+    def post(self, uri: str, data: dict) -> Dict:
+        """
+        POST请求,对请求头签名
+        Args:
+            uri: 请求路由
+            data: 请求体参数
+
+        Returns:
+
+        """
+        headers = self._pre_headers(uri, data)
+        json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False)
+        return self.request(method="POST", url=f"{self._host}{uri}",
+                            data=json_str, headers=headers)
+
+    def update_xsec_token(self):
+        """
+        更新token
+        :return:
+        """
+        res = self.get_note_by_keyword('车')
+        self.xsec_token = res.get('items')[0].get('xsec_token')
+
+    async def get_note_media(self, url: str) -> Union[bytes, None]:
+        async with httpx.AsyncClient(proxies=self.proxies) as client:
+            response = await client.request("GET", url, timeout=self.timeout)
+            if not response.reason_phrase == "OK":
+                utils.logger.error(f"[XiaoHongShuClient.get_note_media] request {url} err, res:{response.text}")
+                return None
+            else:
+                return response.content
+
+    def pong(self) -> bool:
+        """
+        用于检查登录态是否失效了
+        Returns:
+
+        """
+        """get a note to check if login state is ok"""
+        utils.logger.info("[XiaoHongShuClient.pong] Begin to pong xhs...")
+        ping_flag = False
+        try:
+            note_card: Dict = self.get_note_by_keyword(keyword="小红书")
+            if note_card.get("items"):
+                ping_flag = True
+                self.xsec_token = note_card.get('items')[0].get('xsec_token')
+        except Exception as e:
+            utils.logger.error(f"[XiaoHongShuClient.pong] Ping xhs failed: {e}, and try to login again...")
+            ping_flag = False
+        return ping_flag
+
+    async def update_cookies(self, browser_context: BrowserContext):
+        """
+        API客户端提供的更新cookies方法,一般情况下登录成功后会调用此方法
+        Args:
+            browser_context: 浏览器上下文对象
+
+        Returns:
+
+        """
+        cookie_str, cookie_dict = utils.convert_cookies(await browser_context.cookies())
+        self.headers["Cookie"] = cookie_str
+        self.cookie_dict = cookie_dict
+
+    def get_note_by_keyword(
+            self, keyword: str,
+            page: int = 1, page_size: int = 20,
+            sort: SearchSortType = SearchSortType.GENERAL,
+            note_type: SearchNoteType = SearchNoteType.ALL
+    ) -> Dict:
+        """
+        根据关键词搜索笔记
+        Args:
+            keyword: 关键词参数
+            page: 分页第几页
+            page_size: 分页数据长度
+            sort: 搜索结果排序指定
+            note_type: 搜索的笔记类型
+
+        Returns:
+
+        """
+        uri = "/api/sns/web/v1/search/notes"
+        data = {
+            "keyword": keyword,
+            "page": page,
+            "page_size": page_size,
+            "search_id": get_search_id(),
+            "sort": sort.value,
+            "note_type": note_type.value
+        }
+        return self.post(uri, data)
+
+    def get_note_by_id(self, note_id: str) -> Dict:
+        """
+        获取笔记详情API
+        Args:
+            note_id:笔记ID
+            xsec_source: 渠道来源
+            xsec_token: 搜索关键字之后返回的比较列表中返回的token
+
+        Returns:
+
+        """
+
+        if self.xsec_token is None:
+            self.update_xsec_token()
+
+        data = {
+            "source_note_id": note_id,
+            "image_formats": ["jpg", "webp", "avif"],
+            "extra": {"need_body_topic": 1},
+            "xsec_source": "pc_search",
+            "xsec_token": self.xsec_token
+        }
+        uri = "/api/sns/web/v1/feed"
+        res = self.post(uri, data)
+        if res and res.get("items"):
+            res_dict: Dict = res["items"][0]["note_card"]
+            return res_dict
+        # 爬取频繁了可能会出现有的笔记能有结果有的没有
+        utils.logger.error(f"[XiaoHongShuClient.get_note_by_id] get note id:{note_id} empty and res:{res}")
+        return dict()
+
+    async def get_note_comments(self, note_id: str, cursor: str = "") -> Dict:
+        """
+        获取一级评论的API
+        Args:
+            note_id: 笔记ID
+            cursor: 分页游标
+
+        Returns:
+
+        """
+        uri = "/api/sns/web/v2/comment/page"
+        params = {
+            "note_id": note_id,
+            "cursor": cursor,
+            "top_comment_id": "",
+            "image_formats": "jpg,webp,avif"
+        }
+        return await self.get(uri, params)
+
+    async def get_note_sub_comments(self, note_id: str, root_comment_id: str, num: int = 10, cursor: str = ""):
+        """
+        获取指定父评论下的子评论的API
+        Args:
+            note_id: 子评论的帖子ID
+            root_comment_id: 根评论ID
+            num: 分页数量
+            cursor: 分页游标
+
+        Returns:
+
+        """
+        uri = "/api/sns/web/v2/comment/sub/page"
+        params = {
+            "note_id": note_id,
+            "root_comment_id": root_comment_id,
+            "num": num,
+            "cursor": cursor,
+        }
+        return await self.get(uri, params)
+
+    async def get_note_all_comments(self, note_id: str, crawl_interval: float = 1.0,
+                                    callback: Optional[Callable] = None) -> List[Dict]:
+        """
+        获取指定笔记下的所有一级评论,该方法会一直查找一个帖子下的所有评论信息
+        Args:
+            note_id: 笔记ID
+            crawl_interval: 爬取一次笔记的延迟单位(秒)
+            callback: 一次笔记爬取结束后
+
+        Returns:
+
+        """
+        result = []
+        comments_has_more = True
+        comments_cursor = ""
+        while comments_has_more:
+            comments_res = await self.get_note_comments(note_id, comments_cursor)
+            comments_has_more = comments_res.get("has_more", False)
+            comments_cursor = comments_res.get("cursor", "")
+            if "comments" not in comments_res:
+                utils.logger.info(
+                    f"[XiaoHongShuClient.get_note_all_comments] No 'comments' key found in response: {comments_res}")
+                break
+            comments = comments_res["comments"]
+            if callback:
+                await callback(note_id, comments)
+            await asyncio.sleep(crawl_interval)
+            result.extend(comments)
+            sub_comments = await self.get_comments_all_sub_comments(comments, crawl_interval, callback)
+            result.extend(sub_comments)
+        return result
+
+    async def get_comments_all_sub_comments(self, comments: List[Dict], crawl_interval: float = 1.0,
+                                            callback: Optional[Callable] = None) -> List[Dict]:
+        """
+        获取指定一级评论下的所有二级评论, 该方法会一直查找一级评论下的所有二级评论信息
+        Args:
+            comments: 评论列表
+            crawl_interval: 爬取一次评论的延迟单位(秒)
+            callback: 一次评论爬取结束后
+
+        Returns:
+        """
+        return []
+        # if True:
+        #     utils.logger.info(
+        #         f"[XiaoHongShuCrawler.get_comments_all_sub_comments] Crawling sub_comment mode is not enabled")
+        #     return []
+        #
+        # result = []
+        # for comment in comments:
+        #     note_id = comment.get("note_id")
+        #     sub_comments = comment.get("sub_comments")
+        #     if sub_comments and callback:
+        #         await callback(note_id, sub_comments)
+        #
+        #     sub_comment_has_more = comment.get("sub_comment_has_more")
+        #     if not sub_comment_has_more:
+        #         continue
+        #
+        #     root_comment_id = comment.get("id")
+        #     sub_comment_cursor = comment.get("sub_comment_cursor")
+        #
+        #     while sub_comment_has_more:
+        #         comments_res = await self.get_note_sub_comments(note_id, root_comment_id, 10, sub_comment_cursor)
+        #         sub_comment_has_more = comments_res.get("has_more", False)
+        #         sub_comment_cursor = comments_res.get("cursor", "")
+        #         if "comments" not in comments_res:
+        #             utils.logger.info(
+        #                 f"[XiaoHongShuClient.get_comments_all_sub_comments] No 'comments' key found in response: {comments_res}")
+        #             break
+        #         comments = comments_res["comments"]
+        #         if callback:
+        #             await callback(note_id, comments)
+        #         await asyncio.sleep(crawl_interval)
+        #         result.extend(comments)
+        # return result
+
+    async def get_creator_info(self, user_id: str) -> Dict:
+        """
+        通过解析网页版的用户主页HTML,获取用户个人简要信息
+        PC端用户主页的网页存在window.__INITIAL_STATE__这个变量上的,解析它即可
+        eg: https://www.xiaohongshu.com/user/profile/59d8cb33de5fb4696bf17217
+        """
+        uri = f"/user/profile/{user_id}"
+        html_content = await self.request("GET", self._domain + uri, return_response=True, headers=self.headers)
+        match = re.search(r'<script>window.__INITIAL_STATE__=(.+)<\/script>', html_content, re.M)
+
+        if match is None:
+            return {}
+
+        info = json.loads(match.group(1).replace(':undefined', ':null'), strict=False)
+        if info is None:
+            return {}
+        return info.get('user').get('userPageData')
+
+    async def get_notes_by_creator(
+            self, creator: str,
+            cursor: str,
+            page_size: int = 30
+    ) -> Dict:
+        """
+        获取博主的笔记
+        Args:
+            creator: 博主ID
+            cursor: 上一页最后一条笔记的ID
+            page_size: 分页数据长度
+
+        Returns:
+
+        """
+        uri = "/api/sns/web/v1/user_posted"
+        data = {
+            "user_id": creator,
+            "cursor": cursor,
+            "num": page_size,
+            "image_formats": "jpg,webp,avif"
+        }
+        return await self.get(uri, data)
+
+    async def get_all_notes_by_creator(self, user_id: str, crawl_interval: float = 1.0,
+                                       callback: Optional[Callable] = None) -> List[Dict]:
+        """
+        获取指定用户下的所有发过的帖子,该方法会一直查找一个用户下的所有帖子信息
+        Args:
+            user_id: 用户ID
+            crawl_interval: 爬取一次的延迟单位(秒)
+            callback: 一次分页爬取结束后的更新回调函数
+
+        Returns:
+
+        """
+        result = []
+        notes_has_more = True
+        notes_cursor = ""
+        while notes_has_more:
+            notes_res = await self.get_notes_by_creator(user_id, notes_cursor)
+            if not notes_res:
+                utils.logger.error(
+                    f"[XiaoHongShuClient.get_notes_by_creator] The current creator may have been banned by xhs, so they cannot access the data.")
+                break
+
+            notes_has_more = notes_res.get("has_more", False)
+            notes_cursor = notes_res.get("cursor", "")
+            if "notes" not in notes_res:
+                utils.logger.info(
+                    f"[XiaoHongShuClient.get_all_notes_by_creator] No 'notes' key found in response: {notes_res}")
+                break
+
+            notes = notes_res["notes"]
+            utils.logger.info(
+                f"[XiaoHongShuClient.get_all_notes_by_creator] got user_id:{user_id} notes len : {len(notes)}")
+            if callback:
+                await callback(notes)
+            await asyncio.sleep(crawl_interval)
+            result.extend(notes)
+        return result
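
A sketch of the keyword search through the client that XhsBrowser builds, combined with the enums from xhs/field.py; the phone number and keyword are placeholders and a previously saved login profile is assumed:

    from playwright.sync_api import sync_playwright

    from xhs import XhsBrowser
    from xhs.field import SearchNoteType, SearchSortType

    playwright = sync_playwright().start()
    browser = XhsBrowser('13800000000', playwright)
    try:
        browser.__init_browser__()          # also builds browser.xhs_client
        res = browser.xhs_client.get_note_by_keyword(
            keyword="露营",
            sort=SearchSortType.LATEST,
            note_type=SearchNoteType.IMAGE,
        )
        for item in res.get("items", []):
            print(item.get("xsec_token"))   # the token the client caches for later feed requests
    finally:
        browser.close()
        playwright.stop()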

+ 72 - 0
xhs/field.py

@@ -0,0 +1,72 @@
+from enum import Enum
+from typing import NamedTuple
+
+
+class FeedType(Enum):
+    # 推荐
+    RECOMMEND = "homefeed_recommend"
+    # 穿搭
+    FASION = "homefeed.fashion_v3"
+    # 美食
+    FOOD = "homefeed.food_v3"
+    # 彩妆
+    COSMETICS = "homefeed.cosmetics_v3"
+    # 影视
+    MOVIE = "homefeed.movie_and_tv_v3"
+    # 职场
+    CAREER = "homefeed.career_v3"
+    # 情感
+    EMOTION = "homefeed.love_v3"
+    # 家居
+    HOURSE = "homefeed.household_product_v3"
+    # 游戏
+    GAME = "homefeed.gaming_v3"
+    # 旅行
+    TRAVEL = "homefeed.travel_v3"
+    # 健身
+    FITNESS = "homefeed.fitness_v3"
+
+
+class NoteType(Enum):
+    NORMAL = "normal"
+    VIDEO = "video"
+
+
+class SearchSortType(Enum):
+    """search sort type"""
+    # default
+    GENERAL = "general"
+    # most popular
+    MOST_POPULAR = "popularity_descending"
+    # Latest
+    LATEST = "time_descending"
+
+
+class SearchNoteType(Enum):
+    """search note type
+    """
+    # default
+    ALL = 0
+    # only video
+    VIDEO = 1
+    # only image
+    IMAGE = 2
+
+
+class Note(NamedTuple):
+    """note tuple"""
+    note_id: str
+    title: str
+    desc: str
+    type: str
+    user: dict
+    img_urls: list
+    video_url: str
+    tag_list: list
+    at_user_list: list
+    collected_count: str
+    comment_count: str
+    liked_count: str
+    share_count: str
+    time: int
+    last_update_time: int

+ 287 - 0
xhs/help.py

@@ -0,0 +1,287 @@
+import ctypes
+import json
+import random
+import time
+import urllib.parse
+
+
+def sign(a1="", b1="", x_s="", x_t=""):
+    """
+    takes in a URI (uniform resource identifier), an optional data dictionary, and an optional ctime parameter. It returns a dictionary containing two keys: "x-s" and "x-t".
+    """
+    common = {
+        "s0": 3,  # getPlatformCode
+        "s1": "",
+        "x0": "1",  # localStorage.getItem("b1b1")
+        "x1": "3.7.8-2",  # version
+        "x2": "Mac OS",
+        "x3": "xhs-pc-web",
+        "x4": "4.27.2",
+        "x5": a1,  # cookie of a1
+        "x6": x_t,
+        "x7": x_s,
+        "x8": b1,  # localStorage.getItem("b1")
+        "x9": mrc(x_t + x_s + b1),
+        "x10": 154,  # getSigCount
+    }
+    encode_str = encodeUtf8(json.dumps(common, separators=(',', ':')))
+    x_s_common = b64Encode(encode_str)
+    x_b3_traceid = get_b3_trace_id()
+    return {
+        "x-s": x_s,
+        "x-t": x_t,
+        "x-s-common": x_s_common,
+        "x-b3-traceid": x_b3_traceid
+    }
+
+
+def get_b3_trace_id():
+    re = "abcdef0123456789"
+    je = 16
+    e = ""
+    for t in range(16):
+        e += re[random.randint(0, je - 1)]
+    return e
+
+
+def mrc(e):
+    ie = [
+        0, 1996959894, 3993919788, 2567524794, 124634137, 1886057615, 3915621685,
+        2657392035, 249268274, 2044508324, 3772115230, 2547177864, 162941995,
+        2125561021, 3887607047, 2428444049, 498536548, 1789927666, 4089016648,
+        2227061214, 450548861, 1843258603, 4107580753, 2211677639, 325883990,
+        1684777152, 4251122042, 2321926636, 335633487, 1661365465, 4195302755,
+        2366115317, 997073096, 1281953886, 3579855332, 2724688242, 1006888145,
+        1258607687, 3524101629, 2768942443, 901097722, 1119000684, 3686517206,
+        2898065728, 853044451, 1172266101, 3705015759, 2882616665, 651767980,
+        1373503546, 3369554304, 3218104598, 565507253, 1454621731, 3485111705,
+        3099436303, 671266974, 1594198024, 3322730930, 2970347812, 795835527,
+        1483230225, 3244367275, 3060149565, 1994146192, 31158534, 2563907772,
+        4023717930, 1907459465, 112637215, 2680153253, 3904427059, 2013776290,
+        251722036, 2517215374, 3775830040, 2137656763, 141376813, 2439277719,
+        3865271297, 1802195444, 476864866, 2238001368, 4066508878, 1812370925,
+        453092731, 2181625025, 4111451223, 1706088902, 314042704, 2344532202,
+        4240017532, 1658658271, 366619977, 2362670323, 4224994405, 1303535960,
+        984961486, 2747007092, 3569037538, 1256170817, 1037604311, 2765210733,
+        3554079995, 1131014506, 879679996, 2909243462, 3663771856, 1141124467,
+        855842277, 2852801631, 3708648649, 1342533948, 654459306, 3188396048,
+        3373015174, 1466479909, 544179635, 3110523913, 3462522015, 1591671054,
+        702138776, 2966460450, 3352799412, 1504918807, 783551873, 3082640443,
+        3233442989, 3988292384, 2596254646, 62317068, 1957810842, 3939845945,
+        2647816111, 81470997, 1943803523, 3814918930, 2489596804, 225274430,
+        2053790376, 3826175755, 2466906013, 167816743, 2097651377, 4027552580,
+        2265490386, 503444072, 1762050814, 4150417245, 2154129355, 426522225,
+        1852507879, 4275313526, 2312317920, 282753626, 1742555852, 4189708143,
+        2394877945, 397917763, 1622183637, 3604390888, 2714866558, 953729732,
+        1340076626, 3518719985, 2797360999, 1068828381, 1219638859, 3624741850,
+        2936675148, 906185462, 1090812512, 3747672003, 2825379669, 829329135,
+        1181335161, 3412177804, 3160834842, 628085408, 1382605366, 3423369109,
+        3138078467, 570562233, 1426400815, 3317316542, 2998733608, 733239954,
+        1555261956, 3268935591, 3050360625, 752459403, 1541320221, 2607071920,
+        3965973030, 1969922972, 40735498, 2617837225, 3943577151, 1913087877,
+        83908371, 2512341634, 3803740692, 2075208622, 213261112, 2463272603,
+        3855990285, 2094854071, 198958881, 2262029012, 4057260610, 1759359992,
+        534414190, 2176718541, 4139329115, 1873836001, 414664567, 2282248934,
+        4279200368, 1711684554, 285281116, 2405801727, 4167216745, 1634467795,
+        376229701, 2685067896, 3608007406, 1308918612, 956543938, 2808555105,
+        3495958263, 1231636301, 1047427035, 2932959818, 3654703836, 1088359270,
+        936918000, 2847714899, 3736837829, 1202900863, 817233897, 3183342108,
+        3401237130, 1404277552, 615818150, 3134207493, 3453421203, 1423857449,
+        601450431, 3009837614, 3294710456, 1567103746, 711928724, 3020668471,
+        3272380065, 1510334235, 755167117,
+    ]
+    o = -1
+
+    def right_without_sign(num: int, bit: int=0) -> int:
+        val = ctypes.c_uint32(num).value >> bit
+        MAX32INT = 4294967295
+        return (val + (MAX32INT + 1)) % (2 * (MAX32INT + 1)) - MAX32INT - 1
+
+    for n in range(57):
+        o = ie[(o & 255) ^ ord(e[n])] ^ right_without_sign(o, 8)
+    return o ^ -1 ^ 3988292384
+
+
+lookup = [
+    "Z",
+    "m",
+    "s",
+    "e",
+    "r",
+    "b",
+    "B",
+    "o",
+    "H",
+    "Q",
+    "t",
+    "N",
+    "P",
+    "+",
+    "w",
+    "O",
+    "c",
+    "z",
+    "a",
+    "/",
+    "L",
+    "p",
+    "n",
+    "g",
+    "G",
+    "8",
+    "y",
+    "J",
+    "q",
+    "4",
+    "2",
+    "K",
+    "W",
+    "Y",
+    "j",
+    "0",
+    "D",
+    "S",
+    "f",
+    "d",
+    "i",
+    "k",
+    "x",
+    "3",
+    "V",
+    "T",
+    "1",
+    "6",
+    "I",
+    "l",
+    "U",
+    "A",
+    "F",
+    "M",
+    "9",
+    "7",
+    "h",
+    "E",
+    "C",
+    "v",
+    "u",
+    "R",
+    "X",
+    "5",
+]
+
+
+def tripletToBase64(e):
+    return (
+            lookup[63 & (e >> 18)] +
+            lookup[63 & (e >> 12)] +
+            lookup[(e >> 6) & 63] +
+            lookup[e & 63]
+    )
+
+
+def encodeChunk(e, t, r):
+    m = []
+    for b in range(t, r, 3):
+        n = (16711680 & (e[b] << 16)) + \
+            ((e[b + 1] << 8) & 65280) + (e[b + 2] & 255)
+        m.append(tripletToBase64(n))
+    return ''.join(m)
+
+
+def b64Encode(e):
+    P = len(e)
+    W = P % 3
+    U = []
+    z = 16383
+    H = 0
+    Z = P - W
+    while H < Z:
+        U.append(encodeChunk(e, H, Z if H + z > Z else H + z))
+        H += z
+    if 1 == W:
+        F = e[P - 1]
+        U.append(lookup[F >> 2] + lookup[(F << 4) & 63] + "==")
+    elif 2 == W:
+        F = (e[P - 2] << 8) + e[P - 1]
+        U.append(lookup[F >> 10] + lookup[63 & (F >> 4)] +
+                 lookup[(F << 2) & 63] + "=")
+    return "".join(U)
+
+
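+# Turn a string into its UTF-8 byte values by round-tripping through percent
+# encoding, mirroring JavaScript's encodeURIComponent behaviour.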
+def encodeUtf8(e):
+    b = []
+    m = urllib.parse.quote(e, safe='~()*!.\'')
+    w = 0
+    while w < len(m):
+        T = m[w]
+        if T == "%":
+            E = m[w + 1] + m[w + 2]
+            S = int(E, 16)
+            b.append(S)
+            w += 2
+        else:
+            b.append(ord(T[0]))
+        w += 1
+    return b
+
+
+def base36encode(number, alphabet='0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'):
+    """Converts an integer to a base36 string."""
+    if not isinstance(number, int):
+        raise TypeError('number must be an integer')
+
+    base36 = ''
+    sign = ''
+
+    if number < 0:
+        sign = '-'
+        number = -number
+
+    if 0 <= number < len(alphabet):
+        return sign + alphabet[number]
+
+    while number != 0:
+        number, i = divmod(number, len(alphabet))
+        base36 = alphabet[i] + base36
+
+    return sign + base36
+
+
+def base36decode(number):
+    return int(number, 36)
+
+
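+# search_id = base36(millisecond timestamp << 64, plus a random 31-bit integer).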
+def get_search_id():
+    e = int(time.time() * 1000) << 64
+    t = int(random.uniform(0, 2147483646))
+    return base36encode((e + t))
+
+
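+# Image CDN hosts; the helpers below treat them as interchangeable mirrors.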
+img_cdns = [
+    "https://sns-img-qc.xhscdn.com",
+    "https://sns-img-hw.xhscdn.com",
+    "https://sns-img-bd.xhscdn.com",
+    "https://sns-img-qn.xhscdn.com",
+]
+
+def get_img_url_by_trace_id(trace_id: str, format_type: str = "png"):
+    return f"{random.choice(img_cdns)}/{trace_id}?imageView2/format/{format_type}"
+
+
+def get_img_urls_by_trace_id(trace_id: str, format_type: str = "png"):
+    return [f"{cdn}/{trace_id}?imageView2/format/{format_type}" for cdn in img_cdns]
+
+
+def get_trace_id(img_url: str):
+    # Images uploaded from the browser carry an extra /spectrum/ path segment
+    return f"spectrum/{img_url.split('/')[-1]}" if img_url.find("spectrum") != -1 else img_url.split("/")[-1]
+
+
+if __name__ == '__main__':
+    _img_url = "https://sns-img-bd.xhscdn.com/7a3abfaf-90c1-a828-5de7-022c80b92aa3"
+    # Resolve one image's trace_id into URLs on every CDN
+    # final_img_urls = get_img_urls_by_trace_id(get_trace_id(_img_url))
+    final_img_url = get_img_url_by_trace_id(get_trace_id(_img_url))
+    print(final_img_url)
+
+

+ 149 - 0
xhs/rotate_ident.py

@@ -0,0 +1,149 @@
+"""用于处理小红书旋转验证码"""
+
+import logging
+import os
+import time
+import base64
+from io import BytesIO
+import requests
+from PIL import Image
+from playwright.sync_api import Page
+from util import playwright_util
+
+root_dir = os.path.dirname(os.path.abspath(__file__))
+
+
+def pil_base64(img, coding='utf-8'):
+    """
+    Encode a PIL image as a base64 data URI.
+    """
+    img_format = img.format
+    if img_format is None:
+        img_format = 'JPEG'
+
+    format_str = 'JPEG'
+    if 'png' == img_format.lower():
+        format_str = 'PNG'
+    if 'gif' == img_format.lower():
+        format_str = 'gif'
+
+    if img.mode == "P":
+        img = img.convert('RGB')
+    if img.mode == "RGBA":
+        format_str = 'PNG'
+        img_format = 'PNG'
+
+    output_buffer = BytesIO()
+    img.save(output_buffer, quality=100, format=format_str)
+    byte_data = output_buffer.getvalue()
+    base64_str = 'data:image/' + img_format.lower() + ';base64,' + base64.b64encode(byte_data).decode(coding)
+    return base64_str
+
+
+def invoke_ident_api(img):
+    """
+    Call the third-party captcha recognition service and return its JSON response.
+    """
+    url = "http://www.detayun.cn/openapi/verify_code_identify/"
+    data = {
+        # Account key for the recognition service
+        "key": "2XbUYAP0jeiaiBV8uAvg",
+        # Captcha type id
+        "verify_idf_id": "24",
+        # Captcha image as a base64 data URI
+        "img_base64": pil_base64(img),
+        "img_byte": None,
+        # Text prompt for click/semantic captcha types (left empty here)
+        "words": ""
+    }
+    header = {"Content-Type": "application/json"}
+    # Send the recognition request
+    response = requests.post(url=url, json=data, headers=header, timeout=30)
+    logging.info('verify_code_identify response: %s', response.text)
+    return response.json()
+
+
+class RotateIdent:
+    """
+    Handler for the rotation (rotate-to-align) captcha.
+    """
+
+    def __init__(self, page: Page):
+        self.page = page
+        self.img_path = None
+
+    def need_ident(self) -> bool:
+        """
+        Whether the rotation captcha is present and needs to be solved.
+        """
+        return playwright_util.is_element_present(self.page, '//div[@id="red-captcha-rotate"]/img')
+
+    def handle_rotate(self):
+        """
+        Core routine: recognize the rotation angle and drag the slider accordingly.
+        """
+        try_count = 0
+        while self.need_ident() and try_count < 5:
+            try_count += 1
+            logging.info('Handling rotation captcha, attempt %s', try_count)
+            tag2 = self.page.query_selector('//div[@class="red-captcha-slider"]')
+            img = self.download_img()
+            response = invoke_ident_api(img)
+            if response['code'] != 200:
+                logging.error(response['msg'])
+            else:
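+                # res_str looks like '顺时针旋转<N>度' ("rotate clockwise N degrees"); strip the text to get N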
+                angle = int(str(response['data']['res_str']).replace('顺时针旋转', '').replace('度', ''))
+                # Press and hold the slider with the mouse
+                bbox = tag2.bounding_box()
+                x_center = bbox['x'] + bbox['width'] / 2
+                y_center = bbox['y'] + bbox['height'] / 2
+                self.page.mouse.move(x_center, y_center)
+                self.page.mouse.down()
+                # Hold briefly before dragging
+                time.sleep(1)
+                # Convert the reported angle into a horizontal drag distance (~0.79 px per degree)
+                move_x = angle * 0.79
+                # Drag the slider to the target position
+                self.page.mouse.move(x_center + move_x, y_center + 5)
+                # Pause before releasing
+                time.sleep(1)
+                # Release the mouse button
+                self.page.mouse.up()
+                time.sleep(5)
+        if self.img_path is not None and os.path.exists(self.img_path):
+            os.remove(self.img_path)
+
+    def download_img(self):
+        """
+        Download the pending captcha image, save a local copy, and return it.
+        """
+        # Locate the rotating captcha image element
+        tag1 = self.page.query_selector('//div[@id="red-captcha-rotate"]/img')
+        # Read the image URL
+        img_url = tag1.get_attribute('src')
+        logging.info('ident url: %s', img_url)
+        header = {
+            "Host": "picasso-static.xiaohongshu.com",
+            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:72.0) Gecko/20100101 Firefox/72.0",
+            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
+            "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
+            "Accept-Encoding": "gzip, deflate, br",
+            "Connection": "keep-alive",
+            "Cookie": "xsecappid=login; a1=1896916369fehn0yq7nomanvre3fghfkj0zubt7zx50000120287; webId=75af27905db67b6fcb29a4899d200062; web_session=030037a385d8a837e5e590cace234a6e266fd5; gid=yYjKjyK484VKyYjKjyKqK89WjidxI8vAWIl6uuC0IhFdq728ikxiTD888yJ8JYW84DySKW0Y; webBuild=2.17.8; websectiga=634d3ad75ffb42a2ade2c5e1705a73c845837578aeb31ba0e442d75c648da36a; sec_poison_id=41187a04-9f82-4fbc-8b98-d530606b7696",
+            "Upgrade-Insecure-Requests": "1",
+            "If-Modified-Since": "Thu, 06 Jul 2023 11:42:07 GMT",
+            "If-None-Match": '"7e53c313a9f321775e8f5e190de21081"',
+            "TE": "Trailers",
+        }
+        # Download the image
+        response = requests.get(url=img_url, headers=header, timeout=20)
+        img = Image.open(BytesIO(response.content))
+        img_folder = os.path.join(root_dir, 'train_img')
+        # Create the target folder if it does not exist
+        if not os.path.exists(img_folder):
+            os.makedirs(img_folder)
+        # Build the image path and save the image
+        self.img_path = os.path.join(img_folder, f'{int(time.time() * 1000)}.jpg')
+        img.convert('RGB').save(self.img_path)
+        return img
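+
+
+# Minimal usage sketch (illustrative only, not part of the original module):
+# given a Playwright `page` that has just hit the captcha, one might run:
+#
+#     ident = RotateIdent(page)
+#     if ident.need_ident():
+#         ident.handle_rotate()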