__init__.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
"""
General-purpose API.
"""
  4. import os
  5. import json
  6. import logging
  7. import time
  8. SUCCESS_RESPONSE = json.dumps({
  9. "code": 1,
  10. "msg": "请求成功",
  11. "success": True,
  12. }, ensure_ascii=False)
  13. accounts = set()
  14. directory = "./.data/tiktok"
  15. if not os.path.exists(directory):
  16. os.makedirs(directory)
  17. for entry in os.listdir(directory):
  18. # 构建完整的路径
  19. full_path = os.path.join(directory, entry)
  20. # 检查是否是文件夹
  21. if os.path.isdir(full_path):
  22. # 如果是文件夹,将文件夹名称添加到集合中
  23. accounts.add(entry)
  24. print("已存在的账号:", accounts)
  25. def contain_browser(account):
  26. return account in accounts
  27. def get_idle_account():
  28. from tiktok import lock_manager
  29. while True:
  30. for account in accounts:
  31. if not lock_manager.is_locked(account):
  32. return account
  33. time.sleep(1)
  34. def add_account(account):
  35. accounts.add(account)
  36. class BusinessException(Exception):
  37. """
  38. 自定义业务异常
  39. """
  40. def __init__(self, msg):
  41. super().__init__(self)
  42. self.msg = msg
  43. def raiseError(msg):
  44. """ """
  45. raise BusinessException(msg)
  46. def fail_response(msg: str):
  47. """
  48. 请求失败
  49. """
  50. return json.dumps({
  51. "code": 0,
  52. "msg": msg,
  53. "success": False,
  54. }, ensure_ascii=False)
  55. def assert_not_none(data, msg):
  56. """
  57. 断言方法
  58. """
  59. if data is None:
  60. raise BusinessException(msg)
  61. def success(data=None):
  62. if data is None:
  63. return SUCCESS_RESPONSE
  64. return json.dumps({
  65. "code": 1,
  66. "msg": "请求成功",
  67. "data": data,
  68. "success": True,
  69. }, ensure_ascii=False)