""" """ import json import logging import re import jsonpath from util import json_util user_require_fields = ['id', 'name', 'profile_picture', 'profile_url'] def get_post_by_doc(response): data = get_post_json(response.text())[0]['node'] content_story = jsonpath.jsonpath(data, 'comet_sections.content.story')[0] logging.info('获取成功') # item = {k: v for k, v in item.items() if k in blog_require_fields} # item['cover'] = item['image_versions2']['candidates'][0]['url'] # item['image_versions2'] = None # item['user'] = {k: v for k, v in item['user'].items() if k in user_require_fields} # comet_sections.context_layout.story.comet_sections.actor_photo.story.actors actor = jsonpath.jsonpath(data, '$..comet_sections.context_layout.story.comet_sections.actor_photo.story.actors[0]')[0] actor = {k: v for k, v in actor.items() if k in user_require_fields} image_candidates = jsonpath.jsonpath(data, '$..styles.attachment.all_subattachments.nodes') photo_meida_candidate = jsonpath.jsonpath(data, '$..styles.attachment.media') attachments = [img['media'] for img in image_candidates[0]] if image_candidates else photo_meida_candidate result = { 'text': content_story['message']['text'], 'attachments': attachments, 'post_id': content_story['post_id'], 'actor': actor, 'creation_time': jsonpath.jsonpath(data, '$..comet_sections.context_layout.story.comet_sections.metadata[0].story.creation_time')[0], 'id': content_story['id'], 'reaction_count': jsonpath.jsonpath(data, '$..comet_ufi_summary_and_actions_renderer.feedback.reaction_count.count')[0], 'comment_count': jsonpath.jsonpath(data, '$..comment_rendering_instance_for_feed_location.comments.total_count')[0], 'share_count': jsonpath.jsonpath(data, '$..comet_ufi_summary_and_actions_renderer.feedback.share_count.count')[0] } # 图片 return result def get_blog_json(html_content): # 逐行读取文件 inside_items = False # 标志是否进入 items 部分 items_buffer = "" # 临时保存 JSON 部分 for line in html_content.splitlines(): # 通过简单的规则修复非标准 JSON 格式(可以根据实际情况定制) line = line.strip() # 去掉多余的空白字符 if '"xdt_api__v1__media__shortcode__web_info"' in line: items_buffer = '{' inside_items = True # 发现目标字段 continue # 处理 items 数组的部分 if inside_items: items_buffer += line # 累积读取多行 # 如果找到了 JSON 数组的结束 if '"items": [' in items_buffer and ']' in items_buffer: try: # 尝试解析 JSON data = json.loads(items_buffer) # 获取第一个 item return data except json.JSONDecodeError: continue # 如果解析失败,继续读取下一行 return None # 如果没有找到匹配项 def get_post_json(html_content): # 逐行读取文件 for line in html_content.splitlines(): # 通过简单的规则修复非标准 JSON 格式(可以根据实际情况定制) line = line.strip() # 去掉多余的空白字符 # pattern = r'adp_Comet\w+ContentQueryRelayPreloader_\w+",(\{.+?\})' pattern = r'adp_Comet\w+ContentQueryRelayPreloader_\w+",\{(.*)' # pattern = r'adp_Comet\w+ContentQueryRelayPreloader_\w+",(.*)' match = re.search(pattern, line, re.DOTALL) if match: # print(line) json_part = match.group(1) pattern = r'\"data\"\:\{(.*)}' json_part = re.search(pattern, json_part, re.DOTALL) data = json_util.parse_json_from_string('{' + json_part.group(1)) # print(story) # jsonpath_expr = '$..xdt_api__v1__media__shortcode__web_info.items[0]' # 寻找第一个 item # data = jsonpath.jsonpath(data, jsonpath_expr) # if data: # return data[0] return data return None # 如果没有找到匹配项 def get_user_by_request(response): response_json = response.json() if response_json.get('status') == 'ok' and 'data' in response_json: user = response_json['data']['user'] user = {k: v for k, v in user.items() if k in user_require_fields} return user else: return None