Merge pull request #3 from thaitien280401-stack/copilot/fetch-trending-now-articles

feat: add Threads Trending Now as content source via Playwright scraping
pull/2482/head
Truc Phan Dang Thien 3 days ago committed by GitHub
commit e696279ea5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -24,6 +24,9 @@ _MAX_RETRIES = 3
_RETRY_DELAY_SECONDS = 2
_REQUEST_TIMEOUT_SECONDS = 30
# Title length limit for video titles
_MAX_TITLE_LENGTH = 200
class ThreadsAPIError(Exception):
"""Lỗi khi gọi Threads API (token hết hạn, quyền thiếu, v.v.)."""
@ -253,6 +256,126 @@ def _contains_blocked_words(text: str) -> bool:
return any(word in text_lower for word in blocked_list)
def _get_trending_content(
    max_comment_length: int,
    min_comment_length: int,
) -> Optional[dict]:
    """Fetch video content from the "Trending now" section on Threads.

    Uses the Playwright-based scraper to pull posts from trending topics
    and their replies (the official API cannot read threads that do not
    belong to the authenticated user).

    Args:
        max_comment_length: Maximum accepted reply length in characters.
        min_comment_length: Minimum accepted reply length in characters.

    Returns:
        A content dict in the same shape ``get_threads_posts`` produces,
        or ``None`` when no usable trending content is available (the
        caller then falls back to user threads).
    """
    # Imported lazily so the Playwright dependency is only needed when
    # source == "trending".
    from threads.trending import (
        TrendingScrapeError,
        get_trending_threads,
        scrape_thread_replies,
    )

    try:
        trending_threads = get_trending_threads()
    except TrendingScrapeError as e:
        print_substep(f"⚠️ Lỗi lấy trending: {e}", style="bold yellow")
        return None
    if not trending_threads:
        return None

    # Pick a suitable thread (no video created yet, no blocked words).
    thread = None
    # First thread that passes the blocked-words filter, kept as a
    # fallback in case every candidate has already been turned into a
    # video. NOTE: previously the fallback was trending_threads[0]
    # unconditionally, which could reintroduce a blocked-words thread.
    fallback = None
    for t in trending_threads:
        text = t.get("text", "")
        if not text or _contains_blocked_words(text):
            continue
        if fallback is None:
            fallback = t
        title_candidate = text[:_MAX_TITLE_LENGTH]
        if is_title_used(title_candidate):
            print_substep(
                f"Bỏ qua trending đã tạo video: {text[:50]}...",
                style="bold yellow",
            )
            continue
        thread = t
        break
    if thread is None:
        if fallback is None:
            # Every trending thread is empty or contains blocked words.
            return None
        # Relax only the duplicate-title constraint; never the
        # blocked-words one.
        thread = fallback

    thread_text = thread.get("text", "")
    thread_username = thread.get("username", "unknown")
    thread_url = thread.get("permalink", "")
    shortcode = thread.get("shortcode", "")
    topic_title = thread.get("topic_title", "")

    # Prefer the trending topic title as the video title when available.
    display_title = topic_title if topic_title else thread_text[:_MAX_TITLE_LENGTH]

    print_substep(
        f"Video sẽ được tạo từ trending: {display_title[:100]}...",
        style="bold green",
    )
    print_substep(f"Thread URL: {thread_url}", style="bold green")
    print_substep(f"Tác giả: @{thread_username}", style="bold blue")

    content: dict = {
        "thread_url": thread_url,
        "thread_title": display_title[:_MAX_TITLE_LENGTH],
        "thread_id": re.sub(r"[^\w\s-]", "", shortcode or thread_text[:20]),
        "thread_author": f"@{thread_username}",
        "is_nsfw": False,
        "thread_post": thread_text,
        "comments": [],
    }

    if not settings.config["settings"].get("storymode", False):
        # Replies come from scraping because the thread does not belong
        # to the authenticated user, so the API cannot be used.
        try:
            if thread_url:
                raw_replies = scrape_thread_replies(thread_url, limit=50)
            else:
                raw_replies = []
        except Exception as exc:
            # Best-effort: missing replies should not abort the video.
            print_substep(
                f"⚠️ Lỗi lấy replies trending: {exc}", style="bold yellow"
            )
            raw_replies = []

        for idx, reply in enumerate(raw_replies):
            reply_text = reply.get("text", "")
            reply_username = reply.get("username", "unknown")
            if not reply_text or _contains_blocked_words(reply_text):
                continue
            # Reject replies whose sanitised form is empty (emoji-only,
            # punctuation-only, etc.); the raw text is what gets used.
            sanitised = sanitize_text(reply_text)
            if not sanitised or sanitised.strip() == "":
                continue
            if len(reply_text) > max_comment_length:
                continue
            if len(reply_text) < min_comment_length:
                continue
            content["comments"].append(
                {
                    "comment_body": reply_text,
                    "comment_url": "",
                    "comment_id": re.sub(
                        r"[^\w\s-]", "", f"trending_reply_{idx}"
                    ),
                    "comment_author": f"@{reply_username}",
                }
            )

    print_substep(
        f"Đã lấy nội dung trending thành công! "
        f"({len(content.get('comments', []))} replies)",
        style="bold green",
    )
    return content
def get_threads_posts(POST_ID: str = None) -> dict:
"""Lấy nội dung từ Threads để tạo video.
@ -312,9 +435,29 @@ def get_threads_posts(POST_ID: str = None) -> dict:
max_comment_length = int(thread_config.get("max_comment_length", 500))
min_comment_length = int(thread_config.get("min_comment_length", 1))
min_comments = int(thread_config.get("min_comments", 5))
source = thread_config.get("source", "user")
print_step("Đang lấy nội dung từ Threads...")
# ------------------------------------------------------------------
# Source: trending Lấy bài viết từ Trending now
# ------------------------------------------------------------------
if source == "trending" and not POST_ID:
content = _get_trending_content(
max_comment_length=max_comment_length,
min_comment_length=min_comment_length,
)
if content is not None:
return content
# Fallback: nếu trending thất bại, tiếp tục dùng user threads
print_substep(
"⚠️ Trending không khả dụng, chuyển sang lấy từ user threads...",
style="bold yellow",
)
# ------------------------------------------------------------------
# Source: user (mặc định) hoặc POST_ID cụ thể
# ------------------------------------------------------------------
if POST_ID:
# Lấy thread cụ thể theo ID
thread = client.get_thread_by_id(POST_ID)
@ -364,7 +507,7 @@ def get_threads_posts(POST_ID: str = None) -> dict:
if not text or _contains_blocked_words(text):
continue
# Kiểm tra title đã được sử dụng chưa (tránh trùng lặp)
title_candidate = text[:200] if len(text) > 200 else text
title_candidate = text[:_MAX_TITLE_LENGTH] if len(text) > _MAX_TITLE_LENGTH else text
if is_title_used(title_candidate):
print_substep(
f"Bỏ qua thread đã tạo video: {text[:50]}...",
@ -399,8 +542,9 @@ def get_threads_posts(POST_ID: str = None) -> dict:
print_substep(f"Thread URL: {thread_url}", style="bold green")
print_substep(f"Tác giả: @{thread_username}", style="bold blue")
content = {}
content["thread_url"] = thread_url
content["thread_title"] = thread_text[:200] if len(thread_text) > 200 else thread_text
content["thread_title"] = thread_text[:_MAX_TITLE_LENGTH] if len(thread_text) > _MAX_TITLE_LENGTH else thread_text
content["thread_id"] = re.sub(r"[^\w\s-]", "", thread_id)
content["thread_author"] = f"@{thread_username}"
content["is_nsfw"] = False

@ -0,0 +1,372 @@
"""
Threads Trending Scraper - Lấy bài viết từ mục "Trending now" trên Threads.
Threads API chính thức không cung cấp endpoint cho trending topics.
Module này sử dụng Playwright để scrape nội dung trending từ giao diện web Threads.
Flow:
1. Mở trang tìm kiếm Threads (https://www.threads.net/search)
2. Trích xuất trending topic links
3. Truy cập từng topic để lấy danh sách bài viết
4. Truy cập bài viết để lấy replies (nếu cần)
"""
import re
from typing import Dict, List
from playwright.sync_api import (
Page,
TimeoutError as PlaywrightTimeoutError,
sync_playwright,
)
from utils.console import print_step, print_substep
THREADS_SEARCH_URL = "https://www.threads.net/search"
_PAGE_LOAD_TIMEOUT_MS = 30_000
_CONTENT_WAIT_MS = 3_000
_REPLY_SCROLL_ITERATIONS = 5
_TOPIC_SCROLL_ITERATIONS = 2
# Shared browser context settings
_BROWSER_VIEWPORT = {"width": 1280, "height": 900}
_BROWSER_USER_AGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/131.0.0.0 Safari/537.36"
)
_BROWSER_LOCALE = "vi-VN"
class TrendingScrapeError(Exception):
    """Raised when scraping trending content from the Threads web UI fails."""
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _extract_topic_links(page: Page, limit: int) -> List[Dict[str, str]]:
"""Extract trending topic links from the search page DOM."""
topics: List[Dict[str, str]] = []
elements = page.query_selector_all('a[href*="/search?q="]')
for elem in elements:
if len(topics) >= limit:
break
try:
href = elem.get_attribute("href") or ""
text = elem.inner_text().strip()
if not text or not href:
continue
lines = [line.strip() for line in text.split("\n") if line.strip()]
title = lines[0] if lines else ""
if not title:
continue
url = f"https://www.threads.net{href}" if href.startswith("/") else href
topics.append({"title": title, "url": url})
except Exception:
continue
return topics
def _extract_post_links(page: Page, limit: int) -> List[Dict[str, str]]:
    """Collect up to *limit* unique post entries from a page with post links.

    Each entry holds the post ``text``, author ``username``, absolute
    ``permalink`` and ``shortcode``; shortcodes are used to de-duplicate.
    """
    results: List[Dict[str, str]] = []
    seen: set = set()
    for anchor in page.query_selector_all('a[href*="/post/"]'):
        if len(results) >= limit:
            break
        try:
            href = anchor.get_attribute("href") or ""
            code_match = re.search(r"/post/([A-Za-z0-9_-]+)", href)
            if code_match is None:
                continue
            code = code_match.group(1)
            if code in seen:
                continue
            seen.add(code)
            # The author handle is embedded in the URL: /@username/post/...
            author_match = re.search(r"/@([^/]+)/post/", href)
            author = author_match.group(1) if author_match else "unknown"
            # Climb the DOM from the link to find the post body text.
            body = _get_post_text(anchor)
            if not body or len(body) < 10:
                continue
            link = (
                f"https://www.threads.net{href}" if href.startswith("/") else href
            )
            results.append(
                {
                    "text": body,
                    "username": author,
                    "permalink": link,
                    "shortcode": code,
                }
            )
        except Exception:
            # Skip any element that fails to yield usable data.
            continue
    return results
def _get_post_text(link_handle) -> str:
"""Walk up the DOM from a link element to extract post text content."""
try:
container = link_handle.evaluate_handle(
"""el => {
let node = el;
for (let i = 0; i < 10; i++) {
node = node.parentElement;
if (!node) return el.parentElement || el;
const text = node.innerText || '';
if (text.length > 30 && (
node.getAttribute('role') === 'article' ||
node.tagName === 'ARTICLE' ||
node.dataset && node.dataset.testid
)) {
return node;
}
}
return el.parentElement ? el.parentElement.parentElement || el.parentElement : el;
}"""
)
raw = container.inner_text().strip() if container else ""
except Exception:
return ""
if not raw:
return ""
# Clean: remove short metadata lines (timestamps, UI buttons, etc.)
_skip = {"Trả lời", "Thích", "Chia sẻ", "Repost", "Quote", "...", ""}
cleaned_lines: list = []
for line in raw.split("\n"):
line = line.strip()
if not line or len(line) < 3:
continue
if line in _skip:
continue
# Skip standalone @username lines
if line.startswith("@") and " " not in line and len(line) < 30:
continue
cleaned_lines.append(line)
return "\n".join(cleaned_lines)
def _extract_replies(page: Page, limit: int) -> List[Dict[str, str]]:
    """Collect up to *limit* replies from a thread detail page.

    Scrolls to trigger lazy loading, then reads every article element
    except the first (the main post), returning ``{text, username}``
    dicts with UI chrome lines removed.
    """
    # Trigger lazy loading of additional replies.
    for _ in range(_REPLY_SCROLL_ITERATIONS):
        page.evaluate("window.scrollBy(0, window.innerHeight)")
        page.wait_for_timeout(1000)

    ui_chrome = {"Trả lời", "Thích", "Chia sẻ", "Repost", "...", ""}
    collected: List[Dict[str, str]] = []
    for position, node in enumerate(
        page.query_selector_all('div[role="article"], article')
    ):
        if position == 0:
            # The first article is the main post, not a reply.
            continue
        if len(collected) >= limit:
            break
        try:
            body = node.inner_text().strip()
            if not body or len(body) < 5:
                continue
            # Author handle from the first profile link, if any.
            author = "unknown"
            author_link = node.query_selector('a[href^="/@"]')
            if author_link:
                author_href = author_link.get_attribute("href") or ""
                found = re.match(r"/@([^/]+)", author_href)
                if found:
                    author = found.group(1)
            # Strip chrome/metadata lines from the reply text.
            kept = [
                part.strip()
                for part in body.split("\n")
                if part.strip()
                and len(part.strip()) > 3
                and part.strip() not in ui_chrome
            ]
            text_clean = "\n".join(kept)
            if text_clean:
                collected.append({"text": text_clean, "username": author})
        except Exception:
            continue
    return collected
def _create_browser_context(playwright):
    """Launch a headless Chromium browser plus a context with shared settings.

    Returns the ``(browser, context)`` pair; the caller is responsible
    for closing the browser when finished.
    """
    chromium = playwright.chromium.launch(headless=True)
    ctx = chromium.new_context(
        viewport=_BROWSER_VIEWPORT,
        user_agent=_BROWSER_USER_AGENT,
        locale=_BROWSER_LOCALE,
    )
    return chromium, ctx
def _scroll_page(page: Page, times: int = _TOPIC_SCROLL_ITERATIONS) -> None:
    """Scroll down *times* viewport-heights to trigger lazy-loaded content."""
    remaining = times
    while remaining > 0:
        page.evaluate("window.scrollBy(0, window.innerHeight)")
        page.wait_for_timeout(1000)
        remaining -= 1
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def get_trending_threads(
    max_topics: int = 5,
    max_threads_per_topic: int = 10,
) -> List[Dict[str, str]]:
    """Collect threads from the trending topics on Threads.

    Opens a single Playwright session, walks the trending topics found on
    the search page, and extracts posts from each topic.

    Args:
        max_topics: Maximum number of trending topics to visit.
        max_threads_per_topic: Maximum number of posts taken per topic.

    Returns:
        List of thread dicts: ``{text, username, permalink, shortcode, topic_title}``.

    Raises:
        TrendingScrapeError: When trending cannot be scraped at all
            (no topics found, page-load timeout, or unexpected errors).
    """
    print_step("🔥 Đang lấy bài viết từ Trending now trên Threads...")
    all_threads: List[Dict[str, str]] = []
    with sync_playwright() as p:
        browser, context = _create_browser_context(p)
        page = context.new_page()
        try:
            # Step 1: Navigate to the search page (hosts the trending section).
            page.goto(THREADS_SEARCH_URL, timeout=_PAGE_LOAD_TIMEOUT_MS)
            page.wait_for_load_state(
                "domcontentloaded", timeout=_PAGE_LOAD_TIMEOUT_MS
            )
            # Fixed wait: trending content is rendered client-side after
            # DOMContentLoaded fires.
            page.wait_for_timeout(_CONTENT_WAIT_MS)
            # Step 2: Extract trending topic links from the DOM.
            topics = _extract_topic_links(page, limit=max_topics)
            if not topics:
                raise TrendingScrapeError(
                    "Không tìm thấy trending topics trên Threads. "
                    "Có thể Threads đã thay đổi giao diện hoặc yêu cầu đăng nhập."
                )
            topic_names = ", ".join(t["title"][:30] for t in topics[:3])
            suffix = "..." if len(topics) > 3 else ""
            print_substep(
                f"🔥 Tìm thấy {len(topics)} trending topics: {topic_names}{suffix}",
                style="bold blue",
            )
            # Step 3: Visit each topic page and extract its posts.
            # Per-topic failures are logged and skipped so one bad topic
            # does not abort the whole scrape.
            for topic in topics:
                try:
                    page.goto(topic["url"], timeout=_PAGE_LOAD_TIMEOUT_MS)
                    page.wait_for_load_state(
                        "domcontentloaded", timeout=_PAGE_LOAD_TIMEOUT_MS
                    )
                    page.wait_for_timeout(_CONTENT_WAIT_MS)
                    _scroll_page(page, times=2)
                    threads = _extract_post_links(
                        page, limit=max_threads_per_topic
                    )
                    # Tag each post with the topic it came from.
                    for t in threads:
                        t["topic_title"] = topic["title"]
                    all_threads.extend(threads)
                    print_substep(
                        f" 📝 Topic '{topic['title'][:30]}': "
                        f"{len(threads)} bài viết",
                        style="bold blue",
                    )
                except PlaywrightTimeoutError:
                    print_substep(
                        f" ⚠️ Timeout topic '{topic['title'][:30]}'",
                        style="bold yellow",
                    )
                except Exception as exc:
                    print_substep(
                        f" ⚠️ Lỗi topic '{topic['title'][:30]}': {exc}",
                        style="bold yellow",
                    )
        except TrendingScrapeError:
            # Already a domain error — propagate unchanged.
            raise
        except PlaywrightTimeoutError as exc:
            raise TrendingScrapeError(
                "Timeout khi tải trang Threads. Kiểm tra kết nối mạng."
            ) from exc
        except Exception as exc:
            raise TrendingScrapeError(
                f"Lỗi khi scrape trending: {exc}"
            ) from exc
        finally:
            # Closing the browser also closes its contexts and pages.
            browser.close()
    print_substep(
        f"✅ Tổng cộng {len(all_threads)} bài viết từ trending",
        style="bold green",
    )
    return all_threads
def scrape_thread_replies(
    thread_url: str, limit: int = 50
) -> List[Dict[str, str]]:
    """Fetch the replies of a thread by scraping its web page.

    Used when the official Threads API cannot be used (for example when
    the thread does not belong to the authenticated user).

    Args:
        thread_url: URL of the thread on Threads.
        limit: Maximum number of replies to collect.

    Returns:
        List of reply dicts: ``{text, username}``. Empty on timeout or
        scrape failure — errors are logged, never raised.
    """
    print_substep(f"💬 Đang lấy replies từ: {thread_url[:60]}...")
    replies: List[Dict[str, str]] = []
    with sync_playwright() as p:
        browser, context = _create_browser_context(p)
        page = context.new_page()
        try:
            page.goto(thread_url, timeout=_PAGE_LOAD_TIMEOUT_MS)
            page.wait_for_load_state(
                "domcontentloaded", timeout=_PAGE_LOAD_TIMEOUT_MS
            )
            # Fixed wait: replies are rendered client-side after load.
            page.wait_for_timeout(_CONTENT_WAIT_MS)
            replies = _extract_replies(page, limit=limit)
        except PlaywrightTimeoutError:
            print_substep(
                "⚠️ Timeout khi tải thread", style="bold yellow"
            )
        except Exception as exc:
            print_substep(
                f"⚠️ Lỗi lấy replies: {exc}", style="bold yellow"
            )
        finally:
            # Closing the browser also closes its context and page.
            browser.close()
    print_substep(f"💬 Đã lấy {len(replies)} replies", style="bold blue")
    return replies

@ -5,7 +5,8 @@ access_token = { optional = false, nmin = 10, explanation = "Threads API access
user_id = { optional = false, nmin = 1, explanation = "Threads user ID của bạn", example = "12345678" }
[threads.thread]
target_user_id = { optional = true, default = "", explanation = "ID user muốn lấy threads. Để trống dùng user của bạn.", example = "87654321" }
source = { optional = true, default = "user", options = ["user", "trending"], explanation = "Nguồn lấy bài viết: 'user' (từ user cụ thể) hoặc 'trending' (từ Trending now). Mặc định: user", example = "user" }
target_user_id = { optional = true, default = "", explanation = "ID user muốn lấy threads. Để trống dùng user của bạn. Chỉ dùng khi source = 'user'.", example = "87654321" }
post_id = { optional = true, default = "", explanation = "ID cụ thể của thread. Để trống để tự động chọn.", example = "18050000000000000" }
keywords = { optional = true, default = "", type = "str", explanation = "Từ khóa lọc threads, phân cách bằng dấu phẩy.", example = "viral, trending, hài hước" }
max_comment_length = { default = 500, optional = false, nmin = 10, nmax = 10000, type = "int", explanation = "Độ dài tối đa reply (ký tự). Mặc định: 500", example = 500, oob_error = "Phải trong khoảng 10-10000" }

Loading…
Cancel
Save