rotate_ident.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. """用于处理小红书旋转验证码"""
  2. import logging
  3. import os
  4. import time
  5. import base64
  6. from io import BytesIO
  7. import requests
  8. from PIL import Image
  9. from playwright.sync_api import Playwright, Page
  10. from util import playwright_util
  11. root_dir = os.path.dirname(os.path.abspath(__file__))
  12. def pil_base64(img, coding='utf-8'):
  13. """
  14. PIL图片保存为base64编码
  15. """
  16. img_format = img.format
  17. if img_format is None:
  18. img_format = 'JPEG'
  19. format_str = 'JPEG'
  20. if 'png' == img_format.lower():
  21. format_str = 'PNG'
  22. if 'gif' == img_format.lower():
  23. format_str = 'gif'
  24. if img.mode == "P":
  25. img = img.convert('RGB')
  26. if img.mode == "RGBA":
  27. format_str = 'PNG'
  28. img_format = 'PNG'
  29. output_buffer = BytesIO()
  30. # img.save(output_buffer, format=format_str)
  31. img.save(output_buffer, quality=100, format=format_str)
  32. byte_data = output_buffer.getvalue()
  33. base64_str = 'data:image/' + img_format.lower() + ';base64,' + base64.b64encode(byte_data).decode(coding)
  34. return base64_str
  35. def invoke_ident_api(img):
  36. """
  37. 验证码识别接口
  38. """
  39. url = "http://www.detayun.cn/openapi/verify_code_identify/"
  40. data = {
  41. # 用户的key
  42. "key": "2XbUYAP0jeiaiBV8uAvg",
  43. # 验证码类型
  44. "verify_idf_id": "24",
  45. # 样例图片
  46. "img_base64": pil_base64(img),
  47. "img_byte": None,
  48. # 中文点选,空间语义类型验证码的文本描述(这里缺省为空字符串)
  49. "words": ""
  50. }
  51. header = {"Content-Type": "application/json"}
  52. # 发送请求调用接口
  53. response = requests.post(url=url, json=data, headers=header, timeout=30)
  54. logging.info('verify_code_identify response: %s', response.text)
  55. return response.json()
  56. class RotateIdent:
  57. """
  58. 旋转验证码处理类
  59. """
  60. def __init__(self, page: Page):
  61. self.page = page
  62. self.img_path = None
  63. def need_ident(self) -> bool:
  64. """
  65. 是否需需要识别
  66. """
  67. return playwright_util.is_element_present(self.page, '//div[@id="red-captcha-rotate"]/img')
  68. def handle_rotate(self):
  69. """
  70. 处理旋转验证码的核心方法
  71. """
  72. try_count = 0
  73. while self.need_ident() and try_count < 4:
  74. try_count += 1
  75. logging.info('开始处理旋转验证码,第 %s 次', try_count)
  76. tag2 = self.page.query_selector('//div[@class="red-captcha-slider"]')
  77. img = self.download_img()
  78. response = invoke_ident_api(img)
  79. if response['code'] != 200:
  80. logging.error(response['msg'])
  81. else:
  82. angle = response['data']['angle']
  83. # 使用鼠标操作进行点击并保持
  84. bbox = tag2.bounding_box()
  85. x_center = bbox['x'] + bbox['width'] / 2
  86. y_center = bbox['y'] + bbox['height'] / 2
  87. self.page.mouse.move(x_center, y_center)
  88. self.page.mouse.down()
  89. # 计算实际滑动距离 = 像素距离 + 前面空白距离
  90. move_x = angle * 0.79
  91. # 滑动滑块,且坐标多增加一个像素,添加误差
  92. self.page.mouse.move(x_center + move_x + 1, y_center + 5, steps=10)
  93. # 等待一段时间
  94. time.sleep(1)
  95. # 释放鼠标按钮
  96. self.page.mouse.up()
  97. time.sleep(5)
  98. if self.img_path is not None and os.path.exists(self.img_path):
  99. os.remove(self.img_path)
  100. def download_img(self):
  101. """
  102. 下载待处理的验证码并保存
  103. """
  104. # 找到【旋转图像】元素
  105. tag1 = self.page.query_selector('//div[@id="red-captcha-rotate"]/img')
  106. # 获取图像链接
  107. img_url = tag1.get_attribute('src')
  108. logging.info('ident url: %s', img_url)
  109. header = {
  110. "Host": "picasso-static.xiaohongshu.com",
  111. "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:72.0) Gecko/20100101 Firefox/72.0",
  112. "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
  113. "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
  114. "Accept-Encoding": "gzip, deflate, br",
  115. "Connection": "keep-alive",
  116. "Cookie": "xsecappid=login; a1=1896916369fehn0yq7nomanvre3fghfkj0zubt7zx50000120287; webId=75af27905db67b6fcb29a4899d200062; web_session=030037a385d8a837e5e590cace234a6e266fd5; gid=yYjKjyK484VKyYjKjyKqK89WjidxI8vAWIl6uuC0IhFdq728ikxiTD888yJ8JYW84DySKW0Y; webBuild=2.17.8; websectiga=634d3ad75ffb42a2ade2c5e1705a73c845837578aeb31ba0e442d75c648da36a; sec_poison_id=41187a04-9f82-4fbc-8b98-d530606b7696",
  117. "Upgrade-Insecure-Requests": "1",
  118. "If-Modified-Since": "Thu, 06 Jul 2023 11:42:07 GMT",
  119. "If-None-Match": '"7e53c313a9f321775e8f5e190de21081"',
  120. "TE": "Trailers",
  121. }
  122. # 下载图片
  123. return self.do_download_img(img_url)
  124. def do_download_img(self, img_url):
  125. # 下载图片
  126. response = requests.get(url=img_url, timeout=20)
  127. img = Image.open(BytesIO(response.content))
  128. img_folder = os.path.join(root_dir, 'train_img')
  129. # 如果目标文件夹不存在,则创建
  130. if not os.path.exists(img_folder):
  131. os.makedirs(img_folder)
  132. # 构建图片路径并保存图片
  133. self.img_path = os.path.join(img_folder, f'{int(time.time() * 1000)}.jpg')
  134. img.convert('RGB').save(self.img_path)
  135. return img