feat: add Threads Trending now support as content source

- Create threads/trending.py: Playwright-based scraper for Threads
  trending topics and thread replies
- Modify threads/threads_client.py: add source config check, integrate
  trending scraper with fallback to user threads
- Update .config.template.toml: add source option (user/trending)

Agent-Logs-Url: https://github.com/thaitien280401-stack/RedditVideoMakerBot/sessions/01a85c1b-5157-4723-80f1-ca726e410a39

Co-authored-by: thaitien280401-stack <271128961+thaitien280401-stack@users.noreply.github.com>
pull/2482/head
copilot-swe-agent[bot] 3 days ago committed by GitHub
parent 36b702eceb
commit c7e6bae8cf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -253,6 +253,126 @@ def _contains_blocked_words(text: str) -> bool:
return any(word in text_lower for word in blocked_list)
def _get_trending_content(
    max_comment_length: int,
    min_comment_length: int,
) -> Optional[dict]:
    """Fetch video content from the "Trending now" section of Threads.

    Uses the Playwright-based scraper to pull posts from trending topics.

    Args:
        max_comment_length: Maximum reply length (in characters) to keep.
        min_comment_length: Minimum reply length (in characters) to keep.

    Returns:
        A content dict in the same shape as ``get_threads_posts`` produces,
        or ``None`` when trending content could not be fetched (the caller
        falls back to user threads).
    """
    # Imported lazily so the playwright dependency is only required when
    # the "trending" source is actually enabled.
    from threads.trending import (
        TrendingScrapeError,
        get_trending_threads,
        scrape_thread_replies,
    )

    try:
        trending_threads = get_trending_threads()
    except TrendingScrapeError as e:
        print_substep(f"⚠️ Lỗi lấy trending: {e}", style="bold yellow")
        return None
    if not trending_threads:
        return None

    # Pick a suitable thread: one not yet turned into a video and free of
    # blocked words.
    thread = None
    for t in trending_threads:
        text = t.get("text", "")
        if not text or _contains_blocked_words(text):
            continue
        title_candidate = text[:200]
        if is_title_used(title_candidate):
            print_substep(
                f"Bỏ qua trending đã tạo video: {text[:50]}...",
                style="bold yellow",
            )
            continue
        thread = t
        break
    if thread is None:
        # Every candidate was filtered out; reuse the first trending thread
        # rather than producing nothing. (trending_threads is guaranteed
        # non-empty here — checked above, so no extra emptiness test.)
        thread = trending_threads[0]

    thread_text = thread.get("text", "")
    thread_username = thread.get("username", "unknown")
    thread_url = thread.get("permalink", "")
    shortcode = thread.get("shortcode", "")
    topic_title = thread.get("topic_title", "")

    # Prefer the trending topic title as the video title when available.
    display_title = topic_title if topic_title else thread_text[:200]

    print_substep(
        f"Video sẽ được tạo từ trending: {display_title[:100]}...",
        style="bold green",
    )
    print_substep(f"Thread URL: {thread_url}", style="bold green")
    print_substep(f"Tác giả: @{thread_username}", style="bold blue")

    content: dict = {
        "thread_url": thread_url,
        "thread_title": display_title[:200],
        "thread_id": re.sub(r"[^\w\s-]", "", shortcode or thread_text[:20]),
        "thread_author": f"@{thread_username}",
        "is_nsfw": False,
        "thread_post": thread_text,
        "comments": [],
    }

    if not settings.config["settings"].get("storymode", False):
        # Replies are scraped from the web UI because the official API only
        # exposes replies for threads owned by the authenticated user.
        try:
            if thread_url:
                raw_replies = scrape_thread_replies(thread_url, limit=50)
            else:
                raw_replies = []
        except Exception as exc:
            print_substep(
                f"⚠️ Lỗi lấy replies trending: {exc}", style="bold yellow"
            )
            raw_replies = []

        for idx, reply in enumerate(raw_replies):
            reply_text = reply.get("text", "")
            reply_username = reply.get("username", "unknown")
            if not reply_text or _contains_blocked_words(reply_text):
                continue
            # Sanitisation is only used as a non-emptiness filter; the raw
            # reply text is what gets rendered in the video.
            sanitised = sanitize_text(reply_text)
            if not sanitised or sanitised.strip() == "":
                continue
            if len(reply_text) > max_comment_length:
                continue
            if len(reply_text) < min_comment_length:
                continue
            content["comments"].append(
                {
                    "comment_body": reply_text,
                    "comment_url": "",
                    "comment_id": re.sub(
                        r"[^\w\s-]", "", f"trending_reply_{idx}"
                    ),
                    "comment_author": f"@{reply_username}",
                }
            )

    print_substep(
        f"Đã lấy nội dung trending thành công! "
        f"({len(content.get('comments', []))} replies)",
        style="bold green",
    )
    return content
def get_threads_posts(POST_ID: str = None) -> dict:
"""Lấy nội dung từ Threads để tạo video.
@ -312,9 +432,29 @@ def get_threads_posts(POST_ID: str = None) -> dict:
max_comment_length = int(thread_config.get("max_comment_length", 500))
min_comment_length = int(thread_config.get("min_comment_length", 1))
min_comments = int(thread_config.get("min_comments", 5))
source = thread_config.get("source", "user")
print_step("Đang lấy nội dung từ Threads...")
# ------------------------------------------------------------------
# Source: trending Lấy bài viết từ Trending now
# ------------------------------------------------------------------
if source == "trending" and not POST_ID:
content = _get_trending_content(
max_comment_length=max_comment_length,
min_comment_length=min_comment_length,
)
if content is not None:
return content
# Fallback: nếu trending thất bại, tiếp tục dùng user threads
print_substep(
"⚠️ Trending không khả dụng, chuyển sang lấy từ user threads...",
style="bold yellow",
)
# ------------------------------------------------------------------
# Source: user (mặc định) hoặc POST_ID cụ thể
# ------------------------------------------------------------------
if POST_ID:
# Lấy thread cụ thể theo ID
thread = client.get_thread_by_id(POST_ID)
@ -399,6 +539,7 @@ def get_threads_posts(POST_ID: str = None) -> dict:
print_substep(f"Thread URL: {thread_url}", style="bold green")
print_substep(f"Tác giả: @{thread_username}", style="bold blue")
content = {}
content["thread_url"] = thread_url
content["thread_title"] = thread_text[:200] if len(thread_text) > 200 else thread_text
content["thread_id"] = re.sub(r"[^\w\s-]", "", thread_id)

@ -0,0 +1,368 @@
"""
Threads Trending Scraper - Lấy bài viết từ mục "Trending now" trên Threads.
Threads API chính thức không cung cấp endpoint cho trending topics.
Module này sử dụng Playwright để scrape nội dung trending từ giao diện web Threads.
Flow:
1. Mở trang tìm kiếm Threads (https://www.threads.net/search)
2. Trích xuất trending topic links
3. Truy cập từng topic để lấy danh sách bài viết
4. Truy cập bài viết để lấy replies (nếu cần)
"""
import re
from typing import Dict, List, Optional, Tuple
from playwright.sync_api import (
Page,
TimeoutError as PlaywrightTimeoutError,
sync_playwright,
)
from utils.console import print_step, print_substep
# Entry point that hosts the "Trending now" section on the Threads web UI.
THREADS_SEARCH_URL = "https://www.threads.net/search"
# Maximum time to wait for a page navigation/load (milliseconds).
_PAGE_LOAD_TIMEOUT_MS = 30_000
# Extra settle time after load for client-side rendered content (milliseconds).
_CONTENT_WAIT_MS = 3_000
class TrendingScrapeError(Exception):
    """Raised when trending content cannot be scraped from Threads."""
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _extract_topic_links(page: Page, limit: int) -> List[Dict[str, str]]:
    """Collect up to ``limit`` trending-topic entries from the search page.

    Each entry is ``{"title": ..., "url": ...}`` where the title is the
    first non-empty text line of the anchor and the URL is absolute.
    Anchors that raise, or lack text/href, are skipped.
    """
    found: List[Dict[str, str]] = []
    for anchor in page.query_selector_all('a[href*="/search?q="]'):
        if len(found) >= limit:
            break
        try:
            link = anchor.get_attribute("href") or ""
            label = anchor.inner_text().strip()
        except Exception:
            continue
        if not label or not link:
            continue
        non_empty = [part.strip() for part in label.split("\n") if part.strip()]
        heading = non_empty[0] if non_empty else ""
        if not heading:
            continue
        # Hrefs are site-relative; make them absolute.
        absolute = f"https://www.threads.net{link}" if link.startswith("/") else link
        found.append({"title": heading, "url": absolute})
    return found
def _extract_post_links(page: Page, limit: int) -> List[Dict[str, str]]:
    """Collect up to ``limit`` unique posts from post anchors on the page.

    Returns dicts with ``text``, ``username``, ``permalink`` and
    ``shortcode``. Duplicate shortcodes are skipped, and posts whose
    extracted text is shorter than 10 characters are dropped.
    """
    results: List[Dict[str, str]] = []
    visited: set = set()
    for anchor in page.query_selector_all('a[href*="/post/"]'):
        if len(results) >= limit:
            break
        try:
            href = anchor.get_attribute("href") or ""
            code_match = re.search(r"/post/([A-Za-z0-9_-]+)", href)
            if code_match is None:
                continue
            code = code_match.group(1)
            if code in visited:
                continue
            visited.add(code)
            # The author handle is embedded in the URL: /@username/post/...
            author_match = re.search(r"/@([^/]+)/post/", href)
            author = author_match.group(1) if author_match else "unknown"
            # The post body lives in an ancestor container, not the anchor.
            body = _get_post_text(anchor)
            if not body or len(body) < 10:
                continue
            full_url = (
                f"https://www.threads.net{href}" if href.startswith("/") else href
            )
            results.append(
                {
                    "text": body,
                    "username": author,
                    "permalink": full_url,
                    "shortcode": code,
                }
            )
        except Exception:
            continue
    return results
def _get_post_text(link_handle) -> str:
"""Walk up the DOM from a link element to extract post text content."""
try:
container = link_handle.evaluate_handle(
"""el => {
let node = el;
for (let i = 0; i < 10; i++) {
node = node.parentElement;
if (!node) return el.parentElement || el;
const text = node.innerText || '';
if (text.length > 30 && (
node.getAttribute('role') === 'article' ||
node.tagName === 'ARTICLE' ||
node.dataset && node.dataset.testid
)) {
return node;
}
}
return el.parentElement ? el.parentElement.parentElement || el.parentElement : el;
}"""
)
raw = container.inner_text().strip() if container else ""
except Exception:
return ""
if not raw:
return ""
# Clean: remove short metadata lines (timestamps, UI buttons, etc.)
_skip = {"Trả lời", "Thích", "Chia sẻ", "Repost", "Quote", "...", ""}
cleaned_lines: list = []
for line in raw.split("\n"):
line = line.strip()
if not line or len(line) < 3:
continue
if line in _skip:
continue
# Skip standalone @username lines
if line.startswith("@") and " " not in line and len(line) < 30:
continue
cleaned_lines.append(line)
return "\n".join(cleaned_lines)
def _extract_replies(page: Page, limit: int) -> List[Dict[str, str]]:
    """Pull up to ``limit`` reply entries from an open thread page.

    Scrolls a few screens to force lazy loading, then treats every
    article-like element after the first (the main post) as a reply.
    Each entry is ``{"text": ..., "username": ...}``.
    """
    collected: List[Dict[str, str]] = []
    # Trigger lazy loading of additional replies.
    for _ in range(5):
        page.evaluate("window.scrollBy(0, window.innerHeight)")
        page.wait_for_timeout(1000)

    ui_noise = {"Trả lời", "Thích", "Chia sẻ", "Repost", "...", ""}
    nodes = page.query_selector_all('div[role="article"], article')
    for position, node in enumerate(nodes):
        # The first article is the thread itself, not a reply.
        if position == 0:
            continue
        if len(collected) >= limit:
            break
        try:
            body = node.inner_text().strip()
            if not body or len(body) < 5:
                continue
            author = "unknown"
            handle_anchor = node.query_selector('a[href^="/@"]')
            if handle_anchor:
                anchor_href = handle_anchor.get_attribute("href") or ""
                handle_match = re.match(r"/@([^/]+)", anchor_href)
                if handle_match:
                    author = handle_match.group(1)
            # Keep only substantive lines (drop buttons/blank chrome).
            keep = [
                part.strip()
                for part in body.split("\n")
                if part.strip()
                and len(part.strip()) > 3
                and part.strip() not in ui_noise
            ]
            cleaned = "\n".join(keep)
            if cleaned:
                collected.append({"text": cleaned, "username": author})
        except Exception:
            continue
    return collected
def _scroll_page(page: Page, times: int = 2) -> None:
    """Scroll the viewport down ``times`` screens to trigger lazy loading."""
    remaining = times
    while remaining > 0:
        page.evaluate("window.scrollBy(0, window.innerHeight)")
        page.wait_for_timeout(1000)
        remaining -= 1
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def get_trending_threads(
    max_topics: int = 5,
    max_threads_per_topic: int = 10,
) -> List[Dict[str, str]]:
    """Fetch posts from the Threads "Trending now" topics.

    Opens a single Playwright session, walks up to ``max_topics`` trending
    topics found on the search page, and extracts up to
    ``max_threads_per_topic`` posts from each.

    Args:
        max_topics: Maximum number of trending topics to visit.
        max_threads_per_topic: Maximum number of posts to take per topic.

    Returns:
        A list of thread dicts:
        ``{text, username, permalink, shortcode, topic_title}``.

    Raises:
        TrendingScrapeError: When trending content cannot be scraped at all
            (no topics found, page timeout, or any other scrape failure).
    """
    print_step("🔥 Đang lấy bài viết từ Trending now trên Threads...")
    collected: List[Dict[str, str]] = []
    with sync_playwright() as playwright:
        browser = playwright.chromium.launch(headless=True)
        # A desktop UA avoids the mobile layout, which renders differently.
        context = browser.new_context(
            viewport={"width": 1280, "height": 900},
            user_agent=(
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/131.0.0.0 Safari/537.36"
            ),
            locale="vi-VN",
        )
        page = context.new_page()
        try:
            # Step 1: load the search page, which hosts the trending section.
            page.goto(THREADS_SEARCH_URL, timeout=_PAGE_LOAD_TIMEOUT_MS)
            page.wait_for_load_state(
                "domcontentloaded", timeout=_PAGE_LOAD_TIMEOUT_MS
            )
            page.wait_for_timeout(_CONTENT_WAIT_MS)

            # Step 2: pick out the trending topic links.
            topics = _extract_topic_links(page, limit=max_topics)
            if not topics:
                raise TrendingScrapeError(
                    "Không tìm thấy trending topics trên Threads. "
                    "Có thể Threads đã thay đổi giao diện hoặc yêu cầu đăng nhập."
                )
            topic_names = ", ".join(t["title"][:30] for t in topics[:3])
            suffix = "..." if len(topics) > 3 else ""
            print_substep(
                f"🔥 Tìm thấy {len(topics)} trending topics: {topic_names}{suffix}",
                style="bold blue",
            )

            # Step 3: visit every topic page and harvest its posts.
            for topic in topics:
                try:
                    page.goto(topic["url"], timeout=_PAGE_LOAD_TIMEOUT_MS)
                    page.wait_for_load_state(
                        "domcontentloaded", timeout=_PAGE_LOAD_TIMEOUT_MS
                    )
                    page.wait_for_timeout(_CONTENT_WAIT_MS)
                    _scroll_page(page, times=2)
                    posts = _extract_post_links(
                        page, limit=max_threads_per_topic
                    )
                    for post in posts:
                        post["topic_title"] = topic["title"]
                    collected.extend(posts)
                    print_substep(
                        f" 📝 Topic '{topic['title'][:30]}': "
                        f"{len(posts)} bài viết",
                        style="bold blue",
                    )
                except PlaywrightTimeoutError:
                    # A slow topic page should not abort the whole run.
                    print_substep(
                        f" ⚠️ Timeout topic '{topic['title'][:30]}'",
                        style="bold yellow",
                    )
                except Exception as exc:
                    print_substep(
                        f" ⚠️ Lỗi topic '{topic['title'][:30]}': {exc}",
                        style="bold yellow",
                    )
        except TrendingScrapeError:
            # Already descriptive; let it propagate untouched.
            raise
        except PlaywrightTimeoutError as exc:
            raise TrendingScrapeError(
                "Timeout khi tải trang Threads. Kiểm tra kết nối mạng."
            ) from exc
        except Exception as exc:
            raise TrendingScrapeError(
                f"Lỗi khi scrape trending: {exc}"
            ) from exc
        finally:
            browser.close()
    print_substep(
        f"✅ Tổng cộng {len(collected)} bài viết từ trending",
        style="bold green",
    )
    return collected
def scrape_thread_replies(
    thread_url: str, limit: int = 50
) -> List[Dict[str, str]]:
    """Scrape the replies of one thread from the Threads web UI.

    Used when the official Threads API cannot be used for the thread
    (for instance, it does not belong to the authenticated user).

    Args:
        thread_url: Full URL of the thread on Threads.
        limit: Maximum number of replies to return.

    Returns:
        A list of reply dicts: ``{text, username}``. Empty on failure —
        timeouts and scrape errors are reported via console, not raised.
    """
    print_substep(f"💬 Đang lấy replies từ: {thread_url[:60]}...")
    found: List[Dict[str, str]] = []
    with sync_playwright() as playwright:
        browser = playwright.chromium.launch(headless=True)
        # A desktop UA avoids the mobile layout, which renders differently.
        context = browser.new_context(
            viewport={"width": 1280, "height": 900},
            user_agent=(
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/131.0.0.0 Safari/537.36"
            ),
            locale="vi-VN",
        )
        page = context.new_page()
        try:
            page.goto(thread_url, timeout=_PAGE_LOAD_TIMEOUT_MS)
            page.wait_for_load_state(
                "domcontentloaded", timeout=_PAGE_LOAD_TIMEOUT_MS
            )
            page.wait_for_timeout(_CONTENT_WAIT_MS)
            found = _extract_replies(page, limit=limit)
        except PlaywrightTimeoutError:
            print_substep(
                "⚠️ Timeout khi tải thread", style="bold yellow"
            )
        except Exception as exc:
            print_substep(
                f"⚠️ Lỗi lấy replies: {exc}", style="bold yellow"
            )
        finally:
            browser.close()
    print_substep(f"💬 Đã lấy {len(found)} replies", style="bold blue")
    return found

@ -5,7 +5,8 @@ access_token = { optional = false, nmin = 10, explanation = "Threads API access
user_id = { optional = false, nmin = 1, explanation = "Threads user ID của bạn", example = "12345678" }
[threads.thread]
source = { optional = true, default = "user", options = ["user", "trending"], explanation = "Nguồn lấy bài viết: 'user' (từ user cụ thể) hoặc 'trending' (từ Trending now). Mặc định: user", example = "user" }
target_user_id = { optional = true, default = "", explanation = "ID user muốn lấy threads. Để trống dùng user của bạn. Chỉ dùng khi source = 'user'.", example = "87654321" }
post_id = { optional = true, default = "", explanation = "ID cụ thể của thread. Để trống để tự động chọn.", example = "18050000000000000" }
keywords = { optional = true, default = "", type = "str", explanation = "Từ khóa lọc threads, phân cách bằng dấu phẩy.", example = "viral, trending, hài hước" }
max_comment_length = { default = 500, optional = false, nmin = 10, nmax = 10000, type = "int", explanation = "Độ dài tối đa reply (ký tự). Mặc định: 500", example = 500, oob_error = "Phải trong khoảng 10-10000" }

Loading…
Cancel
Save