""" """ import json import logging from playwright.sync_api import sync_playwright, Page, Playwright import api from browser import BaseBrowser from util.lock_util import LockManager from tiktok.data_handler import * IG_URL = 'https://www.tiktok.com/' lock_manager = LockManager() class TikTokBrowser(BaseBrowser): def __init__(self, account: str, playwright=None): super().__init__(account, playwright) self.id = None def __get_name__(self): return 'tiktok' def invoke(self, func, *args, **kwargs): lock_manager.acquire_lock(self.account) try: with sync_playwright() as playwright: self.__init_browser__(playwright) return func(*args, **kwargs) finally: lock_manager.release_lock(self.account) def search_item(self, url): api.assert_not_none(url, 'url不能为空') self.result = None self.map_result = {} self.id = url.lstrip('/').split('/')[-1] self.browser.on('response', self.item_handler) self.page.goto(url=url, wait_until='commit') self.page.wait_for_timeout(2000) return self.result def item_handler(self, response): if response is None or response.status != 200 or self.result is not None: return if self.id in response.url: if '/api/item/detail/' in response.url: self.result = get_detail_by_response(response) elif response.request.resource_type == 'document': logging.info(f'get {self.id} item response') self.result = get_video_by_response(response)