lock_util.py 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. """
  2. """
  3. import logging
  4. import os
  5. import random
  6. import threading
  7. import time
  8. import api
  9. class LockManager:
  10. """
  11. 全局锁管理,每个手机号只能打开一个上下文相同的浏览器
  12. """
  13. def __init__(self):
  14. self.locks = {}
  15. def acquire_lock(self, key):
  16. if key not in self.locks:
  17. self.locks[key] = threading.Lock()
  18. acquire = self.locks[key].acquire(timeout=300)
  19. if acquire:
  20. logging.info(f"{key} 获取锁成功")
  21. def release_lock(self, key):
  22. if key in self.locks:
  23. self.locks[key].release()
  24. logging.info(f"{key} 释放锁成功")
  25. def is_locked(self, key):
  26. """
  27. 检查给定的键是否处于锁定状态
  28. """
  29. if key in self.locks:
  30. return self.locks[key].locked()
  31. else:
  32. return False
  33. def add_phone(lock_key: str, phones: list):
  34. directory = f"./.data/{lock_key}"
  35. if not os.path.exists(directory):
  36. os.makedirs(directory)
  37. for entry in os.listdir(directory):
  38. # 构建完整的路径
  39. full_path = os.path.join(directory, entry)
  40. # 检查是否是文件夹
  41. if os.path.isdir(full_path):
  42. # 如果是文件夹,将文件夹名称添加到集合中
  43. phones.append(entry)
  44. print(f"已存在的{lock_key}账号:", phones)
  45. lock_manager_dict = {
  46. "huitun": LockManager(),
  47. "xhs": LockManager()
  48. }
  49. lock_phone_dict = {
  50. "huitun": list(),
  51. "xhs": list()
  52. }
  53. for key in lock_phone_dict.keys():
  54. add_phone(key, lock_phone_dict[key])
  55. def get_idle_phone(key: str):
  56. lock_manager = lock_manager_dict[key]
  57. api.assert_not_none(lock_manager, "lock_manager is None")
  58. while True:
  59. phone_list = lock_phone_dict[key]
  60. for phone in phone_list:
  61. if not lock_manager.is_locked(phone):
  62. random.shuffle(phone_list)
  63. return phone
  64. time.sleep(1)