"""用于处理小红书旋转验证码""" import logging import os import time import base64 from io import BytesIO import requests from PIL import Image from playwright.sync_api import Playwright, Page from util import playwright_util root_dir = os.path.dirname(os.path.abspath(__file__)) def pil_base64(img, coding='utf-8'): """ PIL图片保存为base64编码 """ img_format = img.format if img_format is None: img_format = 'JPEG' format_str = 'JPEG' if 'png' == img_format.lower(): format_str = 'PNG' if 'gif' == img_format.lower(): format_str = 'gif' if img.mode == "P": img = img.convert('RGB') if img.mode == "RGBA": format_str = 'PNG' img_format = 'PNG' output_buffer = BytesIO() # img.save(output_buffer, format=format_str) img.save(output_buffer, quality=100, format=format_str) byte_data = output_buffer.getvalue() base64_str = 'data:image/' + img_format.lower() + ';base64,' + base64.b64encode(byte_data).decode(coding) return base64_str def invoke_ident_api(img): """ 验证码识别接口 """ url = "http://www.detayun.cn/openapi/verify_code_identify/" data = { # 用户的key "key": "2XbUYAP0jeiaiBV8uAvg", # 验证码类型 "verify_idf_id": "24", # 样例图片 "img_base64": pil_base64(img), "img_byte": None, # 中文点选,空间语义类型验证码的文本描述(这里缺省为空字符串) "words": "" } header = {"Content-Type": "application/json"} # 发送请求调用接口 response = requests.post(url=url, json=data, headers=header, timeout=30) logging.info('verify_code_identify response: %s', response.text) return response.json() class RotateIdent: """ 旋转验证码处理类 """ def __init__(self, page: Page): self.page = page self.img_path = None def need_ident(self) -> bool: """ 是否需需要识别 """ return playwright_util.is_element_present(self.page, '//div[@id="red-captcha-rotate"]/img') def handle_rotate(self): """ 处理旋转验证码的核心方法 """ try_count = 0 while self.need_ident() and try_count < 4: try_count += 1 logging.info('开始处理旋转验证码,第 %s 次', try_count) tag2 = self.page.query_selector('//div[@class="red-captcha-slider"]') img = self.download_img() response = invoke_ident_api(img) if response['code'] != 200: logging.error(response['msg']) else: angle = response['data']['angle'] # 使用鼠标操作进行点击并保持 bbox = tag2.bounding_box() x_center = bbox['x'] + bbox['width'] / 2 y_center = bbox['y'] + bbox['height'] / 2 self.page.mouse.move(x_center, y_center) self.page.mouse.down() # 计算实际滑动距离 = 像素距离 + 前面空白距离 move_x = angle * 0.79 # 滑动滑块,且坐标多增加一个像素,添加误差 self.page.mouse.move(x_center + move_x + 1, y_center + 5, steps=10) # 等待一段时间 time.sleep(1) # 释放鼠标按钮 self.page.mouse.up() time.sleep(5) if self.img_path is not None and os.path.exists(self.img_path): os.remove(self.img_path) def download_img(self): """ 下载待处理的验证码并保存 """ # 找到【旋转图像】元素 tag1 = self.page.query_selector('//div[@id="red-captcha-rotate"]/img') # 获取图像链接 img_url = tag1.get_attribute('src') logging.info('ident url: %s', img_url) header = { "Host": "picasso-static.xiaohongshu.com", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:72.0) Gecko/20100101 Firefox/72.0", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2", "Accept-Encoding": "gzip, deflate, br", "Connection": "keep-alive", "Cookie": "xsecappid=login; a1=1896916369fehn0yq7nomanvre3fghfkj0zubt7zx50000120287; webId=75af27905db67b6fcb29a4899d200062; web_session=030037a385d8a837e5e590cace234a6e266fd5; gid=yYjKjyK484VKyYjKjyKqK89WjidxI8vAWIl6uuC0IhFdq728ikxiTD888yJ8JYW84DySKW0Y; webBuild=2.17.8; websectiga=634d3ad75ffb42a2ade2c5e1705a73c845837578aeb31ba0e442d75c648da36a; sec_poison_id=41187a04-9f82-4fbc-8b98-d530606b7696", "Upgrade-Insecure-Requests": "1", "If-Modified-Since": "Thu, 06 Jul 2023 11:42:07 GMT", "If-None-Match": '"7e53c313a9f321775e8f5e190de21081"', "TE": "Trailers", } # 下载图片 return self.do_download_img(img_url) def do_download_img(self, img_url): # 下载图片 response = requests.get(url=img_url, timeout=20) img = Image.open(BytesIO(response.content)) img_folder = os.path.join(root_dir, 'train_img') # 如果目标文件夹不存在,则创建 if not os.path.exists(img_folder): os.makedirs(img_folder) # 构建图片路径并保存图片 self.img_path = os.path.join(img_folder, f'{int(time.time() * 1000)}.jpg') img.convert('RGB').save(self.img_path) return img