You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
186 lines
8.3 KiB
186 lines
8.3 KiB
"""Captures screenshots of Threads posts via the configured browser backend."""
|
|
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Final
|
|
|
|
from playwright.sync_api import Page, ViewportSize
|
|
|
|
from platforms.threads.auth import ensure_authenticated_context
|
|
from utils import settings
|
|
from utils.browser_backend import launch_browser
|
|
from utils.console import print_step, print_substep
|
|
|
|
|
|
def _dismiss_popups(page: Page) -> None:
    """Dismiss common Threads/Instagram overlays that block screenshots.

    Best-effort: each known overlay is tried in turn and the first element
    matching its selector is clicked when it is visible. A failure on one
    overlay is reported and never propagated, so the remaining overlays are
    still attempted and the caller is never interrupted.

    Args:
        page: Playwright page to clear of overlays.
    """
    # (selector, text filter) pairs; a filter of None matches on selector alone.
    dismissals = (
        ("[role='button']", "Not now"),
        ("button", "Allow all cookies"),
        ("[aria-label='Close']", None),
        ("div[role='dialog'] button", "Not Now"),
    )
    for selector, text in dismissals:
        try:
            if text:
                el = page.locator(selector, has_text=text).first
            else:
                el = page.locator(selector).first
            # count() guards against an empty match before probing visibility.
            if el.count() and el.is_visible():
                el.click()
                # Short pause, presumably to let the overlay finish closing.
                page.wait_for_timeout(500)
                print_substep(f"Dismissed popup: {selector}", style="dim")
        except Exception as exc:
            # Still best-effort, but surface the failure instead of hiding it
            # so a broken selector or intercepted click can be diagnosed.
            print_substep(f"Could not dismiss popup {selector}: {exc}", style="dim")
|
|
|
|
|
|
def get_screenshots_of_threads_posts(content_object: dict, screenshot_num: int) -> None:
    """Download screenshots of a Threads post and its replies via Playwright.

    The main post is written to ``assets/temp/<thread_id>/png/title.png``.
    Unless the ``storymode`` setting is enabled, up to ``screenshot_num``
    replies are written to ``comment_<idx>.png`` in the same directory, where
    ``idx`` is the reply's position in ``content_object["comments"]``. Replies
    that cannot be located or captured are reported and skipped.

    Args:
        content_object: Standard content dict from platforms/threads/fetcher.py.
            Reads ``thread_id``, ``thread_url`` and ``comments`` (each comment
            providing ``comment_id``).
        screenshot_num: Number of reply screenshots to capture.

    Raises:
        RuntimeError: If the Threads login overlay is blocking the page.
        Exception: Any failure while capturing the main post is reported and
            then re-raised unchanged.
    """
    W: Final[int] = int(settings.config["settings"]["resolution_w"])
    H: Final[int] = int(settings.config["settings"]["resolution_h"])
    storymode: Final[bool] = settings.config["settings"]["storymode"]

    print_step("Downloading screenshots of Threads posts...")

    # Keep only word characters, whitespace and dashes before using the id as
    # a directory name.
    thread_id = re.sub(r"[^\w\s-]", "", content_object["thread_id"])
    png_dir = f"assets/temp/{thread_id}/png"
    Path(png_dir).mkdir(parents=True, exist_ok=True)

    theme = settings.config["settings"]["theme"]

    # Device scale factor (higher resolution screenshots)
    dsf = (W // 600) + 1

    with launch_browser(headless=True) as browser:
        print_substep("Launching headless browser...")
        context = ensure_authenticated_context(
            browser,
            color_scheme="dark" if theme == "dark" else "light",
            viewport=ViewportSize(width=W, height=H),
            device_scale_factor=dsf,
        )

        # Screenshot the main post
        page = context.new_page()
        page.goto(content_object["thread_url"], timeout=0)
        page.wait_for_load_state("networkidle")
        page.wait_for_timeout(3000)

        _dismiss_popups(page)
        _raise_if_login_wall(page)

        try:
            # Threads.net uses div-based cards, not <article> elements.
            # Find the first post link and screenshot its enclosing card.
            post_link = page.locator('a[href*="/post/"]').first
            if post_link.count() and post_link.is_visible():
                post_locator = _enclosing_card(post_link)
            else:
                # Fallback: try article (older Threads layout) or full page
                post_locator = page.locator("article").first
                if not post_locator.count() or not post_locator.is_visible():
                    post_locator = page.locator("body")

            _capture_element(page, post_locator, f"{png_dir}/title.png")
            print_substep("Main post screenshot captured.", style="bold green")
        except Exception as e:
            print_substep(f"Failed to screenshot main post: {e}", style="red")
            raise

        # Screenshots of replies — capture all from the main post page (single navigation)
        if not storymode:
            comments = content_object["comments"]
            num_replies = min(screenshot_num, len(comments))
            if num_replies > 0:
                # Scroll to load all replies inline on the post page
                print_substep("Loading replies on post page...", style="dim")
                _dismiss_popups(page)
                _raise_if_login_wall(page)

                # Keep scrolling until the number of post links has stopped
                # changing for three consecutive passes (at most 20 passes).
                last_count = 0
                stable_count = 0
                for _ in range(20):
                    page.evaluate("window.scrollBy(0, document.body.scrollHeight)")
                    page.wait_for_timeout(1000)
                    current = page.locator('a[href*="/post/"]').count()
                    if current == last_count:
                        stable_count += 1
                        if stable_count >= 3:
                            break
                    else:
                        stable_count = 0
                        last_count = current

                captured = 0
                for idx, comment in enumerate(comments[:num_replies]):
                    try:
                        reply_id = comment["comment_id"]
                        reply_link = page.locator(f'a[href*="/{reply_id}"]').first
                        if not (reply_link.count() and reply_link.is_visible()):
                            print_substep(
                                f"Reply {idx} not found on post page. Skipping...",
                                style="yellow",
                            )
                            continue

                        reply_locator = _enclosing_card(reply_link)
                        # Scroll element into view so bounding_box works
                        reply_locator.scroll_into_view_if_needed()
                        page.wait_for_timeout(300)

                        _capture_element(page, reply_locator, f"{png_dir}/comment_{idx}.png")
                        captured += 1
                    except Exception as e:
                        # One bad reply must not abort the remaining captures.
                        print_substep(
                            f"Error capturing reply {idx}: {e}. Skipping...", style="yellow"
                        )

                # Report what was actually written, not what was requested.
                print_substep(f"Reply screenshots captured ({captured} total).", style="bold green")

    print_substep("Threads screenshots downloaded successfully.", style="bold green")


def _raise_if_login_wall(page: Page) -> None:
    """Raise RuntimeError when a dialog with a "Log in" button is visible on the page."""
    if page.locator('div[role="dialog"] button:has-text("Log in")').first.is_visible():
        raise RuntimeError(
            "Threads login overlay is blocking the page — saved cookies are stale. "
            "Delete video_creation/data/cookie-threads.json and re-run."
        )


def _enclosing_card(link):
    """Return the nearest ancestor card div of a link locator, or the link itself if none matches."""
    card = link.locator('xpath=ancestor::div[contains(@class, "x1a2a7pz")][1]')
    return card.first if card.count() else link


def _capture_element(page: Page, locator, path: str) -> None:
    """Screenshot a locator to path, scaling the clip region by the configured zoom setting."""
    zoom = settings.config["settings"].get("zoom", 1)
    if zoom == 1:
        locator.screenshot(path=path)
        return

    page.evaluate(f"document.body.style.zoom={zoom}")
    location = locator.bounding_box()
    if not location:
        # No bounding box available: fall back to an element screenshot.
        locator.screenshot(path=path)
        return

    # Scale every bounding-box value by the zoom factor, rounded to 2 decimals.
    clip = {key: float(f"{value * zoom:.2f}") for key, value in location.items()}
    page.screenshot(clip=clip, path=path)
|