Merge pull request #2 from thaitien280401-stack/copilot/fix-no-threads-error

Preflight access token validation before pipeline execution
pull/2482/head
Truc Phan Dang Thien 3 days ago committed by GitHub
commit 36b702eceb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -199,6 +199,12 @@ if __name__ == "__main__":
)
sys.exit()
# Kiểm tra access token trước khi chạy (chỉ cho Threads mode)
if not args.reddit:
from utils.check_token import preflight_check
preflight_check()
try:
if args.mode == "scheduled":
# Chế độ lên lịch tự động
@ -285,10 +291,25 @@ if __name__ == "__main__":
except (KeyError, TypeError):
pass
print_step(
f"Đã xảy ra lỗi! Vui lòng thử lại hoặc báo lỗi trên GitHub.\n"
f"Phiên bản: {__VERSION__}\n"
f"Lỗi: {err}\n"
f'Config: {config.get("settings", {})}'
)
# Import here to avoid circular import at module level
from threads.threads_client import ThreadsAPIError
if isinstance(err, ThreadsAPIError):
print_step(
f"❌ Lỗi xác thực Threads API!\n"
f"Phiên bản: {__VERSION__}\n"
f"Lỗi: {err}\n\n"
"Hướng dẫn khắc phục:\n"
"1. Kiểm tra access_token trong config.toml còn hiệu lực không\n"
"2. Lấy token mới tại: https://developers.facebook.com/docs/threads/get-started\n"
"3. Đảm bảo token có quyền: threads_basic_read\n"
"4. Kiểm tra user_id khớp với tài khoản Threads"
)
else:
print_step(
f"Đã xảy ra lỗi! Vui lòng thử lại hoặc báo lỗi trên GitHub.\n"
f"Phiên bản: {__VERSION__}\n"
f"Lỗi: {err}\n"
f'Config: {config.get("settings", {})}'
)
raise err

@ -50,6 +50,11 @@ def run_pipeline(post_id: Optional[str] = None) -> Optional[str]:
print_step("🚀 Bắt đầu pipeline tạo video...")
# Preflight: kiểm tra access token trước khi gọi API
from utils.check_token import preflight_check
preflight_check()
try:
# Step 1: Lấy nội dung từ Threads
print_step("📱 Bước 1: Lấy nội dung từ Threads...")

@ -6,6 +6,7 @@ Docs: https://developers.facebook.com/docs/threads
"""
import re
import time as _time
from typing import Dict, List, Optional
import requests
@ -18,6 +19,20 @@ from utils.voice import sanitize_text
THREADS_API_BASE = "https://graph.threads.net/v1.0"
# Retry configuration for transient failures
_MAX_RETRIES = 3
_RETRY_DELAY_SECONDS = 2
_REQUEST_TIMEOUT_SECONDS = 30
class ThreadsAPIError(Exception):
"""Lỗi khi gọi Threads API (token hết hạn, quyền thiếu, v.v.)."""
def __init__(self, message: str, error_type: str = "", error_code: int = 0):
self.error_type = error_type
self.error_code = error_code
super().__init__(message)
class ThreadsClient:
"""Client để tương tác với Threads API (Meta)."""
@ -26,21 +41,131 @@ class ThreadsClient:
self.access_token = settings.config["threads"]["creds"]["access_token"]
self.user_id = settings.config["threads"]["creds"]["user_id"]
self.session = requests.Session()
self.session.headers.update(
{
"Authorization": f"Bearer {self.access_token}",
}
)
def _get(self, endpoint: str, params: Optional[dict] = None) -> dict:
"""Make a GET request to the Threads API."""
"""Make a GET request to the Threads API with retry logic.
Raises:
ThreadsAPIError: If the API returns an error in the response body.
requests.HTTPError: If the HTTP request fails after retries.
"""
url = f"{THREADS_API_BASE}/{endpoint}"
if params is None:
params = {}
params["access_token"] = self.access_token
response = self.session.get(url, params=params)
response.raise_for_status()
return response.json()
last_exception: Optional[Exception] = None
for attempt in range(1, _MAX_RETRIES + 1):
try:
response = self.session.get(url, params=params, timeout=_REQUEST_TIMEOUT_SECONDS)
# Check for HTTP-level errors with detailed messages
if response.status_code == 401:
raise ThreadsAPIError(
"Access token không hợp lệ hoặc đã hết hạn (HTTP 401). "
"Vui lòng cập nhật access_token trong config.toml.",
error_type="OAuthException",
error_code=401,
)
if response.status_code == 403:
raise ThreadsAPIError(
"Không có quyền truy cập (HTTP 403). Kiểm tra quyền "
"threads_basic_read trong Meta Developer Portal.",
error_type="PermissionError",
error_code=403,
)
response.raise_for_status()
data = response.json()
# Meta Graph API có thể trả về 200 nhưng body chứa error
if "error" in data:
err = data["error"]
error_msg = err.get("message", "Unknown API error")
error_type = err.get("type", "")
error_code = err.get("code", 0)
raise ThreadsAPIError(
f"Threads API error: {error_msg} (type={error_type}, code={error_code})",
error_type=error_type,
error_code=error_code,
)
return data
except (requests.ConnectionError, requests.Timeout) as exc:
last_exception = exc
if attempt < _MAX_RETRIES:
print_substep(
f"Lỗi kết nối (lần {attempt}/{_MAX_RETRIES}), "
f"thử lại sau {_RETRY_DELAY_SECONDS}s...",
style="bold yellow",
)
_time.sleep(_RETRY_DELAY_SECONDS)
continue
raise
except (ThreadsAPIError, requests.HTTPError):
raise
def validate_token(self) -> dict:
"""Kiểm tra access token có hợp lệ bằng cách gọi /me endpoint.
Returns:
User profile data nếu token hợp lệ.
Raises:
ThreadsAPIError: Nếu token không hợp lệ hoặc đã hết hạn.
"""
try:
return self._get("me", params={"fields": "id,username"})
except (ThreadsAPIError, requests.HTTPError) as exc:
raise ThreadsAPIError(
"Access token không hợp lệ hoặc đã hết hạn. "
"Vui lòng cập nhật access_token trong config.toml. "
"Hướng dẫn: https://developers.facebook.com/docs/threads/get-started\n"
f"Chi tiết: {exc}"
) from exc
def refresh_token(self) -> str:
"""Làm mới access token (long-lived token).
Meta Threads API cho phép refresh long-lived tokens.
Endpoint: GET /refresh_access_token?grant_type=th_refresh_token&access_token=...
Returns:
Access token mới.
Raises:
ThreadsAPIError: Nếu không thể refresh token.
"""
try:
url = f"{THREADS_API_BASE}/refresh_access_token"
response = self.session.get(
url,
params={
"grant_type": "th_refresh_token",
"access_token": self.access_token,
},
timeout=_REQUEST_TIMEOUT_SECONDS,
)
response.raise_for_status()
data = response.json()
if "error" in data:
error_msg = data["error"].get("message", "Unknown error")
raise ThreadsAPIError(f"Không thể refresh token: {error_msg}")
new_token = data.get("access_token", "")
if not new_token:
raise ThreadsAPIError("API không trả về access_token mới khi refresh.")
self.access_token = new_token
print_substep("✅ Đã refresh access token thành công!", style="bold green")
return new_token
except requests.RequestException as exc:
raise ThreadsAPIError(
"Không thể refresh token. Vui lòng lấy token mới từ "
"Meta Developer Portal.\n"
f"Chi tiết: {exc}"
) from exc
def get_user_threads(self, user_id: Optional[str] = None, limit: int = 25) -> List[dict]:
"""Lấy danh sách threads của user.
@ -138,12 +263,51 @@ def get_threads_posts(POST_ID: str = None) -> dict:
Returns:
Dict chứa thread content replies.
Raises:
ThreadsAPIError: Nếu token không hợp lệ hoặc API trả về lỗi.
ValueError: Nếu không tìm thấy threads phù hợp.
"""
print_substep("Đang kết nối với Threads API...")
client = ThreadsClient()
content = {}
# Bước 0: Validate token trước khi gọi API
print_substep("Đang kiểm tra access token...")
try:
user_info = client.validate_token()
print_substep(
f"✅ Token hợp lệ - User: @{user_info.get('username', 'N/A')} "
f"(ID: {user_info.get('id', 'N/A')})",
style="bold green",
)
except ThreadsAPIError:
# Token không hợp lệ → thử refresh
print_substep(
"⚠️ Token có thể đã hết hạn, đang thử refresh...",
style="bold yellow",
)
try:
client.refresh_token()
user_info = client.validate_token()
print_substep(
f"✅ Token đã refresh - User: @{user_info.get('username', 'N/A')}",
style="bold green",
)
except (ThreadsAPIError, requests.RequestException) as refresh_err:
print_substep(
"❌ Không thể xác thực hoặc refresh token.\n"
" Vui lòng lấy token mới từ Meta Developer Portal:\n"
" https://developers.facebook.com/docs/threads/get-started",
style="bold red",
)
raise ThreadsAPIError(
"Access token không hợp lệ hoặc đã hết hạn. "
"Vui lòng cập nhật access_token trong config.toml. "
f"Chi tiết: {refresh_err}"
) from refresh_err
thread_config = settings.config["threads"]["thread"]
max_comment_length = int(thread_config.get("max_comment_length", 500))
min_comment_length = int(thread_config.get("min_comment_length", 1))
@ -160,14 +324,36 @@ def get_threads_posts(POST_ID: str = None) -> dict:
threads_list = client.get_user_threads(user_id=target_user, limit=25)
if not threads_list:
print_substep("Không tìm thấy threads nào!", style="bold red")
raise ValueError("No threads found")
print_substep(
"❌ Không tìm thấy threads nào!\n"
" Kiểm tra các nguyên nhân sau:\n"
f" - User ID đang dùng: {target_user}\n"
" - User này có bài viết công khai không?\n"
" - Token có quyền threads_basic_read?\n"
" - Token có đúng cho user_id này không?",
style="bold red",
)
raise ValueError(
f"No threads found for user '{target_user}'. "
"Verify the user has public posts and the access token has "
"'threads_basic_read' permission."
)
# Lọc theo từ khóa nếu có
keywords = thread_config.get("keywords", "")
unfiltered_count = len(threads_list)
if keywords:
keyword_list = [k.strip() for k in keywords.split(",") if k.strip()]
threads_list = client.search_threads_by_keyword(threads_list, keyword_list)
filtered = client.search_threads_by_keyword(threads_list, keyword_list)
if filtered:
threads_list = filtered
else:
# Nếu keyword filter loại hết → bỏ qua filter, dùng list gốc
print_substep(
f"⚠️ Keyword filter ({keywords}) loại hết {unfiltered_count} "
"threads. Bỏ qua keyword filter, dùng tất cả threads.",
style="bold yellow",
)
# Chọn thread phù hợp (chưa tạo video, đủ replies, title chưa dùng)
thread = None

@ -0,0 +1,198 @@
"""
Preflight Access-Token Checker chạy trước khi pipeline bắt đầu.
Kiểm tra:
1. access_token được cấu hình trong config.toml không.
2. Token hợp lệ trên Threads API (/me endpoint) không.
3. Nếu token hết hạn tự động thử refresh.
4. user_id trong config khớp với user sở hữu token không.
Usage:
# Gọi trực tiếp:
python -m utils.check_token
# Hoặc import trong code:
from utils.check_token import preflight_check
preflight_check() # raises SystemExit on failure
"""
import sys
from typing import Optional
import requests
from utils import settings
from utils.console import print_step, print_substep
THREADS_API_BASE = "https://graph.threads.net/v1.0"
_REQUEST_TIMEOUT_SECONDS = 15 # preflight should be fast
class TokenCheckError(Exception):
"""Raised when the access-token preflight fails."""
def _call_me_endpoint(access_token: str) -> dict:
"""GET /me?fields=id,username&access_token=… with minimal retry."""
url = f"{THREADS_API_BASE}/me"
params = {
"fields": "id,username",
"access_token": access_token,
}
response = requests.get(url, params=params, timeout=_REQUEST_TIMEOUT_SECONDS)
# HTTP-level errors
if response.status_code == 401:
raise TokenCheckError(
"Access token không hợp lệ hoặc đã hết hạn (HTTP 401).\n"
"→ Cập nhật [threads.creds] access_token trong config.toml."
)
if response.status_code == 403:
raise TokenCheckError(
"Token thiếu quyền (HTTP 403).\n"
"→ Đảm bảo token có quyền threads_basic_read trong Meta Developer Portal."
)
response.raise_for_status()
data = response.json()
# Graph API may return 200 with an error body
if "error" in data:
err = data["error"]
msg = err.get("message", "Unknown error")
code = err.get("code", 0)
raise TokenCheckError(f"Threads API trả về lỗi: {msg} (code={code})")
return data
def _try_refresh(access_token: str) -> Optional[str]:
"""Attempt to refresh a long-lived Threads token.
Returns new token string, or None if refresh is not possible.
"""
url = f"{THREADS_API_BASE}/refresh_access_token"
try:
resp = requests.get(
url,
params={
"grant_type": "th_refresh_token",
"access_token": access_token,
},
timeout=_REQUEST_TIMEOUT_SECONDS,
)
resp.raise_for_status()
data = resp.json()
if "error" in data:
return None
return data.get("access_token") or None
except requests.RequestException:
return None
def preflight_check() -> None:
"""Validate the Threads access token configured in *config.toml*.
On success, prints a confirmation and returns normally.
On failure, prints actionable diagnostics and raises ``SystemExit(1)``.
"""
print_step("🔑 Kiểm tra access token trước khi chạy...")
# --- 1. Check config values exist -----------------------------------
try:
threads_creds = settings.config["threads"]["creds"]
access_token: str = threads_creds.get("access_token", "").strip()
user_id: str = threads_creds.get("user_id", "").strip()
except (KeyError, TypeError):
print_substep(
"❌ Thiếu cấu hình [threads.creds] trong config.toml.\n"
" Cần có access_token và user_id.",
style="bold red",
)
sys.exit(1)
if not access_token:
print_substep(
"❌ access_token trống trong config.toml!\n"
" Lấy token tại: https://developers.facebook.com/docs/threads/get-started",
style="bold red",
)
sys.exit(1)
if not user_id:
print_substep(
"❌ user_id trống trong config.toml!\n"
" Lấy user_id bằng cách gọi /me với access token.",
style="bold red",
)
sys.exit(1)
# --- 2. Validate token via /me endpoint -----------------------------
try:
me_data = _call_me_endpoint(access_token)
except TokenCheckError as exc:
# Token invalid → try refresh
print_substep(
f"⚠️ Token hiện tại không hợp lệ: {exc}\n" " Đang thử refresh token...",
style="bold yellow",
)
new_token = _try_refresh(access_token)
if new_token:
try:
me_data = _call_me_endpoint(new_token)
access_token = new_token
# Update in-memory config so downstream code uses the new token
settings.config["threads"]["creds"]["access_token"] = new_token
print_substep("✅ Token đã được refresh thành công!", style="bold green")
except TokenCheckError as inner:
print_substep(
f"❌ Token mới sau refresh vẫn lỗi: {inner}\n"
" Vui lòng lấy token mới từ Meta Developer Portal:\n"
" https://developers.facebook.com/docs/threads/get-started",
style="bold red",
)
sys.exit(1)
else:
print_substep(
"❌ Không thể refresh token.\n"
" Vui lòng lấy token mới từ Meta Developer Portal:\n"
" https://developers.facebook.com/docs/threads/get-started",
style="bold red",
)
sys.exit(1)
except requests.RequestException as exc:
print_substep(
f"❌ Lỗi kết nối khi kiểm tra token: {exc}\n" " Kiểm tra kết nối mạng và thử lại.",
style="bold red",
)
sys.exit(1)
# --- 3. Cross-check user_id ----------------------------------------
api_user_id = me_data.get("id", "")
api_username = me_data.get("username", "N/A")
if api_user_id and api_user_id != user_id:
print_substep(
f"⚠️ user_id trong config ({user_id}) khác với user sở hữu token ({api_user_id}).\n"
" Nếu bạn muốn lấy threads của chính mình, hãy cập nhật user_id trong config.toml.\n"
" Đang tiếp tục với token hiện tại...",
style="bold yellow",
)
print_substep(
f"✅ Access token hợp lệ — @{api_username} (ID: {api_user_id})",
style="bold green",
)
# Allow running standalone: python -m utils.check_token
if __name__ == "__main__":
from pathlib import Path
directory = Path().absolute()
settings.check_toml(
f"{directory}/utils/.config.template.toml",
f"{directory}/config.toml",
)
preflight_check()
print_step("🎉 Tất cả kiểm tra đều OK — sẵn sàng chạy!")
Loading…
Cancel
Save