Jelajahi Sumber

feat: tiktok rpa 获取视频或者图文信息

wuwenyi 6 bulan lalu
melakukan
b2541b5593
11 mengubah file dengan 583 tambahan dan 0 penghapusan
  1. 73 0
      .gitignore
  2. 4 0
      README.md
  3. 89 0
      api/__init__.py
  4. 40 0
      api/login.py
  5. 111 0
      api/search.py
  6. 72 0
      app.py
  7. 55 0
      browser/__init__.py
  8. 1 0
      requirements.txt
  9. 6 0
      stealth.min.js
  10. 54 0
      tiktok/__init__.py
  11. 78 0
      tiktok/data_handler.py

+ 73 - 0
.gitignore

@@ -0,0 +1,73 @@
+# ---> Python
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+env/
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+*.egg-info/
+.installed.cfg
+*.egg
+
+# PyInstaller
+#  Usually these files are written by a python script from a template
+#  before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*,cover
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
+Lib/
+Scripts/
+logs/
+**/__pycache__/
+
+### IntelliJ IDEA ###
+.idea/
+*.iws
+*.iml
+*.ipr
+
+# 浏览器上下文数据
+.data
+

+ 4 - 0
README.md

@@ -0,0 +1,4 @@
+# py-tiktok-robot
+
+python的tiktok RPA
+

+ 89 - 0
api/__init__.py

@@ -0,0 +1,89 @@
+"""
+通用api
+"""
+import os
+import json
+import logging
+import time
+
+# Pre-serialised success envelope, returned by success() when there is no payload.
+SUCCESS_RESPONSE = json.dumps({
+    "code": 1,
+    "msg": "请求成功",
+    "success": True,
+}, ensure_ascii=False)
+
+# Folder whose sub-folders are named after the known accounts.
+directory = "./.data/tiktok"
+# exist_ok=True replaces the exists()-then-makedirs() pair, which could fail
+# if the folder appeared between the check and the creation.
+os.makedirs(directory, exist_ok=True)
+# Every sub-folder name is treated as an already registered account;
+# plain files inside the folder are ignored.
+accounts = {
+    entry
+    for entry in os.listdir(directory)
+    if os.path.isdir(os.path.join(directory, entry))
+}
+
+print("已存在的账号:", accounts)
+
+
+def contain_browser(account):
+    """Return True when ``account`` is already in the module-level ``accounts`` set."""
+    is_known = account in accounts
+    return is_known
+
+
+def get_idle_account():
+    """
+    Block until a registered account is not locked, then return it.
+
+    The account set is polled once per second. The call does not return
+    while ``accounts`` is empty or while every account stays locked.
+
+    :return: an account for which ``lock_manager.is_locked`` is false
+    """
+    # Imported inside the function as in the original.
+    # NOTE(review): presumably to avoid a circular import, since the tiktok
+    # package imports api at module level - confirm.
+    from tiktok import lock_manager
+    while True:
+        # Iterate over a snapshot: add_account() can add to the set from
+        # another request thread, and mutating a set while it is being
+        # iterated raises RuntimeError.
+        for account in tuple(accounts):
+            if not lock_manager.is_locked(account):
+                return account
+        time.sleep(1)
+
+
+def add_account(account):
+    """Register ``account`` in the module-level ``accounts`` set."""
+    accounts.update((account,))
+
+
+class BusinessException(Exception):
+    """
+    Business-level error that carries a message for the caller.
+
+    The message is available as ``msg`` and as ``str(exc)``.
+    """
+
+    def __init__(self, msg):
+        # Hand the message (not ``self``) to Exception. With ``self`` as the
+        # only argument, str(exc) tried to stringify the exception itself
+        # and recursed until RecursionError.
+        super().__init__(msg)
+        self.msg = msg
+
+
+def raiseError(msg):
+    """Raise a BusinessException carrying ``msg``; this function never returns."""
+    error = BusinessException(msg)
+    raise error
+
+
+def fail_response(msg: str):
+    """
+    Build the JSON text of a failed request.
+
+    :param msg: message placed in the ``msg`` field
+    :return: JSON string with ``code`` 0 and ``success`` false; non-ASCII
+             characters are kept as-is
+    """
+    payload = {
+        "code": 0,
+        "msg": msg,
+        "success": False,
+    }
+    return json.dumps(payload, ensure_ascii=False)
+
+
+def assert_not_none(data, msg):
+    """
+    Raise BusinessException(msg) when ``data`` is None.
+
+    Only None is rejected; other falsy values such as 0 or "" pass.
+    """
+    if data is not None:
+        return
+    raise BusinessException(msg)
+
+
+def success(data=None):
+    """
+    Build the JSON text of a successful request.
+
+    :param data: optional payload; when it is None the pre-built
+                 SUCCESS_RESPONSE (without a ``data`` field) is returned
+    :return: JSON string with ``code`` 1 and ``success`` true
+    """
+    if data is not None:
+        payload = {
+            "code": 1,
+            "msg": "请求成功",
+            "data": data,
+            "success": True,
+        }
+        return json.dumps(payload, ensure_ascii=False)
+    return SUCCESS_RESPONSE

+ 40 - 0
api/login.py

@@ -0,0 +1,40 @@
+"""
+登录接口
+"""
+
+from flask import Blueprint
+from flask import request
+
+import api
+import tiktok
+
+# Blueprint holding the login endpoints; app.py registers it under the "/login" prefix.
+login_opt = Blueprint('login', __name__)
+
+
+@login_opt.route('/createPage', methods=["POST"])
+def create_page():
+    """
+    Open a browser profile for the posted account if it is not registered yet.
+
+    Reads ``account`` from the JSON request body. For an unknown account a
+    TikTokBrowser is created, its browser is started and the account is
+    registered; for a known account nothing is done.
+    :return: success response whose data is always 1
+    """
+    account = request.json.get('account')
+    if api.contain_browser(account):
+        return api.success(1)
+    # playwright=None makes __init_browser__ start its own Playwright instance.
+    # NOTE(review): nothing in this handler stops that instance or closes the
+    # browser - confirm it is meant to stay open.
+    new_browser = tiktok.TikTokBrowser(account)
+    new_browser.__init_browser__(playwright=None)
+    api.add_account(account)
+    return api.success(1)
+
+
+@login_opt.route('/login', methods=["POST"])
+def login():
+    """
+    Log an account in with the posted credentials.
+
+    Reads ``phone`` and ``password`` from the JSON request body.
+    :return: success response wrapping the value returned by ``browser.login``
+             (per the original docstring: 1 - logged in, 2 - verification
+             code required)
+    """
+    request_body = request.json
+    phone = request_body.get('phone')
+    # The original referenced ``douyin.DouYinBrowser``, but no ``douyin`` module
+    # is imported here, so every call failed with NameError. Use the browser
+    # class of the imported tiktok package instead.
+    # NOTE(review): TikTokBrowser does not define ``login`` in the visible
+    # tiktok/__init__.py - confirm it exists or implement it before relying
+    # on this endpoint.
+    browser = tiktok.TikTokBrowser(phone)
+    login_result = browser.login(request_body.get('password'))
+    return api.success(login_result)

+ 111 - 0
api/search.py

@@ -0,0 +1,111 @@
+"""
+搜索API
+"""
+from flask import Blueprint
+from flask import request
+
+import api
+from tiktok import TikTokBrowser
+
+# Blueprint holding the search endpoints; app.py registers it under the "/search" prefix.
+search_opt = Blueprint('search', __name__)
+
+
+@search_opt.route('/image-text-by-keyword', methods=["POST"])
+def search_keyword():
+    """
+    Search image-text posts by keyword.
+
+    Reads ``keyword`` from the JSON request body, waits for an idle account
+    and delegates to ``TikTokBrowser.search_image_text``.
+    NOTE(review): that method is not defined in the visible TikTokBrowser.
+    :return: success response wrapping the search result
+    """
+    keyword = request.json.get('keyword')
+    api.assert_not_none(keyword, '关键字不能为空')
+    browser = TikTokBrowser(api.get_idle_account())
+    return api.success(browser.search_image_text(keyword))
+
+
+@search_opt.route('/image-text-by-author', methods=["POST"])
+def image_text_by_author():
+    """
+    Search image-text posts by author link.
+
+    Reads ``authorUrl`` (required) and ``size`` from the JSON request body,
+    waits for an idle account and delegates to
+    ``TikTokBrowser.search_image_text_by_author``.
+    NOTE(review): that method is not defined in the visible TikTokBrowser.
+    :return: success response wrapping the search result
+    """
+    body = request.json
+    author_url, size = body.get('authorUrl'), body.get('size')
+    api.assert_not_none(author_url, '作者链接不能为空')
+    browser = TikTokBrowser(api.get_idle_account())
+    return api.success(browser.search_image_text_by_author(author_url, size))
+
+
+@search_opt.route('/user', methods=["POST"])
+def search_user():
+    """
+    Look up the information of one user.
+
+    Reads ``url`` from the JSON request body, waits for an idle account and
+    delegates to ``TikTokBrowser.search_user``.
+    NOTE(review): that method is not defined in the visible TikTokBrowser.
+    :return: success response wrapping the lookup result
+    """
+    user_url = request.json.get('url')
+    api.assert_not_none(user_url, '用户链接不能为空')
+    idle_account = api.get_idle_account()
+    found = TikTokBrowser(idle_account).search_user(user_url)
+    return api.success(found)
+
+
+@search_opt.route('/batch-users', methods=["POST"])
+def batch_users():
+    """
+    Look up the information of several users in one call.
+
+    Reads ``urls`` from the JSON request body, waits for an idle account and
+    delegates to ``TikTokBrowser.batch_users``.
+    NOTE(review): that method is not defined in the visible TikTokBrowser.
+    :return: success response wrapping the lookup result
+    """
+    request_body = request.json
+    user_urls = request_body.get('urls')
+    api.assert_not_none(user_urls, '用户链接不能为空')
+    # The api package exposes get_idle_account(); the previously used
+    # get_idle_phone() does not exist there and raised AttributeError.
+    browser = TikTokBrowser(api.get_idle_account())
+    result = browser.batch_users(user_urls)
+    return api.success(result)
+
+
+@search_opt.route('/batch-user-polish', methods=["POST"])
+def batch_user_unique_id():
+    """
+    Batch lookup of user account ids from their ``secUids``.
+
+    Reads ``secUids`` from the JSON request body, waits for an idle account
+    and delegates to ``TikTokBrowser.batch_user_polish``.
+    NOTE(review): that method is not defined in the visible TikTokBrowser.
+    :return: success response wrapping the lookup result
+    """
+    sec_uids = request.json.get('secUids')
+    api.assert_not_none(sec_uids, '用户id不能为空')
+    idle_account = api.get_idle_account()
+    polished = TikTokBrowser(idle_account).batch_user_polish(sec_uids)
+    return api.success(polished)
+
+
+@search_opt.route('/item', methods=["POST"])
+def search_item():
+    """
+    Fetch one post (video or image-text) by its URL.
+
+    Reads ``url`` from the JSON request body, waits for an idle account and
+    runs ``TikTokBrowser.search_item`` through ``TikTokBrowser.invoke``.
+    :return: success response wrapping the item data
+    """
+    note_url = request.json.get('url')
+    api.assert_not_none(note_url, '作品链接不能为空')
+    browser = TikTokBrowser(api.get_idle_account())
+    item = browser.invoke(func=browser.search_item, url=note_url)
+    return api.success(item)
+
+
+@search_opt.route('/batch-notes', methods=["POST"])
+def batch_notes():
+    """
+    Fetch several posts by their URLs in one call.
+
+    Reads ``urls`` from the JSON request body, waits for an idle account and
+    delegates to ``TikTokBrowser.batch_notes``.
+    NOTE(review): that method is not defined in the visible TikTokBrowser.
+    :return: success response wrapping the fetched data
+    """
+    note_urls = request.json.get('urls')
+    api.assert_not_none(note_urls, '作品链接不能为空')
+    idle_account = api.get_idle_account()
+    notes = TikTokBrowser(idle_account).batch_notes(note_urls)
+    return api.success(notes)

+ 72 - 0
app.py

@@ -0,0 +1,72 @@
+import logging
+import os
+import traceback
+
+from flask import Flask, request
+
+from api import *
+from api import login, search
+
+# Folder containing this file; the logs folder lives next to it.
+current_folder = os.path.dirname(os.path.abspath(__file__))
+logs_folder = os.path.join(current_folder, 'logs')
+os.makedirs(logs_folder, exist_ok=True)
+
+# Log line format: timestamp, level, function name and line number, message.
+formatter = logging.Formatter("%(asctime)s %(levelname)s[%(funcName)s:%(lineno)s]:%(message)s", "%Y-%m-%d %H:%M:%S")
+file_handler = logging.FileHandler(filename=f"{current_folder}/logs/app.log", encoding="utf-8")
+file_handler.setFormatter(formatter)
+# The console handler gets no formatter, so it uses logging's default format.
+console_handler = logging.StreamHandler()
+
+# Configure the root logger.
+logger = logging.getLogger()
+logger.setLevel(logging.INFO)
+logger.addHandler(file_handler)
+# Also echo log records to the console (useful during development).
+logger.addHandler(console_handler)
+
+app = Flask(__name__)
+
+app.register_blueprint(login.login_opt, url_prefix="/login")
+app.register_blueprint(search.search_opt, url_prefix="/search")
+
+
+@app.errorhandler(Exception)
+def handle_exception(error: Exception):
+    """
+    Global exception handler.
+
+    A BusinessException becomes a failure response with HTTP status 200 and
+    the exception's ``msg``. Any other exception is logged and becomes a
+    failure response with HTTP status 500 and ``str(error)`` as the message.
+    NOTE(review): being registered for Exception, this presumably also turns
+    Flask's own HTTP errors (e.g. 404) into 500 responses - confirm whether
+    that is intended.
+
+    :param error: the exception raised while handling the request
+    :return: ``(response_body, status_code)`` tuple
+    """
+    if isinstance(error, BusinessException):
+        return fail_response(error.msg), 200
+    # exc_info attaches the traceback to the log record, so it reaches every
+    # configured handler (including the log file); traceback.print_exc()
+    # only wrote it to stderr.
+    logging.error("%s", error, exc_info=error)
+    return fail_response(str(error)), 500
+
+
+@app.before_request
+def log_request():
+    """
+    Log the method, URL and raw body of every incoming request.
+
+    NOTE(review): the body is logged verbatim, which includes the password
+    posted to the login endpoint - confirm this is acceptable.
+    """
+    method, url = request.method, request.url
+    logging.info('Request: %s %s', method, url)
+    raw_body = request.get_data(as_text=True)
+    logging.info('Request Body: %s', raw_body)
+
+
+@app.after_request
+def log_response(response):
+    """
+    Log the body of every outgoing response and pass the response through.
+
+    Bodies longer than 1000 characters are logged as their first 1000
+    characters followed by "...".
+    """
+    data = response.get_data(as_text=True)
+    shown = data[:1000] + "..." if len(data) > 1000 else data
+    logging.info('Response Body: %s', shown)
+    return response
+
+
+# Start Flask's built-in server only when this file is run as a script:
+# it listens on all interfaces, port 8999, in threaded mode.
+if __name__ == '__main__':
+    app.run(host="0.0.0.0", port=8999, threaded=True)

+ 55 - 0
browser/__init__.py

@@ -0,0 +1,55 @@
+"""
+
+"""
+import platform
+from abc import abstractmethod
+
+import api
+from playwright.sync_api import Playwright, sync_playwright
+
+# Path separator used when building the browser profile path:
+# backslash on Windows, forward slash everywhere else.
+SPLIT_CHAR = '\\' if platform.system() == 'Windows' else '/'
+
+
+class BaseBrowser:
+    """
+    Shared Playwright setup for a per-account persistent Chrome profile.
+
+    Subclasses supply the site name through ``__get_name__``; it becomes part
+    of the profile folder path. The class does not derive from ``abc.ABC``,
+    so ``@abstractmethod`` is not enforced at instantiation.
+    """
+
+    def __init__(self, account: str, playwright=None):
+        """
+        :param account: account name; must not be None
+        :param playwright: optional Playwright instance, stored as-is
+        :raises BusinessException: when ``account`` is None
+        """
+        api.assert_not_none(account, "账号不能为空")
+        self.account = account
+        self.playwright = playwright
+        # Both are set by __init_browser__.
+        self.browser = None
+        self.page = None
+        # Result holders, initially empty.
+        self.result = None
+        self.list_result = []
+        self.map_result = {}
+        self.has_more = False
+
+    def __init_browser__(self, playwright):
+        """
+        Launch the persistent browser context and keep its first page.
+
+        :param playwright: Playwright instance to use; when falsy a new one is
+                           started with ``sync_playwright().start()``
+        """
+        self.playwright = playwright if playwright else sync_playwright().start()
+        # Profile folder: ./.data/<site name>/<account>
+        profile_dir = f'.{SPLIT_CHAR}.data{SPLIT_CHAR}{self.__get_name__()}{SPLIT_CHAR}{self.account}'
+        chrome_args = [
+            '--disable-blink-features=AutomationControlled',
+            '--incognito',
+            '--ignore-certificate-errors-spki-list',
+            '--disable-web-security',  # disable web security
+            '--no-sandbox',  # disable the sandbox
+            '--disable-dev-shm-usage',  # do not use /dev/shm
+            '--disable-features=site-per-process',  # disable one process per site
+            '--ignore-certificate-errors',  # ignore certificate errors
+            '--disable-features=AutomationControlled'  # disable automation-related features
+        ]
+        self.browser = self.playwright.chromium.launch_persistent_context(
+            proxy=None,
+            user_data_dir=profile_dir,
+            headless=False,
+            slow_mo=1000,
+            channel="chrome",
+            ignore_https_errors=True,
+            user_agent='Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) '
+                       'Chrome/126.0.0.0 Safari/537.36',
+            args=chrome_args)
+        # The script path is relative to the current working directory.
+        self.browser.add_init_script(path="./stealth.min.js")
+        self.page = self.browser.pages[0]
+
+    @abstractmethod
+    def __get_name__(self):
+        """Return the site name used in the profile folder path; subclasses override this."""
+        pass

+ 1 - 0
requirements.txt

@@ -0,0 +1 @@
+playwright==1.46.0

File diff ditekan karena terlalu besar
+ 6 - 0
stealth.min.js


+ 54 - 0
tiktok/__init__.py

@@ -0,0 +1,54 @@
+"""
+
+"""
+import json
+import logging
+from playwright.sync_api import sync_playwright, Page, Playwright
+
+import api
+from browser import BaseBrowser
+from util.lock_util import LockManager
+from tiktok.data_handler import *
+
+# Base URL of the TikTok site.
+# NOTE(review): the "IG" prefix looks like a leftover from another project;
+# the constant is not referenced in the visible part of this module.
+IG_URL = 'https://www.tiktok.com/'
+
+# Shared LockManager; this module and api.get_idle_account() call it with
+# the account as the key.
+lock_manager = LockManager()
+
+
+class TikTokBrowser(BaseBrowser):
+    """
+    Browser session for tiktok.com that captures item data from network responses.
+    """
+
+    def __init__(self, account: str, playwright=None):
+        super().__init__(account, playwright)
+        # Item id of the URL being fetched; set by search_item.
+        self.id = None
+
+    def __get_name__(self):
+        """Return the site name used in the browser profile folder path."""
+        return 'tiktok'
+
+    def invoke(self, func, *args, **kwargs):
+        """
+        Run ``func`` with a freshly started browser while holding the account lock.
+
+        :param func: callable to run once the browser is initialised
+        :return: whatever ``func(*args, **kwargs)`` returns
+        """
+        lock_manager.acquire_lock(self.account)
+        try:
+            with sync_playwright() as playwright:
+                self.__init_browser__(playwright)
+                return func(*args, **kwargs)
+        finally:
+            # Always release, even when func or the browser start-up raises.
+            lock_manager.release_lock(self.account)
+
+    @staticmethod
+    def _extract_item_id(url):
+        """Return the last path segment of ``url``, ignoring query string, fragment and trailing slashes."""
+        path = url.split('#', 1)[0].split('?', 1)[0]
+        return path.rstrip('/').split('/')[-1]
+
+    def search_item(self, url):
+        """
+        Open ``url`` and return the item data captured from the responses.
+
+        :param url: item page URL; must not be None
+        :return: the dict produced by item_handler, or None when no matching
+                 response arrived within the wait
+        :raises BusinessException: when ``url`` is None
+        """
+        api.assert_not_none(url, 'url不能为空')
+        self.result = None
+        self.map_result = {}
+        # The original used lstrip('/'), which does nothing for a URL and left
+        # an empty id for a trailing slash (an empty id matches every response
+        # URL); a query string also ended up inside the id.
+        self.id = self._extract_item_id(url)
+        self.browser.on('response', self.item_handler)
+        self.page.goto(url)
+        # Give the response handler time to capture the item data.
+        self.page.wait_for_timeout(2000)
+        return self.result
+
+    def item_handler(self, response):
+        """
+        Response listener: store the first item payload that matches ``self.id``.
+
+        Non-200 responses are ignored, as is everything once a result has
+        been stored.
+        """
+        if response is None or response.status != 200 or self.result is not None:
+            return
+        if self.id in response.url:
+            if '/api/item/detail/' in response.url:
+                self.result = get_detail_by_response(response)
+            elif response.request.resource_type == 'document':
+                logging.info(f'get {self.id} item response')
+                self.result = get_video_by_response(response)

+ 78 - 0
tiktok/data_handler.py

@@ -0,0 +1,78 @@
+"""
+
+"""
+import json
+import re
+
+import jsonpath
+
+# Keys kept when an item structure is filtered for the API response.
+# 'bitrateInfo' previously had a leading space (' bitrateInfo'), so it could
+# never match a key.
+item_require_fields = [
+    'id', 'author', 'authorStats', 'createTime', 'desc', 'imagePost', 'stats', 'bitrateInfo', 'cover', 'CategoryType'
+]
+
+
+def get_video_by_response(response):
+    """
+    Extract the item structure from an HTML document response.
+
+    The video's ``cover`` is copied to the top level, then only the keys
+    listed in ``item_require_fields`` are kept.
+
+    :param response: object whose ``text()`` returns the page HTML
+    :return: filtered item dict, or None when the page holds no item structure
+    """
+    item_struct = get_item_json2(response.text())
+    if item_struct is None:
+        # The page carried no item data; previously this raised TypeError.
+        return None
+    # ``video`` may be missing or null; fall back to an empty dict so that
+    # ``cover`` becomes None instead of raising AttributeError.
+    video = item_struct.get('video') or {}
+    item_struct['cover'] = video.get('cover')
+    return {k: v for k, v in item_struct.items() if k in item_require_fields}
+
+
+def get_video_json(html_content):
+    """
+    Line-based extraction of the JSON object that follows an ``"itemStruct"`` line.
+
+    The line containing ``"itemStruct"`` itself is dropped and replaced by an
+    opening brace; the following stripped lines are appended until the
+    accumulated text parses as JSON.
+
+    :param html_content: text to scan
+    :return: the parsed object, or None when nothing parseable was found
+    """
+    buffer = None  # None until the marker line has been seen
+    for raw_line in html_content.splitlines():
+        text = raw_line.strip()
+        if '"itemStruct"' in text:
+            # (Re)start collecting right after the marker line.
+            buffer = '{'
+            continue
+        if buffer is None:
+            continue
+        buffer += text
+        if '}' not in buffer:
+            continue
+        try:
+            return json.loads(buffer)
+        except json.JSONDecodeError:
+            # Not complete yet; keep appending lines.
+            pass
+    return None
+
+
+# Compiled once: body of the <script id="__UNIVERSAL_DATA_FOR_REHYDRATION__"> tag.
+_REHYDRATION_SCRIPT_RE = re.compile(r'<script\s+id="__UNIVERSAL_DATA_FOR_REHYDRATION__"\s+type="application/json"[^>]*>(.*?)</script>', re.DOTALL)
+
+
+def get_item_json2(html_content):
+    """
+    Return the first ``itemStruct`` object found in the page's rehydration script.
+
+    The whole document is searched, so the script body may span several
+    lines. Script bodies that do not mention ``"itemStruct"`` are skipped.
+
+    :param html_content: page HTML
+    :return: the ``itemStruct`` value, or None when no script tag carries one
+    :raises json.JSONDecodeError: when a matching script body is not valid JSON
+    """
+    for payload in _REHYDRATION_SCRIPT_RE.findall(html_content):
+        if '"itemStruct"' not in payload:
+            continue
+        data = json.loads(payload)
+        # jsonpath returns False when nothing matches.
+        matches = jsonpath.jsonpath(data, '$..itemStruct')
+        if matches:
+            return matches[0]
+    # No script tag with an item structure; previously a line that merely
+    # mentioned "itemStruct" outside the script tag raised IndexError.
+    return None
+
+def get_detail_by_response(response):
+    """
+    Extract the item structure from an item-detail API response.
+
+    :param response: object whose ``json()`` returns the decoded body
+    :return: dict with only the keys listed in ``item_require_fields``, or
+             None when the body has no ``itemInfo.itemStruct``
+    """
+    # Named ``payload`` rather than ``json`` so the json module is not shadowed.
+    payload = response.json()
+    if not isinstance(payload, dict):
+        return None
+    item_struct = (payload.get('itemInfo') or {}).get('itemStruct')
+    if not item_struct:
+        # Missing or empty item data; previously this raised AttributeError.
+        return None
+    return {k: v for k, v in item_struct.items() if k in item_require_fields}
+
+# with open('../aweme_video.txt', 'r') as file:
+#     data = get_video_json(file.read())
+#     print(data)

Beberapa file tidak ditampilkan karena terlalu banyak file yang berubah dalam diff ini