|
@@ -0,0 +1,70 @@
|
|
|
+"""
|
|
|
+
|
|
|
+"""
|
|
|
+import json
|
|
|
+import logging
|
|
|
+from playwright.sync_api import sync_playwright, Page, Playwright
|
|
|
+
|
|
|
+import api
|
|
|
+from browser import BaseBrowser
|
|
|
+from util.lock_util import LockManager
|
|
|
+from instagram.data_handler import *
|
|
|
+
|
|
|
+IG_URL = 'https://www.instagram.com/'
|
|
|
+
|
|
|
+lock_manager = LockManager()
|
|
|
+
|
|
|
+
|
|
|
+class InstagramBrowser(BaseBrowser):
|
|
|
+
|
|
|
+ def __init__(self, account: str, playwright=None):
|
|
|
+ super().__init__(account, playwright)
|
|
|
+ self.id = None
|
|
|
+
|
|
|
+ def __get_name__(self):
|
|
|
+ return 'instagram'
|
|
|
+
|
|
|
+ def __invoke__(self, lambda_func, *args, **kwargs):
|
|
|
+ lock_manager.acquire_lock(self.account)
|
|
|
+ try:
|
|
|
+ with sync_playwright() as playwright:
|
|
|
+ self.__init_browser__(playwright)
|
|
|
+ return lambda_func(*args, **kwargs)
|
|
|
+ finally:
|
|
|
+ lock_manager.release_lock(self.account)
|
|
|
+
|
|
|
+ def search_blog(self, url):
|
|
|
+ api.assert_not_none(url, 'url不能为空')
|
|
|
+ self.result = None
|
|
|
+ self.map_result = {}
|
|
|
+ self.id = url.lstrip('/').split('/')[-1]
|
|
|
+ self.browser.on('response', self.blog_info_handler)
|
|
|
+ self.page.goto(url)
|
|
|
+ self.page.wait_for_timeout(1000)
|
|
|
+ self.browser.on('response', self.user_info_handler)
|
|
|
+ if self.result is not None:
|
|
|
+
|
|
|
+ username = self.result['user'].get('username')
|
|
|
+ head_ele = self.page.locator(f'img[alt="{username}的头像"]')
|
|
|
+ head_ele.nth(0).hover()
|
|
|
+ self.page.wait_for_timeout(1000)
|
|
|
+ if self.map_result.get('author') is not None:
|
|
|
+ self.result['user'].update(self.map_result['author'])
|
|
|
+ return self.result
|
|
|
+
|
|
|
+ def blog_info_handler(self, response):
|
|
|
+ if response is None or response.status != 200:
|
|
|
+ return
|
|
|
+ if self.id in response.url:
|
|
|
+ logging.info(f'get {self.id} blog response')
|
|
|
+ self.result = get_blog_by_doc(response)
|
|
|
+
|
|
|
+ def user_info_handler(self, response):
|
|
|
+ if response is None or response.status != 200:
|
|
|
+ return
|
|
|
+ if '/graphql/query' in response.url:
|
|
|
+ req_params = response.request.post_data_json.get('variables')
|
|
|
+ if req_params is not None:
|
|
|
+ req_body = json.loads(req_params)
|
|
|
+ if 'userID' in req_body:
|
|
|
+ self.map_result['author'] = get_user_by_request(response)
|