123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177 |
- """
- """
- import logging
- from huitun.captcha_ident import CaptchaIdent
- from playwright.sync_api import sync_playwright, Page, Playwright
- import api
- from browser import BaseBrowser
- from util.lock_util import LockManager
- from util.playwright_util import is_element_present
- from urllib.parse import urlparse, parse_qs
- HUITUN_URL = 'https://xhs.huitun.com/'
- lock_manager = LockManager()
- password_dict = {}
- class HuiTunBrowser(BaseBrowser):
- def __get_name__(self):
- return 'huitun'
- def login(self, password: str):
- """
- 登录抖音,一个登录之后,全部的页面都有了登录状态
- :return: 2- 需要验证码 1-登录成功
- """
- self.__init_browser__()
- self.page.goto(HUITUN_URL)
- password_dict[self.phone] = password
- self.login_if_need()
- self.page.wait_for_timeout(30_000)
- self.close()
- def login_if_need(self):
- """
- 登录灰豚
- """
- login_info_expired = self.page.query_selector('.ant-btn-primary:has-text("知道了")')
- if login_info_expired is not None:
- login_info_expired.click()
- if is_element_present(self.page, '.ant-modal-body'):
- logging.info('灰豚需要重新登录')
- if not is_element_present(self.page, 'text=密码登录'):
- pwd_login = self.page.query_selector('.b9dOaTo9gfF3wLAi7jlXTg\=\=')
- if pwd_login is not None:
- pwd_login.click()
- self.page.get_by_placeholder('请输入手机号').type(self.phone)
- self.page.get_by_placeholder('6-15位数字与字母组合').type(password_dict.get(self.phone))
- self.page.get_by_text('登 录', exact=True).click()
- # 验证码登录
- captcha_frame = self.page.frames[1]
- if captcha_frame is not None:
- captcha_tool = CaptchaIdent(self.page)
- captcha_tool.start()
- def search_note(self, tag_name: str, size: int):
- lock_manager.acquire_lock(self.phone)
- try:
- self.__init_browser__()
- self.list_result = []
- self.has_more = True
- api.assert_not_none(tag_name, "标签不能为空")
- self.page.goto('https://xhs.huitun.com/#/note/note_search')
- self.page.wait_for_timeout(2000)
- self.login_if_need()
- # 展开全部标签
- self.page.query_selector('.zgInWFcVVDjRN6BUMm3N0g\=\=').click()
- last_tag = self.page.query_selector('.fyBvQcyA81sogVJY0YVnhg\=\=')
- if last_tag is not None:
- last_tag.click()
- tag_ele = self.page.query_selector(f'.IRk6XOEYweiS9APLHrOp-w\=\=:has-text("{tag_name}")')
- if tag_ele is not None:
- tag_ele.click()
- self.page.get_by_text('图文笔记', exact=True).click()
- self.page.wait_for_timeout(500)
- self.page.on('response', self.search_note_handler)
- self.page.get_by_text('近3天', exact=True).click()
- # 限定一个上限
- page_num = int(2 * size / 10)
- for i in range(page_num):
- if size is not None and len(self.list_result) >= size:
- break
- logging.info('继续搜索灰豚')
- self.page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
- self.page.wait_for_timeout(2000)
- logging.info('搜索灰豚结果数:%s', len(self.list_result))
- if not self.has_more:
- break
- return self.list_result
- finally:
- lock_manager.release_lock(self.phone)
- self.close()
- def search_note_handler(self, response):
- """
- 处理用户主页搜索图文请求响应
- :param response:
- :return:
- """
- if response is not None and '/note/search' in response.url:
- response_body = response.json()
- if response_body.get('status') == 0:
- note_list = response_body.get('extData').get('list')
- self.has_more = len(note_list) > 0
- if len(self.list_result) == 0:
- self.list_result = note_list
- else:
- self.list_result.extend(note_list)
- else:
- self.has_more = False
- def search_note_by_hot_tag(self, size: int):
- """抓取热词搜索文章"""
- lock_manager.acquire_lock(self.phone)
- try:
- self.__init_browser__()
- self.list_result = []
- self.has_more = True
- self.page.on('response', self.hot_tag_handler)
- self.page.goto('https://xhs.huitun.com/#/hot/topic_list')
- self.login_if_need()
- self.page.wait_for_timeout(3000)
- api.assert_not_empty(self.list_result,"获取标签失败")
- self.list_result = self.list_result[:size]
- topic_map = {}
- for tag in self.list_result:
- logging.info(f'搜索标签:{tag.get("topicName")}')
- topic_id = tag.get("topicId")
- topic_map[topic_id] = tag
- self.page.goto(f'https://xhs.huitun.com/#/anchor/topic_detail?id={topic_id}')
- self.page.reload()
- self.page.get_by_text('笔记分析',exact=True).click()
- self.page.on('response', self.hot_tag_note_handler)
- self.page.get_by_text('近7天', exact=True).click()
- page_num = 1
- # 目前版本最多只能翻 50页
- while self.map_result.get(topic_id) is not None and len(self.map_result.get(topic_id)) < 30 and page_num <= 50:
- logging.info(f'继续搜索热标签图文,当前文章数量:{len(self.map_result.get(topic_id))}, 页数:{page_num}')
- self.page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
- self.page.wait_for_timeout(1000)
- self.page.query_selector('.ant-pagination-next').click()
- self.page.wait_for_timeout(1000)
- page_num += 1
- return {
- 'tagList': self.list_result,
- 'tagNotes': self.map_result
- }
- finally:
- lock_manager.release_lock(self.phone)
- self.close()
- def hot_tag_handler(self, response):
- """处理热词搜索请求响应"""
- if response is not None and ('/topic/search' in response.url or '/rank/topic/add' in response.url):
- response_body = response.json()
- if response_body.get('status') == 0:
- tag_list = response_body.get('extData').get('list')
- self.list_result = tag_list[0:19]
- def hot_tag_note_handler(self, response):
- """处理热词搜索文章请求响应"""
- if response is not None and '/topic/detail/notes/' in response.url:
- response_body = response.json()
- if response_body.get('status') == 0:
- parsed_url = urlparse( response.url)
- query_params = parse_qs(parsed_url.query)
- topic_id = query_params.get('topicId', [None])[0]
- note_list = response_body.get('extData').get('list')
- # 只筛选图文笔记
- note_list = [note for note in note_list if note.get('type') == 'normal']
- exist_note_list = self.map_result.get(topic_id)
- if exist_note_list is None:
- self.map_result[topic_id] = note_list
- else:
- exist_note_list.extend(note_list)
|