feat: Replace Reddit flow with Meta Threads API

- New threads/threads_api.py: fetch posts + replies via Threads Graph API
  - Endpoint: graph.threads.net/v1.0
  - Auth: access_token from config
  - Supports: specific post URL/ID or fetch user's recent threads
  - Filters replies by min/max length and blocked words
  - Returns dict compatible with existing TTS + video pipeline
- main.py: route through get_threads_posts() instead of Reddit
  - Reddit import commented out (not deleted)
  - Renamed POST_ID → POST_URL
  - Removed ResponseException handler (PRAW-specific)
  - Uses config[threads][thread][post_id] for batch processing
- config template: added [threads.creds] and [threads.thread] sections

Reddit code preserved in repo for reference but no longer called.

https://claude.ai/code/session_01G67vV3sJLXtm2q9r9FcF7L
pull/2541/head
Claude 2 weeks ago
parent 6487383933
commit 83257918b9
No known key found for this signature in database

@ -6,9 +6,9 @@ from pathlib import Path
from subprocess import Popen
from typing import Dict, NoReturn
from prawcore import ResponseException
from reddit.subreddit import get_subreddit_threads
# Reddit pipeline disabled — Threads is the active source.
# from reddit.subreddit import get_subreddit_threads
from threads.threads_api import get_threads_posts
from utils import settings
from utils.cleanup import cleanup
from utils.checkpoint import run_step, save_checkpoint, load_checkpoint, clear_checkpoint, print_resume_status
@ -47,14 +47,14 @@ reddit_id: str
reddit_object: Dict[str, str | list]
def main(POST_ID=None) -> None:
def main(POST_URL=None) -> None:
global reddit_id, reddit_object
# Step 1: Fetch Reddit threads (no checkpoint — reddit_id unknown yet)
reddit_object = get_subreddit_threads(POST_ID)
# Step 1: Fetch Threads post (no checkpoint — thread_id unknown yet)
reddit_object = get_threads_posts(POST_URL)
reddit_id = extract_id(reddit_object)
print_substep(f"Thread ID is {reddit_id}", style="bold blue")
save_checkpoint(reddit_id, "fetch_reddit", {"result": None})
save_checkpoint(reddit_id, "fetch_thread", {"result": None})
print_resume_status(reddit_id)
# Step 2: Generate TTS audio
@ -144,11 +144,15 @@ if __name__ == "__main__":
)
sys.exit()
try:
if config["reddit"]["thread"]["post_id"]:
for index, post_id in enumerate(config["reddit"]["thread"]["post_id"].split("+")):
threads_post_id = (
config.get("threads", {}).get("thread", {}).get("post_id", "")
if isinstance(config.get("threads", {}), dict) else ""
)
if threads_post_id:
for index, post_id in enumerate(threads_post_id.split("+")):
index += 1
print_step(
f'on the {index}{("st" if index % 10 == 1 else ("nd" if index % 10 == 2 else ("rd" if index % 10 == 3 else "th")))} post of {len(config["reddit"]["thread"]["post_id"].split("+"))}'
f'on the {index}{("st" if index % 10 == 1 else ("nd" if index % 10 == 2 else ("rd" if index % 10 == 3 else "th")))} post of {len(threads_post_id.split("+"))}'
)
main(post_id)
Popen("cls" if name == "nt" else "clear", shell=True).wait()
@ -158,10 +162,6 @@ if __name__ == "__main__":
main()
except KeyboardInterrupt:
shutdown()
except ResponseException:
print_markdown("## Invalid credentials")
print_markdown("Please check your credentials in the config.toml file")
shutdown()
except Exception as err:
config["settings"]["tts"]["tiktok_sessionid"] = "REDACTED"
config["settings"]["tts"]["elevenlabs_api_key"] = "REDACTED"

@ -0,0 +1,171 @@
"""Threads API integration for fetching posts and replies.
Uses Meta's Threads Graph API (https://developers.facebook.com/docs/threads).
Requires an access token with 'threads_basic' and 'threads_read_replies' permissions.
"""
import json
import re
from pathlib import Path
from typing import Optional
import requests
from utils import settings
from utils.console import print_step, print_substep
from utils.voice import sanitize_text
THREADS_API_BASE = "https://graph.threads.net/v1.0"
VIDEOS_DONE_FILE = "./video_creation/data/videos.json"
def _api_get(path: str, access_token: str, params: Optional[dict] = None) -> dict:
"""Call Threads Graph API GET endpoint."""
params = dict(params or {})
params["access_token"] = access_token
url = f"{THREADS_API_BASE}/{path.lstrip('/')}"
response = requests.get(url, params=params, timeout=30)
response.raise_for_status()
return response.json()
def _extract_thread_id(url_or_id: str) -> str:
"""Extract thread ID from a threads.net URL or return as-is if already an ID."""
match = re.search(r"/post/([A-Za-z0-9_-]+)", url_or_id)
return match.group(1) if match else url_or_id
def _fetch_thread_details(thread_id: str, access_token: str) -> dict:
fields = "id,text,username,timestamp,permalink,media_type,is_quote_post"
return _api_get(thread_id, access_token, {"fields": fields})
def _fetch_replies(thread_id: str, access_token: str, limit: int = 50) -> list[dict]:
fields = "id,text,username,timestamp,permalink"
try:
data = _api_get(
f"{thread_id}/replies",
access_token,
{"fields": fields, "limit": limit, "reverse": "false"},
)
return data.get("data", [])
except requests.HTTPError:
return []
def _fetch_user_threads(access_token: str, limit: int = 25) -> list[dict]:
fields = "id,text,username,timestamp,permalink,media_type"
data = _api_get(
"me/threads",
access_token,
{"fields": fields, "limit": limit},
)
return data.get("data", [])
def _is_valid_reply(text: str, min_len: int, max_len: int, blocked_words: list[str]) -> bool:
if not text or not text.strip():
return False
if len(text) < min_len or len(text) > max_len:
return False
lower = text.lower()
if any(w.strip().lower() in lower for w in blocked_words if w.strip()):
return False
if not sanitize_text(text):
return False
return True
def get_threads_posts(POST_URL: Optional[str] = None) -> dict:
"""Fetches a Threads post + replies. Returns a dict compatible with the video pipeline.
Args:
POST_URL: Optional specific thread URL or ID. If not provided, picks from
the authenticated user's recent threads.
Returns:
A dict with keys: thread_url, thread_title, thread_id, is_nsfw, comments,
optionally thread_post (for storymode).
"""
print_step("Fetching Threads post...")
try:
access_token = settings.config["threads"]["creds"]["access_token"]
except KeyError:
raise RuntimeError(
"Missing Threads access_token in config.toml under [threads.creds]"
)
if not access_token:
raise RuntimeError("Threads access_token is empty. Set it in config.toml.")
thread_cfg = settings.config["threads"]["thread"]
min_len = int(thread_cfg.get("min_comment_length", 10))
max_len = int(thread_cfg.get("max_comment_length", 500))
blocked_words = [
w for w in str(thread_cfg.get("blocked_words", "")).split(",") if w.strip()
]
target = POST_URL or thread_cfg.get("post_id") or ""
if target:
thread_id = _extract_thread_id(target)
submission = _fetch_thread_details(thread_id, access_token)
else:
print_substep("No post_id specified. Fetching authenticated user's recent threads.")
user_threads = _fetch_user_threads(access_token, limit=25)
if not user_threads:
raise RuntimeError("No threads found for authenticated user.")
submission = user_threads[0]
thread_id = submission["id"]
text = submission.get("text", "")
title = (text[:100] + "...") if len(text) > 100 else text
permalink = submission.get("permalink", f"https://www.threads.net/@unknown/post/{thread_id}")
print_substep(f"Thread: {title}", style="bold green")
print_substep(f"URL: {permalink}", style="bold blue")
content = {
"thread_url": permalink,
"thread_title": title or f"Thread {thread_id}",
"thread_id": thread_id,
"is_nsfw": False,
"comments": [],
}
if settings.config["settings"]["storymode"]:
content["thread_post"] = text
else:
replies = _fetch_replies(thread_id, access_token, limit=50)
for reply in replies:
body = reply.get("text", "")
if not _is_valid_reply(body, min_len, max_len, blocked_words):
continue
content["comments"].append({
"comment_body": body,
"comment_url": reply.get("permalink", ""),
"comment_id": reply["id"],
})
print_substep(
f"Got {len(content['comments'])} valid replies.",
style="bold green",
)
if _is_already_done(thread_id) and not target:
print_substep("Thread already processed. Fetch skipped.", style="yellow")
raise RuntimeError("Thread already processed. Set post_id to force reprocess.")
return content
def _is_already_done(thread_id: str) -> bool:
path = Path(VIDEOS_DONE_FILE)
if not path.exists():
return False
try:
done = json.loads(path.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
return False
return any(v.get("id") == thread_id for v in done)

@ -1,3 +1,12 @@
[threads.creds]
access_token = { optional = false, nmin = 10, explanation = "Meta Threads Graph API access token with threads_basic and threads_read_replies permissions", example = "THAAaBbCc123...", oob_error = "Access token too short" }
[threads.thread]
post_id = { optional = true, default = "", explanation = "Specific Threads post URL or ID. Use '+' to separate multiple IDs. Leave empty to fetch from your own recent threads.", example = "https://www.threads.net/@user/post/ABC123" }
max_comment_length = { default = 500, optional = false, nmin = 10, nmax = 10000, type = "int", explanation = "Max characters per reply" }
min_comment_length = { default = 10, optional = true, nmin = 0, nmax = 10000, type = "int", explanation = "Min characters per reply" }
blocked_words = { optional = true, default = "", type = "str", explanation = "Comma-separated words to exclude from replies", example = "spam, nsfw" }
[reddit.creds]
client_id = { optional = false, nmin = 12, nmax = 30, explanation = "The ID of your Reddit app of SCRIPT type", example = "fFAGRNJru1FTz70BzhT3Zg", regex = "^[-a-zA-Z0-9._~+/]+=*$", input_error = "The client ID can only contain printable characters.", oob_error = "The ID should be over 12 and under 30 characters, double check your input." }
client_secret = { optional = false, nmin = 20, nmax = 40, explanation = "The SECRET of your Reddit app of SCRIPT type", example = "fFAGRNJru1FTz70BzhT3Zg", regex = "^[-a-zA-Z0-9._~+/]+=*$", input_error = "The client ID can only contain printable characters.", oob_error = "The secret should be over 20 and under 40 characters, double check your input." }

Loading…
Cancel
Save