123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354 |
- """
- """
- import json
- import logging
- from playwright.sync_api import sync_playwright, Page, Playwright
- import api
- from browser import BaseBrowser
- from util.lock_util import LockManager
- from tiktok.data_handler import *
# Root URL of the TikTok website.
# NOTE(review): the "IG_" prefix looks like a leftover from another platform module;
# the name is kept unchanged because other modules may import it -- confirm before renaming.
- IG_URL = 'https://www.tiktok.com/'
# Module-level LockManager instance. TikTokBrowser.invoke acquires a lock on it keyed
# by the browser's account before running and releases it afterwards.
- lock_manager = LockManager()
class TikTokBrowser(BaseBrowser):
    """Browser wrapper that opens a TikTok item URL and captures its data.

    The item data is not scraped from the DOM: ``search_item`` navigates to
    the URL while ``item_handler`` listens to network responses and stores
    the first matching one, parsed, in ``self.result``.
    """

    def __init__(self, account: str, playwright=None):
        """Create a wrapper bound to ``account``.

        Args:
            account: Account identifier, forwarded to ``BaseBrowser``.
            playwright: Optional object forwarded unchanged to ``BaseBrowser``.
        """
        super().__init__(account, playwright)
        # Last path segment of the URL currently being searched; set by
        # ``search_item`` and used by ``item_handler`` to filter responses.
        self.id = None

    def __get_name__(self):
        """Return the platform name, ``'tiktok'``."""
        return 'tiktok'

    def invoke(self, func, *args, **kwargs):
        """Run ``func(*args, **kwargs)`` with a fresh Playwright session.

        The lock for ``self.account`` is held on the module-level
        ``lock_manager`` for the whole call and is always released, even if
        browser initialisation or ``func`` raises.

        Args:
            func: Callable to execute once the browser has been initialised.
            *args: Positional arguments passed to ``func``.
            **kwargs: Keyword arguments passed to ``func``.

        Returns:
            Whatever ``func`` returns.
        """
        lock_manager.acquire_lock(self.account)
        try:
            with sync_playwright() as playwright:
                self.__init_browser__(playwright)
                return func(*args, **kwargs)
        finally:
            lock_manager.release_lock(self.account)

    def search_item(self, url):
        """Navigate to ``url`` and return the item data captured on the way.

        Args:
            url: Item URL; validated with ``api.assert_not_none``. Its last
                path segment is used as the item id.

        Returns:
            The value stored by ``item_handler`` during navigation, or
            ``None`` if no matching response arrived before the wait ended.
        """
        api.assert_not_none(url, 'url不能为空')
        self.result = None
        self.map_result = {}
        # Strip *trailing* slashes before taking the last segment. Stripping
        # leading ones (as before) left an empty id for URLs ending in '/',
        # and an empty id is a substring of every response URL.
        self.id = url.rstrip('/').split('/')[-1]
        handler = self.item_handler
        self.browser.on('response', handler)
        try:
            self.page.goto(url)
            # Give late responses a fixed 2000 ms window to arrive.
            self.page.wait_for_timeout(2000)
        finally:
            # Unregister so repeated searches do not accumulate listeners.
            self.browser.remove_listener('response', handler)
        return self.result

    def item_handler(self, response):
        """Store parsed item data from the first matching network response.

        A response is considered only when its status is 200, no result has
        been stored yet, and its URL contains the current item id. Responses
        from the ``/api/item/detail/`` endpoint are parsed with
        ``get_detail_by_response``; otherwise a response whose request
        resource type is ``'document'`` is parsed with
        ``get_video_by_response``.
        (Both parsers are presumably provided by ``tiktok.data_handler``.)

        Args:
            response: Response object delivered by the ``'response'`` event.
        """
        if response is None or response.status != 200 or self.result is not None:
            return
        # No id (None or empty) means there is nothing to match against;
        # this also avoids a TypeError from ``None in str``.
        if not self.id or self.id not in response.url:
            return
        if '/api/item/detail/' in response.url:
            self.result = get_detail_by_response(response)
        elif response.request.resource_type == 'document':
            logging.info('get %s item response', self.id)
            self.result = get_video_by_response(response)
|