123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152 |
- """用于处理小红书旋转验证码"""
- import logging
- import os
- import time
- import base64
- from io import BytesIO
- import requests
- from PIL import Image
- from playwright.sync_api import Playwright, Page
- from util import playwright_util
- root_dir = os.path.dirname(os.path.abspath(__file__))
- def pil_base64(img, coding='utf-8'):
- """
- PIL图片保存为base64编码
- """
- img_format = img.format
- if img_format is None:
- img_format = 'JPEG'
- format_str = 'JPEG'
- if 'png' == img_format.lower():
- format_str = 'PNG'
- if 'gif' == img_format.lower():
- format_str = 'gif'
- if img.mode == "P":
- img = img.convert('RGB')
- if img.mode == "RGBA":
- format_str = 'PNG'
- img_format = 'PNG'
- output_buffer = BytesIO()
- # img.save(output_buffer, format=format_str)
- img.save(output_buffer, quality=100, format=format_str)
- byte_data = output_buffer.getvalue()
- base64_str = 'data:image/' + img_format.lower() + ';base64,' + base64.b64encode(byte_data).decode(coding)
- return base64_str
- def invoke_ident_api(img):
- """
- 验证码识别接口
- """
- url = "http://www.detayun.cn/openapi/verify_code_identify/"
- data = {
- # 用户的key
- "key": "2XbUYAP0jeiaiBV8uAvg",
- # 验证码类型
- "verify_idf_id": "24",
- # 样例图片
- "img_base64": pil_base64(img),
- "img_byte": None,
- # 中文点选,空间语义类型验证码的文本描述(这里缺省为空字符串)
- "words": ""
- }
- header = {"Content-Type": "application/json"}
- # 发送请求调用接口
- response = requests.post(url=url, json=data, headers=header, timeout=30)
- logging.info('verify_code_identify response: %s', response.text)
- return response.json()
- class RotateIdent:
- """
- 旋转验证码处理类
- """
- def __init__(self, page: Page):
- self.page = page
- self.img_path = None
- def need_ident(self) -> bool:
- """
- 是否需需要识别
- """
- return playwright_util.is_element_present(self.page, '//div[@id="red-captcha-rotate"]/img')
- def handle_rotate(self):
- """
- 处理旋转验证码的核心方法
- """
- try_count = 0
- while self.need_ident() and try_count < 4:
- try_count += 1
- logging.info('开始处理旋转验证码,第 %s 次', try_count)
- tag2 = self.page.query_selector('//div[@class="red-captcha-slider"]')
- img = self.download_img()
- response = invoke_ident_api(img)
- if response['code'] != 200:
- logging.error(response['msg'])
- else:
- angle = response['data']['angle']
- # 使用鼠标操作进行点击并保持
- bbox = tag2.bounding_box()
- x_center = bbox['x'] + bbox['width'] / 2
- y_center = bbox['y'] + bbox['height'] / 2
- self.page.mouse.move(x_center, y_center)
- self.page.mouse.down()
- # 计算实际滑动距离 = 像素距离 + 前面空白距离
- move_x = angle * 0.79
- # 滑动滑块,且坐标多增加一个像素,添加误差
- self.page.mouse.move(x_center + move_x + 1, y_center + 5, steps=10)
- # 等待一段时间
- time.sleep(1)
- # 释放鼠标按钮
- self.page.mouse.up()
- time.sleep(5)
- if self.img_path is not None and os.path.exists(self.img_path):
- os.remove(self.img_path)
- def download_img(self):
- """
- 下载待处理的验证码并保存
- """
- # 找到【旋转图像】元素
- tag1 = self.page.query_selector('//div[@id="red-captcha-rotate"]/img')
- # 获取图像链接
- img_url = tag1.get_attribute('src')
- logging.info('ident url: %s', img_url)
- header = {
- "Host": "picasso-static.xiaohongshu.com",
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:72.0) Gecko/20100101 Firefox/72.0",
- "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
- "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
- "Accept-Encoding": "gzip, deflate, br",
- "Connection": "keep-alive",
- "Cookie": "xsecappid=login; a1=1896916369fehn0yq7nomanvre3fghfkj0zubt7zx50000120287; webId=75af27905db67b6fcb29a4899d200062; web_session=030037a385d8a837e5e590cace234a6e266fd5; gid=yYjKjyK484VKyYjKjyKqK89WjidxI8vAWIl6uuC0IhFdq728ikxiTD888yJ8JYW84DySKW0Y; webBuild=2.17.8; websectiga=634d3ad75ffb42a2ade2c5e1705a73c845837578aeb31ba0e442d75c648da36a; sec_poison_id=41187a04-9f82-4fbc-8b98-d530606b7696",
- "Upgrade-Insecure-Requests": "1",
- "If-Modified-Since": "Thu, 06 Jul 2023 11:42:07 GMT",
- "If-None-Match": '"7e53c313a9f321775e8f5e190de21081"',
- "TE": "Trailers",
- }
- # 下载图片
- return self.do_download_img(img_url)
- def do_download_img(self, img_url):
- # 下载图片
- response = requests.get(url=img_url, timeout=20)
- img = Image.open(BytesIO(response.content))
- img_folder = os.path.join(root_dir, 'train_img')
- # 如果目标文件夹不存在,则创建
- if not os.path.exists(img_folder):
- os.makedirs(img_folder)
- # 构建图片路径并保存图片
- self.img_path = os.path.join(img_folder, f'{int(time.time() * 1000)}.jpg')
- img.convert('RGB').save(self.img_path)
- return img
|