""" """ import logging import threading class LockManager: """ 全局锁管理,每个账号只能打开一个上下文相同的浏览器 """ def __init__(self): self.locks = {} def acquire_lock(self, key): if key not in self.locks: self.locks[key] = threading.Lock() acquire = self.locks[key].acquire(timeout=300) if acquire: logging.info(f"{key} 获取锁成功") def release_lock(self, key): if key in self.locks: self.locks[key].release() logging.info(f"{key} 释放锁成功") def is_locked(self, key): """ 检查给定的键是否处于锁定状态 """ if key in self.locks: return self.locks[key].locked() else: return False