__init__.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
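"""
Instagram browser automation.

Defines InstagramBrowser, a BaseBrowser subclass that opens an Instagram post
with Playwright and collects post and author data from the network responses
observed while the page loads.
"""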
import json
import logging
import re
from typing import Optional

from playwright.sync_api import sync_playwright, Page, Playwright

import api
from browser import BaseBrowser
from util.lock_util import LockManager
from util.url_util import get_id_by_url
from instagram.data_handler import *
  11. IG_URL = 'https://www.instagram.com/'
  12. lock_manager = LockManager()
  13. def get_post_id(url: str) -> str:
  14. match = re.search(r'instagram\.com/(?:[^/]+/)?p/([^/?]+)', url)
  15. return match.group(1) if match else None
  16. class InstagramBrowser(BaseBrowser):
  17. def __init__(self, account: str, playwright=None):
  18. super().__init__(account, playwright)
  19. self.id = None
  20. def __get_name__(self):
  21. return 'instagram'
  22. def __invoke__(self, lambda_func, *args, **kwargs):
  23. lock_manager.acquire_lock(self.account)
  24. try:
  25. with sync_playwright() as playwright:
  26. self.__init_browser__(playwright)
  27. return lambda_func(*args, **kwargs)
  28. finally:
  29. lock_manager.release_lock(self.account)
  30. def search_blog(self, url):
  31. api.assert_not_none(url, 'url不能为空')
  32. self.result = None
  33. self.map_result = {}
  34. self.id = get_id_by_url(url)
  35. api.assert_not_none(self.id, 'cannot get post id from url')
  36. self.browser.on('response', self.blog_info_handler)
  37. self.page.goto(url)
  38. self.page.wait_for_timeout(1000)
  39. self.browser.on('response', self.user_info_handler)
  40. if self.result is not None:
  41. # 将鼠标光标放到头像上,获取粉丝等数据信息
  42. username = self.result['user'].get('username')
  43. head_ele = self.page.locator(f'img[alt="{username}的头像"]')
  44. head_ele.nth(0).hover()
  45. self.page.wait_for_timeout(1000)
  46. if self.map_result.get('author') is not None:
  47. self.result['user'].update(self.map_result['author'])
  48. return self.result
  49. def blog_info_handler(self, response):
  50. if response is None or response.status != 200:
  51. return
  52. content_type = response.headers.get('content-type', '')
  53. if '/info' in response.url and 'application/json' in content_type:
  54. info = get_blog_by_rsp(response)
  55. if info is not None:
  56. self.result = info
  57. elif self.id in response.url and 'text/html' in content_type:
  58. logging.info(f'get {self.id} blog response')
  59. doc = get_blog_by_doc(response)
  60. if doc is not None:
  61. self.result = doc
  62. def user_info_handler(self, response):
  63. if response is None or response.status != 200:
  64. return
  65. if '/graphql/query' in response.url:
  66. req_params = response.request.post_data_json.get('variables')
  67. if req_params is not None:
  68. req_body = json.loads(req_params)
  69. if 'userID' in req_body:
  70. self.map_result['author'] = get_user_by_request(response)