Browse Source

feat: 支持灰豚搜索热门标签文章

wuwenyi 4 months ago
parent
commit
eee13281e2
4 changed files with 92 additions and 6 deletions
  1. 8 0
      api/__init__.py
  2. 14 0
      api/search.py
  3. 1 0
      browser/__init__.py
  4. 69 6
      huitun/__init__.py

+ 8 - 0
api/__init__.py

@@ -45,6 +45,14 @@ def assert_not_none(data, msg):
         raise BusinessException(msg)
 
 
+def assert_not_empty(data, msg):
+    """
+    断言方法
+    """
+    if data is None or len(data) == 0:
+        raise BusinessException(msg)
+
+
 def success(data=None):
     if data is None:
         return SUCCESS_RESPONSE

+ 14 - 0
api/search.py

@@ -27,6 +27,20 @@ def search_note():
     playwright.stop()
     return api.success(result)
 
+@search_opt.route('/hot-tag-note', methods=["POST"])
+def search_hot_tag_note():
+    """
+    获取热门标签笔记
+    :return:
+    """
+    request_body = request.json
+    playwright = sync_playwright().start()
+    browser = HuiTunBrowser(lock_util.get_idle_phone('huitun'), playwright)
+    result = browser.search_note_by_hot_tag(int(request_body.get('searchLimit')))
+    playwright.stop()
+    return api.success(result)
+
+
 
 @search_opt.route('/note-info', methods=["POST"])
 def search_note_info():

+ 1 - 0
browser/__init__.py

@@ -15,6 +15,7 @@ class BaseBrowser:
         self.page = None
         self.result = None
         self.list_result = []
+        self.map_result = {}
         self.has_more = False
         self.playwright = playwright
 

+ 69 - 6
huitun/__init__.py

@@ -10,16 +10,14 @@ import api
 from browser import BaseBrowser
 from util.lock_util import LockManager
 from util.playwright_util import is_element_present
+from urllib.parse import urlparse, parse_qs
 
 HUITUN_URL = 'https://xhs.huitun.com/'
 lock_manager = LockManager()
+password_dict = {}
 
 
 class HuiTunBrowser(BaseBrowser):
-    def __init__(self, phone: str, playwright=None):
-        super().__init__(phone, playwright)
-        self.password = None
-
     def __get_name__(self):
         return 'huitun'
 
@@ -30,7 +28,7 @@ class HuiTunBrowser(BaseBrowser):
         """
         self.__init_browser__()
         self.page.goto(HUITUN_URL)
-        self.password = password
+        password_dict[self.phone] = password
         self.login_if_need()
         self.page.wait_for_timeout(30_000)
         self.close()
@@ -49,7 +47,7 @@ class HuiTunBrowser(BaseBrowser):
                 if pwd_login is not None:
                     pwd_login.click()
             self.page.get_by_placeholder('请输入手机号').type(self.phone)
-            self.page.get_by_placeholder('6-15位数字与字母组合').type(self.password)
+            self.page.get_by_placeholder('6-15位数字与字母组合').type(password_dict.get(self.phone))
             self.page.get_by_text('登 录', exact=True).click()
             # 验证码登录
             captcha_frame = self.page.frames[1]
@@ -112,3 +110,68 @@ class HuiTunBrowser(BaseBrowser):
                     self.list_result.extend(note_list)
             else:
                 self.has_more = False
+
+    def search_note_by_hot_tag(self, size: int):
+        """抓取热词搜索文章"""
+        lock_manager.acquire_lock(self.phone)
+        try:
+            self.__init_browser__()
+            self.list_result = []
+            self.has_more = True
+            self.page.on('response', self.hot_tag_handler)
+            self.page.goto('https://xhs.huitun.com/#/hot/topic_list')
+            self.login_if_need()
+            self.page.wait_for_timeout(3000)
+            api.assert_not_empty(self.list_result,"获取标签失败")
+            self.list_result = self.list_result[:size]
+            topic_map = {}
+            for tag in self.list_result:
+                logging.info(f'搜索标签:{tag.get("topicName")}')
+                topic_id = tag.get("topicId")
+                topic_map[topic_id] = tag
+                self.page.goto(f'https://xhs.huitun.com/#/anchor/topic_detail?id={topic_id}')
+                self.page.reload()
+                self.page.get_by_text('笔记分析',exact=True).click()
+                self.page.on('response', self.hot_tag_note_handler)
+                self.page.get_by_text('近7天', exact=True).click()
+                page_num = 1
+                # 目前版本最多只能翻 50页
+                while self.map_result.get(topic_id) is not None and len(self.map_result.get(topic_id)) < 30 and page_num <= 50:
+                    logging.info(f'继续搜索热标签图文,当前文章数量:{len(self.map_result.get(topic_id))}, 页数:{page_num}')
+                    self.page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
+                    self.page.wait_for_timeout(1000)
+                    self.page.query_selector('.ant-pagination-next').click()
+                    self.page.wait_for_timeout(1000)
+                    page_num += 1
+            return {
+                'tagList': self.list_result,
+                'tagNotes': self.map_result
+            }
+        finally:
+            lock_manager.release_lock(self.phone)
+            self.close()
+
+    def hot_tag_handler(self, response):
+        """处理热词搜索请求响应"""
+        if response is not None and ('/topic/search' in response.url or '/rank/topic/add' in response.url):
+            response_body = response.json()
+            if response_body.get('status') == 0:
+                tag_list = response_body.get('extData').get('list')
+                self.list_result = tag_list[0:19]
+
+    def hot_tag_note_handler(self, response):
+        """处理热词搜索文章请求响应"""
+        if response is not None and '/topic/detail/notes/' in response.url:
+            response_body = response.json()
+            if response_body.get('status') == 0:
+                parsed_url = urlparse( response.url)
+                query_params = parse_qs(parsed_url.query)
+                topic_id = query_params.get('topicId', [None])[0]
+                note_list = response_body.get('extData').get('list')
+                # 只筛选图文笔记
+                note_list = [note for note in note_list if note.get('type') == 'normal']
+                exist_note_list = self.map_result.get(topic_id)
+                if exist_note_list is None:
+                    self.map_result[topic_id] = note_list
+                else:
+                    exist_note_list.extend(note_list)