123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- """
- """
import json
import logging
import re
from typing import Optional

from playwright.sync_api import sync_playwright, Page, Playwright

import api
from browser import BaseBrowser
from util.lock_util import LockManager
from instagram.data_handler import *
# Base URL of the Instagram web site.
- IG_URL = 'https://www.instagram.com/'
# Lock manager instance used by InstagramBrowser.__invoke__ to acquire and
# release a lock keyed by the browser's account around each invocation.
- lock_manager = LockManager()
def get_post_id(url: str) -> Optional[str]:
    """Extract the post id from an Instagram post URL.

    Recognises ``instagram.com/p/<id>`` as well as
    ``instagram.com/<segment>/p/<id>``. The id stops at the next ``/``,
    ``?`` or ``#`` so query strings and fragments never leak into it.

    Args:
        url: The post URL to parse.

    Returns:
        The captured post id, or ``None`` when ``url`` is empty, is not a
        string, or does not look like a post URL.
    """
    # re.search raises TypeError on non-string input; treat that as "no id".
    if not isinstance(url, str) or not url:
        return None
    match = re.search(r'instagram\.com/(?:[^/]+/)?p/([^/?#]+)', url)
    return match.group(1) if match else None
class InstagramBrowser(BaseBrowser):
    """Browser session that collects Instagram post data from network responses.

    ``search_blog`` opens a post URL and listens to the responses the page
    triggers; the handlers below turn matching responses into ``self.result``
    (the post) and ``self.map_result['author']`` (the author details).
    """

    def __init__(self, account: str, playwright=None):
        """Initialise the base browser for ``account``.

        Args:
            account: Account identifier; also used as the lock key in
                ``__invoke__``.
            playwright: Optional value forwarded unchanged to ``BaseBrowser``.
        """
        super().__init__(account, playwright)
        # Id of the post currently being fetched; set by search_blog.
        self.id = None

    def __get_name__(self):
        """Return the name of this browser, ``'instagram'``."""
        return 'instagram'

    def __invoke__(self, lambda_func, *args, **kwargs):
        """Run ``lambda_func`` inside a fresh Playwright session under the account lock.

        Args:
            lambda_func: Callable to execute once the browser is initialised.
            *args: Positional arguments passed to ``lambda_func``.
            **kwargs: Keyword arguments passed to ``lambda_func``.

        Returns:
            Whatever ``lambda_func`` returns.
        """
        lock_manager.acquire_lock(self.account)
        try:
            with sync_playwright() as playwright:
                self.__init_browser__(playwright)
                return lambda_func(*args, **kwargs)
        finally:
            # Always release, even when browser start-up or the callable fails.
            lock_manager.release_lock(self.account)

    def search_blog(self, url):
        """Open a post URL and return the post data captured from its responses.

        Args:
            url: URL of the post to load; must not be ``None``.

        Returns:
            The post data stored by ``blog_info_handler`` (with the author
            details merged into its ``'user'`` entry when they were captured),
            or ``None`` when no matching response was seen.
        """
        api.assert_not_none(url, 'url不能为空')
        self.result = None
        self.map_result = {}
        self.id = get_post_id(url)

        # Track what we register so the listeners can be detached afterwards;
        # otherwise every call would stack another pair of handlers.
        attached = []
        try:
            self.browser.on('response', self.blog_info_handler)
            attached.append(self.blog_info_handler)
            self.page.goto(url)
            self.page.wait_for_timeout(1000)

            # Registered only after the page has loaded, right before the hover.
            self.browser.on('response', self.user_info_handler)
            attached.append(self.user_info_handler)

            if self.result is not None:
                username = self.result['user'].get('username')
                # Without a username there is no avatar selector to hover.
                if username:
                    # Hover over the avatar to obtain follower and other stats.
                    avatar = self.page.locator(f'img[alt="{username}的头像"]')
                    avatar.nth(0).hover()
                    self.page.wait_for_timeout(1000)
                    author = self.map_result.get('author')
                    if author is not None:
                        self.result['user'].update(author)
            return self.result
        finally:
            for handler in attached:
                self.browser.remove_listener('response', handler)

    def blog_info_handler(self, response):
        """Store post data parsed from a matching response in ``self.result``.

        Responses that are missing or not HTTP 200 are ignored. A URL
        containing ``/info`` is parsed with ``get_blog_by_rsp``; otherwise a
        URL containing the current post id is parsed with ``get_blog_by_doc``.

        Args:
            response: The response object delivered by the ``'response'`` event.
        """
        if response is None or response.status != 200:
            return
        if '/info' in response.url:
            info = get_blog_by_rsp(response)
            if info is not None:
                self.result = info
        elif self.id and self.id in response.url:
            # self.id is None when the URL had no recognisable post id;
            # `None in str` would raise TypeError.
            logging.info('get %s blog response', self.id)
            doc = get_blog_by_doc(response)
            if doc is not None:
                self.result = doc

    def user_info_handler(self, response):
        """Store author data from a ``/graphql/query`` response keyed by ``userID``.

        Only responses whose request body has a ``variables`` entry containing
        ``userID`` are used; the parsed author is written to
        ``self.map_result['author']``.

        Args:
            response: The response object delivered by the ``'response'`` event.
        """
        if response is None or response.status != 200:
            return
        if '/graphql/query' not in response.url:
            return

        try:
            post_data = response.request.post_data_json
        except Exception:
            # Event-handler boundary: an unparsable request body must not
            # break the page's response processing.
            logging.debug('unparsable post data for %s', response.url, exc_info=True)
            return
        # No body (e.g. a GET request) or a non-object body: nothing to inspect.
        if not isinstance(post_data, dict):
            return

        variables = post_data.get('variables')
        if isinstance(variables, str):
            # Form-encoded bodies carry `variables` as a JSON string.
            try:
                variables = json.loads(variables)
            except ValueError:
                return
        if isinstance(variables, dict) and 'userID' in variables:
            self.map_result['author'] = get_user_by_request(response)
|