Explorar el Código

feat: 灰豚自动检测登录状态并登录

wuwenyi hace 4 meses
padre
commit
b357b44c26
Se han modificado 3 ficheros con 215 adiciones y 4 borrados
  1. 13 1
      README.md
  2. 21 3
      huitun/__init__.py
  3. 181 0
      huitun/captcha_ident.py

+ 13 - 1
README.md

@@ -1,3 +1,15 @@
 # py-huitun-robot
 
-灰豚数据rpa
+灰豚数据rpa
+
+page-test
+
+```python
+from playwright.sync_api import sync_playwright, Page, Playwright
+import huitun
+browser = huitun.HuiTunBrowser('13417350072')
+
+browser.__init_browser__()
+page = browser.page
+page.goto(huitun.HUITUN_URL)
+```

+ 21 - 3
huitun/__init__.py

@@ -2,6 +2,7 @@
 
 """
 import logging
+from huitun.captcha_ident import CaptchaIdent
 
 from playwright.sync_api import sync_playwright, Page, Playwright
 
@@ -15,6 +16,10 @@ lock_manager = LockManager()
 
 
 class HuiTunBrowser(BaseBrowser):
+    def __init__(self, phone: str, playwright=None):
+        """Create the browser wrapper; the login password starts out unset.
+
+        ``phone`` and ``playwright`` are forwarded unchanged to the base
+        class constructor.
+        """
+        super().__init__(phone, playwright)
+        # Stays None until a caller assigns it; login_if_need() types this
+        # value into the password field.
+        self.password = None
+
     def __get_name__(self):
         """Return the fixed name string ``'huitun'``."""
         return 'huitun'
 
@@ -25,6 +30,15 @@ class HuiTunBrowser(BaseBrowser):
         """
         self.__init_browser__()
         self.page.goto(HUITUN_URL)
+        self.password = password
+        self.login_if_need()
+        self.page.wait_for_timeout(30_000)
+        self.close()
+
+    def login_if_need(self):
+        """
+        登录灰豚
+        """
         login_info_expired = self.page.query_selector('.ant-btn-primary:has-text("知道了")')
         if login_info_expired is not None:
             login_info_expired.click()
@@ -34,15 +48,19 @@ class HuiTunBrowser(BaseBrowser):
                 if pwd_login is not None:
                     pwd_login.click()
             self.page.get_by_placeholder('请输入手机号').type(self.phone)
-            self.page.get_by_placeholder('6-15位数字与字母组合').type(password)
+            self.page.get_by_placeholder('6-15位数字与字母组合').type(self.password)
             self.page.get_by_text('登 录', exact=True).click()
-            self.page.wait_for_timeout(30_000)
-        self.close()
+            # 验证码登录
+            captcha_frame = self.page.frames[1]
+            if captcha_frame is not None:
+                captcha_tool = CaptchaIdent(self.page)
+                captcha_tool.start()
 
     def search_note(self, tag_name: str, size: int):
         lock_manager.acquire_lock(self.phone)
         try:
             self.__init_browser__()
+            self.login_if_need()
             self.list_result = []
             self.has_more = True
             api.assert_not_none(tag_name, "标签不能为空")

+ 181 - 0
huitun/captcha_ident.py

@@ -0,0 +1,181 @@
+import random
+import re
+
+import requests
+
+import cv2 as cv
+import numpy as np
+from PIL import Image
+from playwright.sync_api import sync_playwright, Page
+
+from util.playwright_util import is_element_present
+
+
+class CaptchaIdent:
+    """
+    处理灰豚登录的滑块验证码
+    """
+    def __init__(self, page):
+        self.page = page
+        self.frame = self.page.frames[1]
+
+    def start(self):
+        for i in range(5):
+            self.page.wait_for_timeout(1000)
+            self.get_slide_bg_img()
+            start_x = self.get_slide_block_img_and_start_x()
+            distance1, distance2 = self.get_slide_distance(start_x)
+            slide_result = self.move_to_notch(distance1, distance2)
+            if not slide_result:
+                self.refresh_captcha()
+            else:
+                return
+        raise Exception("滑块验证失败")
+
+    def get_slide_bg_img(self):
+        """截取滑动验证码背景图片"""
+        if self.frame is not None:
+            bg_ele = self.frame.query_selector('.tc-bg-img')
+            bg_style = bg_ele.evaluate(
+                "element => window.getComputedStyle(element).getPropertyValue('background-image')")
+            bg_img = re.search(r'url\("([^"]+)"\)', bg_style).group(1)
+            r = requests.get(bg_img)
+            with open("./slide_bg.png", "wb") as f:
+                f.write(r.content)
+
+
+    def get_slide_block_img_and_start_x(self):
+        """获取滑块图片以及初始x坐标"""
+        print("正在获取滑块图片")
+
+        # 首先保存整个登录背景截图
+        self.page.wait_for_timeout(2000)
+        slideBg = self.frame.query_selector('#slideBg')
+        slideBg.screenshot(path="slide_bg.png")
+
+        # 获取滑动验证码所在的iframe
+        captcha_frame = self.frame
+
+        # 获取滑块图片
+        # .tc-fg-item对应的有三个元素,一个是目标滑块,一个是滑轨,还有一个是滑轨上的按钮
+        for i in range(3):
+            slide_block_ele = captcha_frame.locator(".tc-fg-item").nth(i)
+            slide_block_style = slide_block_ele.get_attribute("style")
+
+            # 滑轨按钮元素的style值中不包含url字符串
+            if "url" not in slide_block_style:
+                continue
+
+            # 从元素的style值中分析得出只有目标滑块的top值小于150
+            top_value = re.search(r'top: (.+)px;', slide_block_style).groups()[0]
+            if float(top_value) > 150:
+                continue
+
+            # 获取x坐标
+            slide_block_x = float(re.search(r'left: (.+)px; top: ', slide_block_style).groups()[0])
+            slide_block_y = float(top_value)
+
+            # 通过滑块位置,从背景图中截取滑块图片  # cropped_image = image.crop((left, top, right, bottom))
+            slide_block_rect = slide_block_ele.bounding_box()
+            bg = Image.open("slide_bg.png")
+            # offset = slide_block_rect["width"] // 5  # 从背景图上截取会混入滑块周围的一些像素点,所以加一个偏移值,截取到滑块内部的图片。
+            slide_block_img = bg.crop((slide_block_x + 4, slide_block_y + 4,
+                                       slide_block_x + slide_block_rect["width"] - 4,
+                                       slide_block_y + slide_block_rect["height"] - 4))
+            slide_block_img.save("slide_block.png")
+            return slide_block_x + 4
+
+    def get_slide_distance(self, start_x):
+        """获取滑动距离"""
+        print("正在获取滑动距离")
+        # 通过opencv比较图片,获取缺口位置
+        slide_bg_img = cv.imread("./slide_bg.png")
+        slide_bg_img = self.set_contrast_brightness(slide_bg_img, 0.4, 0)
+        slide_block_img = cv.imread("./slide_block.png")
+        slide_block_img = self.set_contrast_brightness(slide_block_img, 0.4, 0)
+        cv.imwrite("./slide_block_handled.png", slide_block_img)
+        result = cv.matchTemplate(slide_block_img, slide_bg_img, cv.TM_CCOEFF_NORMED)
+        minVal, maxVal, minLoc, maxLoc = cv.minMaxLoc(result)
+
+        # 缺口的x坐标
+        notch_x1 = minLoc[0]
+        notch_x2 = maxLoc[0]
+
+        # 距离
+        distance1 = notch_x1 - start_x
+        distance2 = notch_x2 - start_x
+        return distance1, distance2
+
+    @staticmethod
+    def set_contrast_brightness(frame, contrast_value, brightness_value):
+        if not contrast_value:
+            contrast_value = 0.0
+
+        if not brightness_value:
+            brightness_value = 0
+
+        blank = np.zeros(frame.shape, frame.dtype)
+        frame = cv.addWeighted(frame, contrast_value, blank, 1 - contrast_value, brightness_value)
+        return frame
+
+    @staticmethod
+    def get_tracks(distance):
+        """获取移动轨迹"""
+        tracks = []  # 移动轨迹
+        current = 0  # 当前位移
+        mid = distance * 3/4   # 减速阈值
+        t = 0.5  # 计算间隔
+        v = 1  # 初始速度
+
+        while current < distance:
+            if current < mid:
+                a = random.randint(5, 10)  # 加速度为正5
+            else:
+                a = random.randint(-5, -3)  # 加速度为负3
+
+            v0 = v  # 初速度 v0
+            v = v0 + a * t  # 当前速度
+            move = v0 * t + 1 / 2 * a * t * t  # 移动距离
+            current += move
+            tracks.append(round(current))
+
+        return tracks
+
+    def move_to_notch(self, distance1, distance2):
+        """移动滑轨按钮到缺口处"""
+        # 获取滑动验证码所在的iframe
+        captcha_iframe = self.frame
+
+        for i in range(2):
+            # 获取按钮位置,将鼠标移到上方并按下
+            slider_btn_rect = captcha_iframe.get_by_alt_text("slider").bounding_box()
+            self.page.mouse.move(slider_btn_rect['x'], slider_btn_rect['y'])
+            self.page.mouse.down()
+
+            distance = [distance1, distance2][i]
+            if distance <= 0:  # 距离不可能小于等于0
+                continue
+
+            print(f"正在进行第{i + 1}次滑动,滑动距离{distance}")
+            tracks = self.get_tracks(distance)
+            for x in tracks:
+                self.page.mouse.move(slider_btn_rect['x'] + x, random.randint(-5, 5) + slider_btn_rect['y'])
+            self.page.mouse.move(slider_btn_rect['x'] + tracks[-1] + 5, random.randint(-5, 5) + slider_btn_rect['y'])
+            self.page.mouse.move(slider_btn_rect['x'] + tracks[-1] - 5, random.randint(-5, 5) + slider_btn_rect['y'])
+            self.page.mouse.up()
+
+            # 滑动结束后等待一段时间
+            self.page.wait_for_timeout(2000)
+
+            # 寻找按钮是否还存在,不存在的话表明已通过滑动验证码,存在的话尝试下一个距离
+            if not is_element_present(self.page, '.ant-modal-body'):
+                print("滑动验证通过")
+                return True
+        return False
+
+    def refresh_captcha(self):
+        """刷新验证码"""
+        # 获取滑动验证码所在的iframe
+        print("刷新验证码")
+        self.frame = self.page.frames[1]
+        self.page.wait_for_timeout(2000)