diff --git a/threads/threads_client.py b/threads/threads_client.py index 7c81b60..f603f21 100644 --- a/threads/threads_client.py +++ b/threads/threads_client.py @@ -24,6 +24,9 @@ _MAX_RETRIES = 3 _RETRY_DELAY_SECONDS = 2 _REQUEST_TIMEOUT_SECONDS = 30 +# Title length limit for video titles +_MAX_TITLE_LENGTH = 200 + class ThreadsAPIError(Exception): """Lỗi khi gọi Threads API (token hết hạn, quyền thiếu, v.v.).""" @@ -283,7 +286,7 @@ def _get_trending_content( text = t.get("text", "") if not text or _contains_blocked_words(text): continue - title_candidate = text[:200] + title_candidate = text[:_MAX_TITLE_LENGTH] if is_title_used(title_candidate): print_substep( f"Bỏ qua trending đã tạo video: {text[:50]}...", @@ -306,7 +309,7 @@ def _get_trending_content( topic_title = thread.get("topic_title", "") # Dùng topic_title làm tiêu đề video nếu có - display_title = topic_title if topic_title else thread_text[:200] + display_title = topic_title if topic_title else thread_text[:_MAX_TITLE_LENGTH] print_substep( f"Video sẽ được tạo từ trending: {display_title[:100]}...", @@ -317,7 +320,7 @@ def _get_trending_content( content: dict = { "thread_url": thread_url, - "thread_title": display_title[:200], + "thread_title": display_title[:_MAX_TITLE_LENGTH], "thread_id": re.sub(r"[^\w\s-]", "", shortcode or thread_text[:20]), "thread_author": f"@{thread_username}", "is_nsfw": False, @@ -504,7 +507,7 @@ def get_threads_posts(POST_ID: str = None) -> dict: if not text or _contains_blocked_words(text): continue # Kiểm tra title đã được sử dụng chưa (tránh trùng lặp) - title_candidate = text[:200] if len(text) > 200 else text + title_candidate = text[:_MAX_TITLE_LENGTH] if len(text) > _MAX_TITLE_LENGTH else text if is_title_used(title_candidate): print_substep( f"Bỏ qua thread đã tạo video: {text[:50]}...", @@ -541,7 +544,7 @@ def get_threads_posts(POST_ID: str = None) -> dict: content = {} content["thread_url"] = thread_url - content["thread_title"] = thread_text[:200] if len(thread_text) > 200 else thread_text + content["thread_title"] = thread_text[:_MAX_TITLE_LENGTH] if len(thread_text) > _MAX_TITLE_LENGTH else thread_text content["thread_id"] = re.sub(r"[^\w\s-]", "", thread_id) content["thread_author"] = f"@{thread_username}" content["is_nsfw"] = False diff --git a/threads/trending.py b/threads/trending.py index 6fc0f1c..ad3f437 100644 --- a/threads/trending.py +++ b/threads/trending.py @@ -12,7 +12,7 @@ Flow: """ import re -from typing import Dict, List, Optional, Tuple +from typing import Dict, List from playwright.sync_api import ( Page, @@ -25,6 +25,17 @@ from utils.console import print_step, print_substep THREADS_SEARCH_URL = "https://www.threads.net/search" _PAGE_LOAD_TIMEOUT_MS = 30_000 _CONTENT_WAIT_MS = 3_000 +_REPLY_SCROLL_ITERATIONS = 5 +_TOPIC_SCROLL_ITERATIONS = 2 + +# Shared browser context settings +_BROWSER_VIEWPORT = {"width": 1280, "height": 900} +_BROWSER_USER_AGENT = ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/131.0.0.0 Safari/537.36" +) +_BROWSER_LOCALE = "vi-VN" class TrendingScrapeError(Exception): @@ -77,7 +88,7 @@ def _extract_post_links(page: Page, limit: int) -> List[Dict[str, str]]: continue seen_shortcodes.add(shortcode) - # Username from URL /@username/post/... + # Username from URL: /@username/post/... user_match = re.search(r"/@([^/]+)/post/", href) username = user_match.group(1) if user_match else "unknown" @@ -151,7 +162,7 @@ def _extract_replies(page: Page, limit: int) -> List[Dict[str, str]]: replies: List[Dict[str, str]] = [] # Scroll to load more replies - for _ in range(5): + for _ in range(_REPLY_SCROLL_ITERATIONS): page.evaluate("window.scrollBy(0, window.innerHeight)") page.wait_for_timeout(1000) @@ -189,7 +200,18 @@ def _extract_replies(page: Page, limit: int) -> List[Dict[str, str]]: return replies -def _scroll_page(page: Page, times: int = 2) -> None: +def _create_browser_context(playwright): + """Create a Playwright browser and context with shared settings.""" + browser = playwright.chromium.launch(headless=True) + context = browser.new_context( + viewport=_BROWSER_VIEWPORT, + user_agent=_BROWSER_USER_AGENT, + locale=_BROWSER_LOCALE, + ) + return browser, context + + +def _scroll_page(page: Page, times: int = _TOPIC_SCROLL_ITERATIONS) -> None: """Scroll down to trigger lazy-loading content.""" for _ in range(times): page.evaluate("window.scrollBy(0, window.innerHeight)") @@ -225,16 +247,7 @@ def get_trending_threads( all_threads: List[Dict[str, str]] = [] with sync_playwright() as p: - browser = p.chromium.launch(headless=True) - context = browser.new_context( - viewport={"width": 1280, "height": 900}, - user_agent=( - "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " - "AppleWebKit/537.36 (KHTML, like Gecko) " - "Chrome/131.0.0.0 Safari/537.36" - ), - locale="vi-VN", - ) + browser, context = _create_browser_context(p) page = context.new_page() try: @@ -333,16 +346,7 @@ def scrape_thread_replies( replies: List[Dict[str, str]] = [] with sync_playwright() as p: - browser = p.chromium.launch(headless=True) - context = browser.new_context( - viewport={"width": 1280, "height": 900}, - user_agent=( - "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " - "AppleWebKit/537.36 (KHTML, like Gecko) " - "Chrome/131.0.0.0 Safari/537.36" - ), - locale="vi-VN", - ) + browser, context = _create_browser_context(p) page = context.new_page() try: