1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980 |
- """
- """
- import logging
- import os
- import random
- import threading
- import time
- import api
class LockManager:
    """Registry of per-key ``threading.Lock`` objects.

    Global lock management: each phone number may only open one browser
    with the same context at a time.
    """

    def __init__(self):
        # Maps a key (e.g. a phone number) to its lazily created lock.
        self.locks = {}
        # Serializes lazy lock creation so that two threads asking for the
        # same new key always end up sharing a single Lock object.
        self._guard = threading.Lock()

    def acquire_lock(self, key, timeout=300):
        """Acquire the lock for *key*, creating it on first use.

        Args:
            key: Identifier of the lock to acquire.
            timeout: Value forwarded to ``Lock.acquire(timeout=...)``;
                defaults to 300, the value that was previously hard-coded.

        Returns:
            True if the lock was acquired, False if the wait timed out.
        """
        with self._guard:
            lock = self.locks.get(key)
            if lock is None:
                lock = self.locks[key] = threading.Lock()
        # Wait outside the guard so a long wait on one key never blocks
        # lock creation for other keys.
        acquired = lock.acquire(timeout=timeout)
        if acquired:
            logging.info(f"{key} 获取锁成功")
        else:
            # Surface the timeout instead of silently behaving like success.
            logging.warning(f"{key} 获取锁超时")
        return acquired

    def release_lock(self, key):
        """Release the lock for *key*; do nothing if the key is unknown.

        Raises:
            RuntimeError: Propagated from ``Lock.release()`` when the lock
                is not currently held.
        """
        lock = self.locks.get(key)
        if lock is not None:
            lock.release()
            logging.info(f"{key} 释放锁成功")

    def is_locked(self, key):
        """Return whether the lock for *key* is currently held.

        Keys that have never been acquired are reported as not locked.
        """
        lock = self.locks.get(key)
        return lock is not None and lock.locked()
def add_phone(lock_key: str, phones: list) -> None:
    """Append the names of the sub-directories of ``./.data/<lock_key>`` to *phones*.

    The directory is created when it does not exist yet. Each immediate
    sub-directory name is appended to *phones* in place, and the resulting
    list is printed.

    Args:
        lock_key: Name of the sub-folder under ``./.data`` to scan.
        phones: List that is extended in place with the folder names found.
    """
    directory = f"./.data/{lock_key}"
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(directory, exist_ok=True)
    with os.scandir(directory) as entries:
        # Only folders count as accounts; plain files are skipped.
        phones.extend(entry.name for entry in entries if entry.is_dir())
    print(f"已存在的{lock_key}账号:", phones)
# One LockManager per lock key.
lock_manager_dict = {name: LockManager() for name in ("huitun", "xhs")}

# Phone list per lock key, filled from the folders found on disk below.
lock_phone_dict = {name: [] for name in ("huitun", "xhs")}

# Populate every phone list once, at import time.
for key in lock_phone_dict:
    add_phone(key, lock_phone_dict[key])
def get_idle_phone(key: str):
    """Block until a phone registered under *key* is not locked, then return it.

    The candidates are polled once per second. This function only checks
    ``is_locked``; it does not acquire the lock for the returned phone. If
    the phone list for *key* stays empty, the loop never returns.

    Args:
        key: Key into ``lock_manager_dict`` and ``lock_phone_dict``.

    Returns:
        A phone from ``lock_phone_dict[key]`` that was not locked when checked.

    Raises:
        KeyError: If *key* is not present in ``lock_manager_dict``.
    """
    lock_manager = lock_manager_dict[key]
    # NOTE(review): the lookup above raises KeyError for an unknown key, so
    # this check can only fire if a None value was stored explicitly.
    api.assert_not_none(lock_manager, "lock_manager is None")
    while True:
        # Shuffle a private snapshot rather than the shared list: concurrent
        # in-place shuffles of one list can interleave their swaps and
        # duplicate or drop entries.
        candidates = list(lock_phone_dict[key])
        random.shuffle(candidates)
        for phone in candidates:
            if not lock_manager.is_locked(phone):
                return phone
        time.sleep(1)
|