""" """ import logging import os import random import threading import time import api class LockManager: """ 全局锁管理,每个手机号只能打开一个上下文相同的浏览器 """ def __init__(self): self.locks = {} def acquire_lock(self, key): if key not in self.locks: self.locks[key] = threading.Lock() acquire = self.locks[key].acquire(timeout=300) if acquire: logging.info(f"{key} 获取锁成功") def release_lock(self, key): if key in self.locks: self.locks[key].release() logging.info(f"{key} 释放锁成功") def is_locked(self, key): """ 检查给定的键是否处于锁定状态 """ if key in self.locks: return self.locks[key].locked() else: return False def add_phone(lock_key: str, phones: list): directory = f"./.data/{lock_key}" if not os.path.exists(directory): os.makedirs(directory) for entry in os.listdir(directory): # 构建完整的路径 full_path = os.path.join(directory, entry) # 检查是否是文件夹 if os.path.isdir(full_path): # 如果是文件夹,将文件夹名称添加到集合中 phones.append(entry) print(f"已存在的{lock_key}账号:", phones) lock_manager_dict = { "huitun": LockManager(), "xhs": LockManager() } lock_phone_dict = { "huitun": list(), "xhs": list() } for key in lock_phone_dict.keys(): add_phone(key, lock_phone_dict[key]) def get_idle_phone(key: str): lock_manager = lock_manager_dict[key] api.assert_not_none(lock_manager, "lock_manager is None") while True: phone_list = lock_phone_dict[key] for phone in phone_list: if not lock_manager.is_locked(phone): random.shuffle(phone_list) return phone time.sleep(1)