""" """ import json import logging from playwright.sync_api import sync_playwright, Page, Playwright import api from browser import BaseBrowser from util.lock_util import LockManager from facebook.data_handler import * FACEBOOK_URL = 'https://www.facebook.com' lock_manager = LockManager() class FackBookBrowser(BaseBrowser): def __init__(self, account: str, playwright=None): super().__init__(account, playwright) self.id = None def __get_name__(self): return 'facebook' def invoke(self, lambda_func, *args, **kwargs): lock_manager.acquire_lock(self.account) try: with sync_playwright() as playwright: self.__init_browser__(playwright) return lambda_func(*args, **kwargs) finally: lock_manager.release_lock(self.account) def login(self): self.page.goto(FACEBOOK_URL) self.page.wait_for_timeout(80000) def search_post(self, url): api.assert_not_none(url, 'url不能为空') self.result = None self.map_result = {} self.id = url.lstrip('/').split('/')[-1] self.browser.on('response', self.post_info_handler) self.page.goto(url) self.page.wait_for_timeout(2000) # if self.result is not None: # # 将鼠标光标放到头像上,获取粉丝等数据信息 # username = self.result['actor'].get('name') # head_ele = self.page.locator(f'img[alt="{username}的头像"]') # head_ele.nth(0).hover() # self.page.wait_for_timeout(1000) return self.result def post_info_handler(self, response): if response is None or response.status != 200: return if self.id in response.url and response.request.resource_type == 'document': logging.info(f'get {self.id} post response') self.result = get_post_by_doc(response) def user_info_handler(self, response): if response is None or response.status != 200: return if '/graphql/query' in response.url: req_params = response.request.post_data_json.get('variables') if req_params is not None: req_body = json.loads(req_params) if 'userID' in req_body: self.map_result['author'] = get_user_by_request(response)