"""Common API helpers.

Provides an in-process registry of account names (seeded from the
sub-directories of ``./.data/facebook``), JSON response builders, and a
business-level exception with small assertion helpers.
"""
import os
import json
import logging
import time

# Pre-serialized "success without data" payload, returned as-is by success().
SUCCESS_RESPONSE = json.dumps({
    "code": 1,
    "msg": "请求成功",
    "success": True,
}, ensure_ascii=False)

# Names of the accounts known to this process.
accounts = set()
directory = "./.data/facebook"
# exist_ok=True replaces the former exists()-then-makedirs() pair, which could
# fail if the directory was created between the check and the call.
os.makedirs(directory, exist_ok=True)
for entry in os.listdir(directory):
    # Build the full path of the directory entry.
    full_path = os.path.join(directory, entry)
    # Only sub-directories count as accounts; each one's name is registered.
    if os.path.isdir(full_path):
        accounts.add(entry)
print("已存在的账号:", accounts)


def contain_browser(account) -> bool:
    """Return True if *account* is present in the ``accounts`` registry."""
    return account in accounts


def get_idle_account():
    """Block until some registered account is not locked, then return it.

    Polls every registered account through ``lock_manager.is_locked`` and
    sleeps one second between full passes. Never returns while the registry
    is empty or every account stays locked.
    """
    # Imported lazily, as in the original code.
    # NOTE(review): presumably this avoids a circular import with the
    # ``facebook`` package -- confirm before moving it to module level.
    from facebook import lock_manager

    while True:
        # Iterate over a snapshot: add_account() may grow the set while this
        # loop is polling, and mutating a set during iteration raises
        # RuntimeError.
        for account in tuple(accounts):
            if not lock_manager.is_locked(account):
                return account
        time.sleep(1)


def add_account(account) -> None:
    """Register *account* in the ``accounts`` registry."""
    accounts.add(account)


class BusinessException(Exception):
    """Custom business-level exception carrying a message in ``msg``."""

    def __init__(self, msg):
        # Pass the message (not ``self``) to Exception: with ``self`` as the
        # only arg, str(exc) recursed into itself until RecursionError.
        super().__init__(msg)
        self.msg = msg


def raiseError(msg):
    """Raise a ``BusinessException`` carrying *msg*.

    Raises:
        BusinessException: always.
    """
    raise BusinessException(msg)


def fail_response(msg: str) -> str:
    """Build the JSON string for a failed request.

    The payload has ``code`` 0, ``success`` false and the given *msg*;
    non-ASCII characters are emitted unescaped.
    """
    return json.dumps({
        "code": 0,
        "msg": msg,
        "success": False,
    }, ensure_ascii=False)


def assert_not_none(data, msg) -> None:
    """Assertion helper: require that *data* is not ``None``.

    Only ``None`` is rejected; other falsy values (0, "", []) pass.

    Raises:
        BusinessException: with *msg* when *data* is ``None``.
    """
    if data is None:
        raise BusinessException(msg)


def success(data=None) -> str:
    """Build the JSON string for a successful request.

    Returns the pre-serialized ``SUCCESS_RESPONSE`` when *data* is ``None``;
    otherwise the same payload with *data* under the ``data`` key.
    """
    if data is None:
        return SUCCESS_RESPONSE
    return json.dumps({
        "code": 1,
        "msg": "请求成功",
        "data": data,
        "success": True,
    }, ensure_ascii=False)