data_handler.py 3.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. """
  2. """
  3. import json
  4. import re
  5. import jsonpath
  6. from util import json_util
  7. blog_require_fields = ['code', 'pk', 'id', 'taken_at', 'image_versions2', 'user', 'media_type', 'carousel_media',
  8. 'carousel_media_count',
  9. 'like_count', 'comment_count', 'caption', 'caption_is_edited']
  10. user_require_fields = ['pk', 'id', 'username', 'full_name', 'profile_pic_url', 'latest_reel_media', 'follower_count',
  11. 'following_count', 'media_count']
  12. def get_blog_by_doc(response):
  13. item = get_blog_json2(response.text())
  14. if not item:
  15. return None
  16. item = handle_item(item)
  17. return item
  18. def get_blog_json(html_content):
  19. # 逐行读取文件
  20. inside_items = False # 标志是否进入 items 部分
  21. items_buffer = "" # 临时保存 JSON 部分
  22. for line in html_content.splitlines():
  23. # 通过简单的规则修复非标准 JSON 格式(可以根据实际情况定制)
  24. line = line.strip() # 去掉多余的空白字符
  25. if '"xdt_api__v1__media__shortcode__web_info"' in line:
  26. items_buffer = '{'
  27. inside_items = True # 发现目标字段
  28. continue
  29. # 处理 items 数组的部分
  30. if inside_items:
  31. items_buffer += line # 累积读取多行
  32. # 如果找到了 JSON 数组的结束
  33. if '"items": [' in items_buffer and ']' in items_buffer:
  34. try:
  35. # 尝试解析 JSON
  36. data = json.loads(items_buffer)
  37. # 获取第一个 item
  38. return data
  39. except json.JSONDecodeError:
  40. continue # 如果解析失败,继续读取下一行
  41. return None # 如果没有找到匹配项
  42. def get_blog_json2(html_content):
  43. # 逐行读取文件
  44. for line in html_content.splitlines():
  45. # 通过简单的规则修复非标准 JSON 格式(可以根据实际情况定制)
  46. line = line.strip() # 去掉多余的空白字符
  47. if '"xdt_api__v1__media__shortcode__web_info"' in line:
  48. script_pattern = re.compile(r'<script\s+type="application/json"[^>]*>(.*?)</script>', re.DOTALL)
  49. # 查找所有匹配的 <script> 标签内容
  50. json_str = re.findall(script_pattern, line)
  51. data = json.loads(json_str[0])
  52. jsonpath_expr = '$..xdt_api__v1__media__shortcode__web_info.items[0]' # 寻找第一个 item
  53. data = jsonpath.jsonpath(data, jsonpath_expr)
  54. if data:
  55. return data[0]
  56. return None # 如果没有找到匹配项
  57. def get_user_by_request(response):
  58. response_json = response.json()
  59. if response_json.get('status') == 'ok' and 'data' in response_json:
  60. user = response_json['data']['user']
  61. user = {k: v for k, v in user.items() if k in user_require_fields}
  62. return user
  63. else:
  64. return None
  65. def get_blog_by_rsp(response):
  66. response_json = response.json()
  67. item = response_json['items'][0]
  68. if not item:
  69. return None
  70. item = handle_item(item)
  71. return item
  72. def handle_item(item):
  73. item = {k: v for k, v in item.items() if k in blog_require_fields}
  74. item['cover'] = item['image_versions2']['candidates'][0]['url']
  75. item['image_versions2'] = None
  76. item['user'] = {k: v for k, v in item['user'].items() if k in user_require_fields}
  77. return item