"""
Playwright automation for https://xhs.huitun.com/: password login and
note-search scraping, using one persistent Chrome profile per phone number.
"""
- import logging
- import threading
- from playwright.sync_api import sync_playwright, Page, Playwright
- import api
# Landing page of the Huitun site; login() navigates here first.
HUITUN_URL = "https://xhs.huitun.com/"
def is_element_present(page, selector, timeout=2000):
    """Report whether ``page.wait_for_selector(selector)`` succeeds in time.

    :param page: object exposing ``wait_for_selector(selector, timeout=...)``
        (a Playwright ``Page`` in this file).
    :param selector: selector string handed to ``wait_for_selector`` as-is.
    :param timeout: maximum wait in milliseconds; defaults to the 2000 ms
        this helper has always used.
    :return: ``True`` if the wait completed, ``False`` if it raised.
    """
    try:
        page.wait_for_selector(selector, timeout=timeout)
    except Exception:
        # Deliberately best-effort: any failure (timeout, closed page, bad
        # selector) is reported as "not present" rather than propagated.
        return False
    return True
class LockManager:
    """
    Process-wide registry of named locks, so that each phone number has at
    most one browser open on the same persistent context at a time.
    """

    def __init__(self):
        # key -> threading.Lock; entries are created lazily and never removed.
        self.locks = {}
        # Serialises creation of per-key locks so two threads asking for the
        # same new key cannot each install (and then hold) a different lock.
        self._guard = threading.Lock()

    def acquire_lock(self, key, timeout=300):
        """
        Block until the lock for ``key`` is acquired or ``timeout`` elapses.

        :param key: hashable lock name (a phone number in this file).
        :param timeout: seconds to wait, forwarded to ``Lock.acquire``.
        :return: ``True`` if the lock was acquired, ``False`` on timeout.
        """
        with self._guard:
            lock = self.locks.get(key)
            if lock is None:
                lock = self.locks[key] = threading.Lock()
        # Wait outside the guard so a long wait on one key does not block
        # callers that want a different key.
        acquired = lock.acquire(timeout=timeout)
        if acquired:
            logging.info(f"{key} 获取锁成功")
        else:
            logging.warning("%s lock was not acquired within %s seconds", key, timeout)
        return acquired

    def release_lock(self, key):
        """
        Release the lock registered for ``key``; unknown keys are ignored.

        :raises RuntimeError: propagated from ``Lock.release`` when the lock
            exists but is not currently held.
        """
        lock = self.locks.get(key)
        if lock is not None:
            lock.release()
            logging.info(f"{key} 释放锁成功")

    def is_locked(self, key):
        """
        Return whether the lock for ``key`` is currently held.

        A key that has never been acquired reports ``False``.
        """
        lock = self.locks.get(key)
        return lock.locked() if lock is not None else False


# Module-level singleton shared by every HuiTunBrowser instance.
lock_manager = LockManager()
class HuiTunBrowser:
    """
    Drives a Chrome persistent context against xhs.huitun.com for one phone.

    The browser profile lives under ``./.data/huitun/<phone>``. Each public
    operation (``login``, ``search_note``) starts its own Playwright session,
    launches the browser, and closes it again before returning.
    """

    # Consecutive scroll rounds that may add no results before search_note
    # concludes there is nothing more to load.
    _MAX_IDLE_SCROLLS = 3

    def __init__(self, phone: str):
        """
        :param phone: account phone number; also names the profile directory
            and the lock key. Validated with ``api.assert_not_none``.
        """
        api.assert_not_none(phone, "手机号不能为空")
        self.phone = phone
        self.browser = None   # persistent browser context while a session is open
        self.page = None      # page opened in that context
        self.result = None    # not used by any method in this class
        self.list_result = []  # notes collected by search_note_handler
        self.has_more = False  # not used by any method in this class

    def __init_browser__(self, playwright: "Playwright"):
        """
        Launch the persistent Chrome context and open a fresh page.

        Sets ``self.browser`` and ``self.page``. ``./stealth.min.js`` is
        registered as an init script on the context; the path is relative to
        the current working directory.

        :param playwright: the object yielded by ``sync_playwright()``.
        """
        self.browser = playwright.chromium.launch_persistent_context(
            user_data_dir=f'./.data/huitun/{self.phone}',
            headless=False,
            slow_mo=1000,
            channel="chrome",
            ignore_https_errors=True,
            args=[
                '--disable-blink-features=AutomationControlled',
                '--incognito',
                '--ignore-certificate-errors-spki-list',
                '--disable-web-security',  # turn off web security checks
                '--no-sandbox',  # run without the sandbox
                '--disable-dev-shm-usage',  # do not use /dev/shm
                '--disable-features=site-per-process',  # disable per-site process isolation
                '--ignore-certificate-errors',  # ignore certificate errors
                '--disable-features=AutomationControlled',  # disable the automation-related feature
            ])
        self.browser.add_init_script(path="./stealth.min.js")
        self.page = self.browser.new_page()

    def close(self):
        """
        Close the page and then its owning context, and forget both handles.

        Safe to call repeatedly, and safe when nothing was ever opened.
        """
        page, browser = self.page, self.browser
        # Drop the references first so a failure below cannot leave stale
        # handles behind for a later call.
        self.page = None
        self.browser = None
        try:
            if page is not None:
                page.close()
        finally:
            if browser is not None:
                browser.close()

    def login(self, password: str):
        """
        Log in to Huitun with this instance's phone number and a password.

        If the login modal (``.ant-modal-body``) is present, switches it to
        password mode when needed, fills the form and submits it. The session
        is stored in the persistent profile. Nothing is returned.

        :param password: account password typed into the login form.
        """
        with sync_playwright() as playwright:
            self.__init_browser__(playwright)
            try:
                self.page.goto(HUITUN_URL)
                if is_element_present(self.page, '.ant-modal-body'):
                    if not is_element_present(self.page, 'text=密码登录'):
                        # Raw string: '\=' is not a valid Python escape; the
                        # selector value is unchanged.
                        pwd_login = self.page.query_selector(r'.b9dOaTo9gfF3wLAi7jlXTg\=\=')
                        if pwd_login is not None:
                            pwd_login.click()
                    self.page.get_by_placeholder('请输入手机号').type(self.phone)
                    self.page.get_by_placeholder('6-15位数字与字母组合').type(password)
                    self.page.get_by_text('登 录', exact=True).click()
                # NOTE(review): presumably leaves time for the login (or a manual
                # verification step) to finish; original nesting of this wait
                # was not recoverable -- confirm it should run unconditionally.
                self.page.wait_for_timeout(30_000)
            finally:
                self.close()

    def search_note(self, tag_name: str, size: int):
        """
        Collect notes from the note-search page by scrolling it.

        Holds the per-phone lock for the whole operation. Scrolls until at
        least ``size`` notes were captured by ``search_note_handler``, or
        until ``_MAX_IDLE_SCROLLS`` consecutive scrolls add nothing (which is
        also what ends the loop when ``size`` is ``None``).

        NOTE(review): ``tag_name`` is validated but never entered into the
        page, so results are whatever the page loads on its own -- confirm.

        :param tag_name: tag to search for; must not be ``None``.
        :param size: minimum number of notes wanted, or ``None`` for "all".
        :return: ``self.list_result``; may hold more than ``size`` items.
        """
        # Validate before taking the lock or launching a browser.
        api.assert_not_none(tag_name, "标签不能为空")
        lock_manager.acquire_lock(self.phone)
        try:
            with sync_playwright() as playwright:
                self.__init_browser__(playwright)
                try:
                    self.list_result = []
                    self.page.on('response', self.search_note_handler)
                    self.page.goto('https://xhs.huitun.com/#/note/note_search')
                    self.page.wait_for_timeout(3000)
                    idle_rounds = 0
                    while size is None or len(self.list_result) < size:
                        logging.info('继续搜索用户主页')
                        seen_before = len(self.list_result)
                        self.page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                        self.page.wait_for_timeout(2000)
                        logging.info('搜索用户主页图文结果数:%s', len(self.list_result))
                        if len(self.list_result) > seen_before:
                            idle_rounds = 0
                            continue
                        # Nothing new arrived; stop once that keeps happening
                        # instead of scrolling forever.
                        idle_rounds += 1
                        if idle_rounds >= self._MAX_IDLE_SCROLLS:
                            break
                    return self.list_result
                finally:
                    self.close()
        finally:
            lock_manager.release_lock(self.phone)

    def search_note_handler(self, response):
        """
        Response listener: append notes from ``/note/search`` replies.

        Only responses whose URL contains ``/note/search`` and whose JSON body
        has ``status == 0`` are used; the items of ``extData.list`` are
        appended to ``self.list_result``. Anything else is ignored.

        :param response: response object with ``url`` and ``json()``, or ``None``.
        :return: ``None``
        """
        if response is None or '/note/search' not in response.url:
            return
        try:
            payload = response.json()
        except Exception:
            # Listener boundary: one unreadable body must not abort the scrape.
            logging.warning('could not decode note search response: %s',
                            response.url, exc_info=True)
            return
        if not isinstance(payload, dict) or payload.get('status') != 0:
            return
        ext_data = payload.get('extData')
        notes = ext_data.get('list') if isinstance(ext_data, dict) else None
        if isinstance(notes, list):
            # Always extend our own list rather than adopting the payload's.
            self.list_result.extend(notes)
|