|
@@ -0,0 +1,143 @@
|
|
|
+"""
|
|
|
+
|
|
|
+"""
|
|
|
+import logging
|
|
|
+import threading
|
|
|
+
|
|
|
+from playwright.sync_api import sync_playwright, Page, Playwright
|
|
|
+
|
|
|
+import api
|
|
|
+
|
|
|
# Landing page of the Huitun site (xhs.huitun.com); the login flow opens this URL.
HUITUN_URL = 'https://xhs.huitun.com/'
|
|
|
+
|
|
|
+
|
|
|
def is_element_present(page, selector, timeout=2000):
    """Return whether ``selector`` shows up on ``page`` within ``timeout`` ms.

    Args:
        page: Object exposing ``wait_for_selector(selector, timeout=...)``
            (a Playwright page in this module).
        selector: Selector string passed to ``wait_for_selector`` unchanged.
        timeout: Maximum wait in milliseconds. Defaults to 2000, the value
            that used to be hard-coded.

    Returns:
        True if ``wait_for_selector`` returned normally, False if it raised
        for any reason (timeout, closed page, invalid selector, ...).
    """
    try:
        page.wait_for_selector(selector, timeout=timeout)
    except Exception:
        # Deliberate best-effort probe: any failure means "not present".
        # Keep a debug trace so real errors are not completely invisible.
        logging.debug("selector %r not found", selector, exc_info=True)
        return False
    return True
|
|
|
+
|
|
|
+
|
|
|
class LockManager:
    """Registry of per-key ``threading.Lock`` objects.

    Used in this module so that, for each phone number, only one browser
    sharing the same persistent context is open at a time.
    """

    def __init__(self):
        # key -> threading.Lock, created lazily on first acquire.
        self.locks = {}
        # Serialises creation of entries in ``self.locks`` so two threads
        # asking for the same new key always end up with the same lock.
        self._registry_guard = threading.Lock()

    def acquire_lock(self, key, timeout=300):
        """Acquire the lock for ``key``, creating it on first use.

        Args:
            key: Hashable identifier of the lock (a phone number here).
            timeout: Seconds to wait for the lock; defaults to 300.

        Returns:
            True if the lock was acquired, False if the wait timed out.
        """
        with self._registry_guard:
            lock = self.locks.get(key)
            if lock is None:
                lock = self.locks[key] = threading.Lock()
        acquired = lock.acquire(timeout=timeout)
        if acquired:
            logging.info("%s 获取锁成功", key)
        else:
            # Surface the failure instead of silently continuing.
            logging.warning("%s 获取锁超时", key)
        return acquired

    def release_lock(self, key):
        """Release the lock for ``key``; unknown keys are ignored.

        Raises:
            RuntimeError: If the lock exists but is not currently held
                (propagated from ``threading.Lock.release``).
        """
        lock = self.locks.get(key)
        if lock is not None:
            lock.release()
            logging.info("%s 释放锁成功", key)

    def is_locked(self, key):
        """Return True if a lock exists for ``key`` and is currently held."""
        lock = self.locks.get(key)
        return lock.locked() if lock is not None else False


# Module-level singleton shared by every browser instance in this module.
lock_manager = LockManager()
|
|
|
+
|
|
|
+
|
|
|
class HuiTunBrowser:
    """Drive a Chrome persistent context against the Huitun site for one phone.

    Each phone number gets its own user-data directory
    (``./.data/huitun/<phone>``), so browser state written during ``login``
    is available to later launches for the same phone.
    """

    # Consecutive scrolls that yield no new results before ``search_note``
    # stops; prevents an endless loop once the site has nothing more to load.
    MAX_IDLE_SCROLLS = 5

    def __init__(self, phone: str):
        """Store the phone number and initialise empty browser/result state.

        Args:
            phone: Phone number identifying the account and its data dir.
                Validated with ``api.assert_not_none``.
        """
        api.assert_not_none(phone, "手机号不能为空")
        self.phone = phone
        self.browser = None  # persistent browser context, set on launch
        self.page = None  # page opened inside that context
        self.result = None
        self.list_result = []  # entries collected by search_note_handler
        self.has_more = False

    def __init_browser__(self, playwright: "Playwright"):
        """Launch the per-phone persistent Chrome context and open a page.

        Args:
            playwright: Active Playwright instance from ``sync_playwright()``.
        """
        self.browser = playwright.chromium.launch_persistent_context(
            user_data_dir=f'./.data/huitun/{self.phone}',
            headless=False,
            slow_mo=1000,
            channel="chrome",
            ignore_https_errors=True,
            args=[
                '--disable-blink-features=AutomationControlled',
                '--incognito',
                '--ignore-certificate-errors-spki-list',
                '--disable-web-security',  # disable web security
                '--no-sandbox',  # disable the sandbox
                '--disable-dev-shm-usage',  # do not use /dev/shm
                '--disable-features=site-per-process',  # disable per-site processes
                '--ignore-certificate-errors',  # ignore certificate errors
                '--disable-features=AutomationControlled'  # disable automation-related features
            ])
        # Script injected into every page of the context before page scripts run.
        self.browser.add_init_script(path="./stealth.min.js")
        self.page = self.browser.new_page()

    def close(self):
        """Close the page and then its context; safe to call repeatedly."""
        page, context = self.page, self.browser
        # Drop the references first so a failure below cannot leave stale
        # handles behind and a second call becomes a no-op.
        self.page = None
        self.browser = None
        try:
            if page is not None:
                page.close()
        finally:
            if context is not None:
                context.close()

    def login(self, password: str):
        """Log in to Huitun with this instance's phone number and a password.

        Opens ``HUITUN_URL``; when the login modal is shown, switches to the
        password form if needed, types the credentials and submits. The page
        is then kept open for 30 seconds before the browser is closed
        (presumably to let the login finish or be completed by hand -
        TODO confirm).

        NOTE(review): unlike ``search_note`` this does not take the per-phone
        lock even though it opens the same user-data directory - confirm
        whether callers serialise this themselves.

        Args:
            password: Account password typed into the login form.

        Returns:
            None.
        """
        with sync_playwright() as playwright:
            try:
                self.__init_browser__(playwright)
                self.page.goto(HUITUN_URL)
                if is_element_present(self.page, '.ant-modal-body'):
                    if not is_element_present(self.page, 'text=密码登录'):
                        # Raw string: the selector contains literal backslashes.
                        pwd_login = self.page.query_selector(r'.b9dOaTo9gfF3wLAi7jlXTg\=\=')
                        if pwd_login is not None:
                            pwd_login.click()
                    self.page.get_by_placeholder('请输入手机号').type(self.phone)
                    self.page.get_by_placeholder('6-15位数字与字母组合').type(password)
                    self.page.get_by_text('登 录', exact=True).click()
                self.page.wait_for_timeout(30_000)
            finally:
                self.close()

    def search_note(self, tag_name: str, size: int):
        """Collect note-search results by scrolling the Huitun search page.

        Holds the per-phone lock for the whole run, opens the note-search
        page and keeps scrolling to the bottom. Responses whose URL contains
        ``/note/search`` are accumulated by ``search_note_handler``.
        Scrolling stops once at least ``size`` entries were collected, or
        after ``MAX_IDLE_SCROLLS`` consecutive scrolls without new entries.

        NOTE(review): ``tag_name`` is validated but never entered into the
        page here, so results are whatever the search page loads - confirm.

        Args:
            tag_name: Tag to search for; must pass ``api.assert_not_none``.
            size: Minimum number of entries wanted. ``None`` means "keep
                going until the page stops producing new entries".

        Returns:
            The list of collected entries (may hold more than ``size``).

        Raises:
            TimeoutError: If the per-phone lock could not be acquired.
        """
        # Validate before taking the lock or launching a browser.
        api.assert_not_none(tag_name, "标签不能为空")
        # Only an explicit False means the lock was not obtained; a None
        # return carries no signal and is treated as acquired.
        if lock_manager.acquire_lock(self.phone) is False:
            raise TimeoutError(f"{self.phone} 获取锁超时")
        try:
            with sync_playwright() as playwright:
                try:
                    self.__init_browser__(playwright)
                    self.list_result = []
                    self.page.on('response', self.search_note_handler)
                    self.page.goto('https://xhs.huitun.com/#/note/note_search')
                    self.page.wait_for_timeout(3000)
                    idle_scrolls = 0
                    while size is None or len(self.list_result) < size:
                        seen_before = len(self.list_result)
                        logging.info('继续搜索用户主页')
                        self.page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                        self.page.wait_for_timeout(2000)
                        logging.info('搜索用户主页图文结果数:%s', len(self.list_result))
                        if len(self.list_result) > seen_before:
                            idle_scrolls = 0
                            continue
                        idle_scrolls += 1
                        if idle_scrolls >= self.MAX_IDLE_SCROLLS:
                            # Nothing new after several scrolls: assume the
                            # result list is exhausted instead of spinning.
                            break
                finally:
                    # Close on failure paths too, not only on success.
                    self.close()
            return self.list_result
        finally:
            lock_manager.release_lock(self.phone)

    def search_note_handler(self, response):
        """Accumulate entries from a note-search response.

        Responses whose URL does not contain ``/note/search``, whose body
        cannot be parsed as a JSON object, or whose ``status`` is not 0 are
        ignored. Otherwise the items under ``extData.list`` are appended to
        ``self.list_result``.

        :param response: Response object exposing ``url`` and ``json()``.
        :return: None
        """
        if response is None or '/note/search' not in response.url:
            return
        try:
            response_body = response.json()
        except Exception:
            # Event-callback boundary: one bad body must not abort the search.
            logging.warning('failed to parse response from %s', response.url, exc_info=True)
            return
        if not isinstance(response_body, dict) or response_body.get('status') != 0:
            return
        # Tolerate a missing/null ``extData`` or ``list``.
        note_list = (response_body.get('extData') or {}).get('list') or []
        # Always extend a list we own instead of aliasing the response's list.
        self.list_result.extend(note_list)
|