__init__.py 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. """
  2. """
  3. import json
  4. import logging
  5. from playwright.sync_api import sync_playwright, Page, Playwright
  6. import api
  7. from browser import BaseBrowser
  8. from util.lock_util import LockManager
  9. from instagram.data_handler import *
# Root URL of the Instagram web site (not referenced by the code visible in
# this module).
IG_URL = 'https://www.instagram.com/'
# Module-level LockManager instance; InstagramBrowser.__invoke__ uses it to
# acquire and release a lock keyed by the browser's account.
lock_manager = LockManager()
  12. class InstagramBrowser(BaseBrowser):
  13. def __init__(self, account: str, playwright=None):
  14. super().__init__(account, playwright)
  15. self.id = None
  16. def __get_name__(self):
  17. return 'instagram'
  18. def __invoke__(self, lambda_func, *args, **kwargs):
  19. lock_manager.acquire_lock(self.account)
  20. try:
  21. with sync_playwright() as playwright:
  22. self.__init_browser__(playwright)
  23. return lambda_func(*args, **kwargs)
  24. finally:
  25. lock_manager.release_lock(self.account)
  26. def search_blog(self, url):
  27. api.assert_not_none(url, 'url不能为空')
  28. self.result = None
  29. self.map_result = {}
  30. self.id = url.lstrip('/').split('/')[-1]
  31. self.browser.on('response', self.blog_info_handler)
  32. self.page.goto(url)
  33. self.page.wait_for_timeout(1000)
  34. self.browser.on('response', self.user_info_handler)
  35. if self.result is not None:
  36. # 将鼠标光标放到头像上,获取粉丝等数据信息
  37. username = self.result['user'].get('username')
  38. head_ele = self.page.locator(f'img[alt="{username}的头像"]')
  39. head_ele.nth(0).hover()
  40. self.page.wait_for_timeout(1000)
  41. if self.map_result.get('author') is not None:
  42. self.result['user'].update(self.map_result['author'])
  43. return self.result
  44. def blog_info_handler(self, response):
  45. if response is None or response.status != 200:
  46. return
  47. if self.id in response.url:
  48. logging.info(f'get {self.id} blog response')
  49. self.result = get_blog_by_doc(response)
  50. def user_info_handler(self, response):
  51. if response is None or response.status != 200:
  52. return
  53. if '/graphql/query' in response.url:
  54. req_params = response.request.post_data_json.get('variables')
  55. if req_params is not None:
  56. req_body = json.loads(req_params)
  57. if 'userID' in req_body:
  58. self.map_result['author'] = get_user_by_request(response)