data_handler.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. """
  2. """
  3. import json
  4. import re
  5. import jsonpath
  6. item_require_fields = [
  7. 'id', 'author', 'authorStats', 'createTime', 'desc', 'imagePost', 'stats',' bitrateInfo', 'cover', 'CategoryType'
  8. ]
  9. def get_video_by_response(response):
  10. item_struct = get_item_json2(response.text())
  11. item_struct['cover'] = item_struct.get('video').get('cover')
  12. item_struct = {k: v for k, v in item_struct.items() if k in item_require_fields}
  13. return item_struct
  14. def get_video_json(html_content):
  15. # 逐行读取文件
  16. inside_items = False # 标志是否进入 items 部分
  17. items_buffer = "" # 临时保存 JSON 部分
  18. for line in html_content.splitlines():
  19. # 通过简单的规则修复非标准 JSON 格式(可以根据实际情况定制)
  20. line = line.strip() # 去掉多余的空白字符
  21. if '"itemStruct"' in line:
  22. items_buffer = '{'
  23. inside_items = True # 发现目标字段
  24. continue
  25. # 处理 items 数组的部分
  26. if inside_items:
  27. items_buffer += line # 累积读取多行
  28. # 如果找到了 JSON 数组的结束
  29. if '}' in items_buffer:
  30. try:
  31. # 尝试解析 JSON
  32. data = json.loads(items_buffer)
  33. # 获取第一个 item
  34. return data
  35. except json.JSONDecodeError:
  36. continue # 如果解析失败,继续读取下一行
  37. # '<script id="__UNIVERSAL_DATA_FOR_REHYDRATION__" type="application/json">'
  38. return None # 如果没有找到匹配项
  39. def get_item_json2(html_content):
  40. # 逐行读取文件
  41. for line in html_content.splitlines():
  42. # 通过简单的规则修复非标准 JSON 格式(可以根据实际情况定制)
  43. line = line.strip() # 去掉多余的空白字符
  44. if '"itemStruct"' in line:
  45. script_pattern = re.compile(r'<script\s+id="__UNIVERSAL_DATA_FOR_REHYDRATION__"\s+type="application/json"[^>]*>(.*?)</script>', re.DOTALL)
  46. # 查找所有匹配的 <script> 标签内容
  47. json_str = re.findall(script_pattern, line)
  48. data = json.loads(json_str[0])
  49. jsonpath_expr = '$..itemStruct'
  50. data = jsonpath.jsonpath(data, jsonpath_expr)
  51. if data:
  52. return data[0]
  53. return None # 如果没有找到匹配项
  54. def get_detail_by_response(response):
  55. json = response.json()
  56. item_struct = json.get('itemInfo').get('itemStruct')
  57. item_struct = {k: v for k, v in item_struct.items() if k in item_require_fields}
  58. return item_struct
  59. # with open('../aweme_video.txt', 'r') as file:
  60. # data = get_video_json(file.read())
  61. # print(data)