123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- """
- """
- import json
- import logging
- import re
- import jsonpath
- from util import json_util
# Keys that survive when a user/actor mapping is filtered by the helpers
# in this module; every other key is dropped.
user_require_fields = [
    'id',
    'name',
    'profile_picture',
    'profile_url',
]
def _first_jsonpath_match(data, expr, default=None):
    """Return the first value *expr* selects in *data*, or *default* if none."""
    # jsonpath.jsonpath() hands back a falsy value instead of a list when
    # nothing matches, so the result has to be checked before indexing it.
    matches = jsonpath.jsonpath(data, expr)
    return matches[0] if matches else default


def get_post_by_doc(response):
    """Build a flat summary dict for the post embedded in *response*.

    Args:
        response: Object whose ``text()`` method returns the page source that
            ``get_post_json`` is able to parse.

    Returns:
        dict with the keys ``text``, ``attachments``, ``post_id``, ``actor``,
        ``creation_time``, ``id``, ``reaction_count``, ``comment_count`` and
        ``share_count``. Optional pieces that cannot be found are reported as
        ``None`` (``attachments`` as ``[]``, ``actor`` as ``{}``) instead of
        aborting the whole extraction.

    Raises:
        TypeError: If ``get_post_json`` finds no payload or the payload has no
            ``comet_sections.content.story`` section.
        KeyError: If the content story lacks ``post_id`` or ``id``.
    """
    # Required pieces: without these there is no post to describe, so the
    # original fail-fast behaviour is kept.
    data = get_post_json(response.text())[0]['node']
    content_story = jsonpath.jsonpath(data, 'comet_sections.content.story')[0]
    logging.info('获取成功')

    # Author of the post, trimmed to the whitelisted keys.
    actor_node = _first_jsonpath_match(
        data,
        '$..comet_sections.context_layout.story.comet_sections.actor_photo.story.actors[0]',
        default={},
    )
    actor = {k: v for k, v in actor_node.items() if k in user_require_fields}

    # Prefer the sub-attachment list when that path matches; otherwise fall
    # back to every single-media match. Always end up with a list.
    gallery_matches = jsonpath.jsonpath(data, '$..styles.attachment.all_subattachments.nodes')
    if gallery_matches:
        attachments = [node['media'] for node in gallery_matches[0]]
    else:
        attachments = jsonpath.jsonpath(data, '$..styles.attachment.media') or []

    # The message may be missing or null; report the text as None then.
    message = content_story.get('message') or {}

    return {
        'text': message.get('text'),
        'attachments': attachments,
        'post_id': content_story['post_id'],
        'actor': actor,
        'creation_time': _first_jsonpath_match(
            data,
            '$..comet_sections.context_layout.story.comet_sections.metadata[0].story.creation_time',
        ),
        'id': content_story['id'],
        'reaction_count': _first_jsonpath_match(
            data,
            '$..comet_ufi_summary_and_actions_renderer.feedback.reaction_count.count',
        ),
        'comment_count': _first_jsonpath_match(
            data,
            '$..comment_rendering_instance_for_feed_location.comments.total_count',
        ),
        'share_count': _first_jsonpath_match(
            data,
            '$..comet_ufi_summary_and_actions_renderer.feedback.share_count.count',
        ),
    }
def get_blog_json(html_content):
    """Extract the JSON object that follows the shortcode web-info marker.

    The text is scanned line by line. A line containing the marker key is
    discarded and (re)starts a buffer holding just ``{``. Every later line is
    stripped and appended to that buffer. As soon as the buffer contains both
    ``"items": [`` and ``]``, a parse is attempted after each appended line.

    Args:
        html_content: Text to scan.

    Returns:
        The first successfully parsed buffer (the result of ``json.loads``),
        or ``None`` when the marker never appears or no buffer ever parses.
    """
    marker = '"xdt_api__v1__media__shortcode__web_info"'
    pending = None  # None until the marker has been seen

    for raw_line in html_content.splitlines():
        text = raw_line.strip()

        if marker in text:
            # Start over: the marker line itself is replaced by an opening brace.
            pending = '{'
            continue

        if pending is None:
            continue

        pending += text

        looks_complete = '"items": [' in pending and ']' in pending
        if not looks_complete:
            continue

        try:
            return json.loads(pending)
        except json.JSONDecodeError:
            # Not a full document yet; keep accumulating lines.
            pass

    return None
def get_post_json(html_content):
    """Locate the preloader payload in *html_content* and parse its data part.

    Each stripped line is searched for a
    ``adp_Comet...ContentQueryRelayPreloader_...",{`` marker. On the first
    line that has the marker AND a ``"data":{...}`` section after it, the
    text captured inside that section (prefixed with ``{``) is handed to
    ``json_util.parse_json_from_string`` and its result is returned.

    Lines that carry the marker but no ``"data"`` section are skipped rather
    than raising, so a later line still gets a chance to match.

    Args:
        html_content: Page source to scan.

    Returns:
        Whatever ``json_util.parse_json_from_string`` produces for the first
        usable line, or ``None`` if no line qualifies.
    """
    # Compiled once up front instead of being rebuilt for every line.
    preloader_re = re.compile(
        r'adp_Comet\w+ContentQueryRelayPreloader_\w+",\{(.*)', re.DOTALL
    )
    data_re = re.compile(r'\"data\"\:\{(.*)}', re.DOTALL)

    for raw_line in html_content.splitlines():
        line = raw_line.strip()

        preloader = preloader_re.search(line)
        if preloader is None:
            continue

        data_section = data_re.search(preloader.group(1))
        if data_section is None:
            # Marker present but no "data" object on this line: treat it as
            # "not found here" instead of failing on a None match.
            continue

        # The capture starts just after the opening brace of the data object,
        # so put that brace back before parsing.
        return json_util.parse_json_from_string('{' + data_section.group(1))

    return None
def get_user_by_request(response):
    """Return the whitelisted user fields from a JSON API response.

    Args:
        response: Object whose ``json()`` method returns the decoded body.

    Returns:
        A dict holding only the keys listed in ``user_require_fields``, or
        ``None`` when the body is not a mapping, its ``status`` is not
        ``'ok'``, or it carries no user mapping under ``data.user``.
    """
    response_json = response.json()
    if not isinstance(response_json, dict) or response_json.get('status') != 'ok':
        return None

    # "data" or "data.user" may be absent or null even on an 'ok' status;
    # report that as "no user" instead of raising.
    data = response_json.get('data')
    user = data.get('user') if isinstance(data, dict) else None
    if not isinstance(user, dict):
        return None

    return {k: v for k, v in user.items() if k in user_require_fields}
|