""" """ import json import re import jsonpath from util import json_util blog_require_fields = ['code', 'pk', 'id', 'taken_at', 'image_versions2', 'user', 'media_type', 'carousel_media', 'carousel_media_count', 'like_count', 'comment_count', 'caption', 'caption_is_edited'] user_require_fields = ['pk', 'id', 'username', 'full_name', 'profile_pic_url', 'latest_reel_media', 'follower_count', 'following_count', 'media_count'] def get_blog_by_doc(response): item = get_blog_json2(response.text()) if not item: return None item = handle_item(item) return item def get_blog_json(html_content): # 逐行读取文件 inside_items = False # 标志是否进入 items 部分 items_buffer = "" # 临时保存 JSON 部分 for line in html_content.splitlines(): # 通过简单的规则修复非标准 JSON 格式(可以根据实际情况定制) line = line.strip() # 去掉多余的空白字符 if '"xdt_api__v1__media__shortcode__web_info"' in line: items_buffer = '{' inside_items = True # 发现目标字段 continue # 处理 items 数组的部分 if inside_items: items_buffer += line # 累积读取多行 # 如果找到了 JSON 数组的结束 if '"items": [' in items_buffer and ']' in items_buffer: try: # 尝试解析 JSON data = json.loads(items_buffer) # 获取第一个 item return data except json.JSONDecodeError: continue # 如果解析失败,继续读取下一行 return None # 如果没有找到匹配项 def get_blog_json2(html_content): # 逐行读取文件 for line in html_content.splitlines(): # 通过简单的规则修复非标准 JSON 格式(可以根据实际情况定制) line = line.strip() # 去掉多余的空白字符 if '"xdt_api__v1__media__shortcode__web_info"' in line: script_pattern = re.compile(r'<script\s+type="application/json"[^>]*>(.*?)</script>', re.DOTALL) # 查找所有匹配的 <script> 标签内容 json_str = re.findall(script_pattern, line) data = json.loads(json_str[0]) jsonpath_expr = '$..xdt_api__v1__media__shortcode__web_info.items[0]' # 寻找第一个 item data = jsonpath.jsonpath(data, jsonpath_expr) if data: return data[0] return None # 如果没有找到匹配项 def get_user_by_request(response): response_json = response.json() if response_json.get('status') == 'ok' and 'data' in response_json: user = response_json['data']['user'] user = {k: v for k, v in user.items() if k in user_require_fields} return user else: return None def get_blog_by_rsp(response): response_json = response.json() item = response_json['items'][0] if not item: return None item = handle_item(item) return item def handle_item(item): item = {k: v for k, v in item.items() if k in blog_require_fields} item['cover'] = item['image_versions2']['candidates'][0]['url'] item['image_versions2'] = None item['user'] = {k: v for k, v in item['user'].items() if k in user_require_fields} return item