data_handler.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
"""Helpers that extract post and user data from fetched page text and JSON responses.
"""
  3. import json
  4. import logging
  5. import re
  6. import jsonpath
  7. from util import json_util
# Keys that are kept when a user/actor dict is trimmed before being returned
# (used by get_post_by_doc and get_user_by_request).
user_require_fields = ['id', 'name', 'profile_picture', 'profile_url']
  9. def get_post_by_doc(response):
  10. data = get_post_json(response.text())[0]['node']
  11. content_story = jsonpath.jsonpath(data, 'comet_sections.content.story')[0]
  12. logging.info('获取成功')
  13. # item = {k: v for k, v in item.items() if k in blog_require_fields}
  14. # item['cover'] = item['image_versions2']['candidates'][0]['url']
  15. # item['image_versions2'] = None
  16. # item['user'] = {k: v for k, v in item['user'].items() if k in user_require_fields}
  17. # comet_sections.context_layout.story.comet_sections.actor_photo.story.actors
  18. actor = jsonpath.jsonpath(data, '$..comet_sections.context_layout.story.comet_sections.actor_photo.story.actors[0]')[0]
  19. actor = {k: v for k, v in actor.items() if k in user_require_fields}
  20. image_candidates = jsonpath.jsonpath(data, '$..styles.attachment.all_subattachments.nodes')
  21. photo_meida_candidate = jsonpath.jsonpath(data, '$..styles.attachment.media')
  22. attachments = [img['media'] for img in image_candidates[0]] if image_candidates else photo_meida_candidate
  23. result = {
  24. 'text': content_story['message']['text'],
  25. 'attachments': attachments,
  26. 'post_id': content_story['post_id'],
  27. 'actor': actor,
  28. 'creation_time': jsonpath.jsonpath(data, '$..comet_sections.context_layout.story.comet_sections.metadata[0].story.creation_time')[0],
  29. 'id': content_story['id'],
  30. 'reaction_count': jsonpath.jsonpath(data, '$..comet_ufi_summary_and_actions_renderer.feedback.reaction_count.count')[0],
  31. 'comment_count': jsonpath.jsonpath(data, '$..comment_rendering_instance_for_feed_location.comments.total_count')[0],
  32. 'share_count': jsonpath.jsonpath(data, '$..comet_ufi_summary_and_actions_renderer.feedback.share_count.count')[0]
  33. }
  34. # 图片
  35. return result
  36. def get_blog_json(html_content):
  37. # 逐行读取文件
  38. inside_items = False # 标志是否进入 items 部分
  39. items_buffer = "" # 临时保存 JSON 部分
  40. for line in html_content.splitlines():
  41. # 通过简单的规则修复非标准 JSON 格式(可以根据实际情况定制)
  42. line = line.strip() # 去掉多余的空白字符
  43. if '"xdt_api__v1__media__shortcode__web_info"' in line:
  44. items_buffer = '{'
  45. inside_items = True # 发现目标字段
  46. continue
  47. # 处理 items 数组的部分
  48. if inside_items:
  49. items_buffer += line # 累积读取多行
  50. # 如果找到了 JSON 数组的结束
  51. if '"items": [' in items_buffer and ']' in items_buffer:
  52. try:
  53. # 尝试解析 JSON
  54. data = json.loads(items_buffer)
  55. # 获取第一个 item
  56. return data
  57. except json.JSONDecodeError:
  58. continue # 如果解析失败,继续读取下一行
  59. return None # 如果没有找到匹配项
  60. def get_post_json(html_content):
  61. # 逐行读取文件
  62. for line in html_content.splitlines():
  63. # 通过简单的规则修复非标准 JSON 格式(可以根据实际情况定制)
  64. line = line.strip() # 去掉多余的空白字符
  65. # pattern = r'adp_Comet\w+ContentQueryRelayPreloader_\w+",(\{.+?\})'
  66. pattern = r'adp_Comet\w+ContentQueryRelayPreloader_\w+",\{(.*)'
  67. # pattern = r'adp_Comet\w+ContentQueryRelayPreloader_\w+",(.*)'
  68. match = re.search(pattern, line, re.DOTALL)
  69. if match:
  70. # print(line)
  71. json_part = match.group(1)
  72. pattern = r'\"data\"\:\{(.*)}'
  73. json_part = re.search(pattern, json_part, re.DOTALL)
  74. data = json_util.parse_json_from_string('{' + json_part.group(1))
  75. # print(story)
  76. # jsonpath_expr = '$..xdt_api__v1__media__shortcode__web_info.items[0]' # 寻找第一个 item
  77. # data = jsonpath.jsonpath(data, jsonpath_expr)
  78. # if data:
  79. # return data[0]
  80. return data
  81. return None # 如果没有找到匹配项
  82. def get_user_by_request(response):
  83. response_json = response.json()
  84. if response_json.get('status') == 'ok' and 'data' in response_json:
  85. user = response_json['data']['user']
  86. user = {k: v for k, v in user.items() if k in user_require_fields}
  87. return user
  88. else:
  89. return None