diff --git a/Dockerfile b/Dockerfile index 4b9ccf8..0e5cbd9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -22,6 +22,11 @@ RUN python -m playwright install --with-deps chromium COPY . . +RUN groupadd -r appuser && useradd -r -g appuser -d /app appuser \ + && chown -R appuser:appuser /app /ms-playwright + RUN chmod +x /app/docker-entrypoint.sh +USER appuser + ENTRYPOINT ["/bin/sh", "/app/docker-entrypoint.sh"] diff --git a/GUI.py b/GUI.py index e312e11..2fd8900 100644 --- a/GUI.py +++ b/GUI.py @@ -35,19 +35,35 @@ BROWSER_URL = os.environ.get("GUI_BROWSER_URL", f"http://localhost:{PORT}") # Configure application app = Flask(__name__, template_folder="GUI") -# Configure secret key only to use 'flash' -app.secret_key = b'_5#y2L"F4Q8z\n\xec]/' +# Configure secret key — env var for production, random per-startup for dev +app.secret_key = os.environ.get("FLASK_SECRET_KEY") or os.urandom(32) -# Ensure responses aren't cached +# Ensure responses aren't cached + security headers @app.after_request def after_request(response): response.headers["Cache-Control"] = "no-cache, no-store, must-revalidate" response.headers["Expires"] = 0 response.headers["Pragma"] = "no-cache" + response.headers["X-Content-Type-Options"] = "nosniff" + response.headers["X-Frame-Options"] = "DENY" return response +# Simple CSRF check: require same-origin for all mutating requests +@app.before_request +def csrf_check(): + if request.method in ("POST", "PUT", "PATCH", "DELETE"): + origin = request.headers.get("Origin") + if origin: + # Allow same-origin only (localhost dev ports) + from urllib.parse import urlparse + origin_host = urlparse(origin).hostname + request_host = urlparse(request.host_url).hostname + if origin_host not in (request_host, "localhost", "127.0.0.1"): + return jsonify({"error": "CSRF check failed"}), 403 + + # Display index.html @app.route("/") def index(): @@ -80,6 +96,18 @@ def background_delete(): return redirect(url_for("backgrounds")) +_SENSITIVE_KEYS = {"password", "client_secret", "access_token", "2fa_secret", + "tiktok_sessionid", "elevenlabs_api_key", "openai_api_key"} + + +def _redact_secrets(data: dict) -> dict: + """Return a copy with sensitive values masked for safe HTML embedding.""" + return { + k: ("********" if any(s in k for s in _SENSITIVE_KEYS) and v else v) + for k, v in data.items() + } + + @app.route("/settings", methods=["GET", "POST"]) def settings(): config_load = tomlkit.loads(Path("config.toml").read_text()) @@ -95,7 +123,7 @@ def settings(): # Change settings config = gui.modify_settings(data, config_load, checks) - return render_template("settings.html", file="config.toml", data=config, checks=checks) + return render_template("settings.html", file="config.toml", data=_redact_secrets(config), checks=checks) # Make videos.json accessible @@ -240,6 +268,17 @@ def _run_pipeline(search_queries=None): uconsole.set_progress_callback(on_progress) + # Reload pipeline modules so code edits take effect without restart + import importlib + import video_creation.final_video + import video_creation.background + import platforms.threads.screenshot + import main + importlib.reload(video_creation.final_video) + importlib.reload(video_creation.background) + importlib.reload(platforms.threads.screenshot) + importlib.reload(main) + from main import main as run_pipeline run_pipeline() diff --git a/TTS/engine_wrapper.py b/TTS/engine_wrapper.py index 954fe93..8e4e2a5 100644 --- a/TTS/engine_wrapper.py +++ b/TTS/engine_wrapper.py @@ -1,5 +1,6 @@ import os import re +import subprocess from pathlib import Path from typing import Tuple @@ -115,35 +116,32 @@ class TTSEngine: ] self.create_silence_mp3() + # Generate all TTS segment files first for idy, text_cut in enumerate(split_text): newtext = process_text(text_cut) - # print(f"{idx}-{idy}: {newtext}\n") - if not newtext or newtext.isspace(): - print("newtext was blank because sanitized split text resulted in none") continue - else: - self.call_tts(f"{idx}-{idy}.part", newtext) - with open(f"{self.path}/list.txt", "w") as f: - for idz in range(0, len(split_text)): - f.write("file " + f"'{idx}-{idz}.part.mp3'" + "\n") - split_files.append(str(f"{self.path}/{idx}-{idy}.part.mp3")) - f.write("file " + f"'silence.mp3'" + "\n") - - os.system( - "ffmpeg -f concat -y -hide_banner -loglevel panic -safe 0 " - + "-i " - + f"{self.path}/list.txt " - + "-c copy " - + f"{self.path}/{idx}.mp3" - ) - try: - for i in range(0, len(split_files)): - os.unlink(split_files[i]) - except FileNotFoundError as e: - print("File not found: " + e.filename) - except OSError: - print("OSError") + self.call_tts(f"{idx}-{idy}.part", newtext) + split_files.append(str(f"{self.path}/{idx}-{idy}.part.mp3")) + + # Write concat list referencing only generated files, then run ffmpeg once + list_path = f"{self.path}/list.txt" + with open(list_path, "w") as f: + for idz in range(len(split_files)): + f.write(f"file '{idx}-{idz}.part.mp3'\n") + f.write("file 'silence.mp3'\n") + + subprocess.run([ + "ffmpeg", "-f", "concat", "-y", "-hide_banner", "-loglevel", "panic", + "-safe", "0", "-i", list_path, "-c", "copy", + f"{self.path}/{idx}.mp3", + ], check=False) + + for part_path in split_files: + try: + os.unlink(part_path) + except FileNotFoundError: + pass def call_tts(self, filename: str, text: str): try: @@ -180,7 +178,8 @@ class TTSEngine: self.last_clip_length = clip.duration self.length += clip.duration clip.close() - except: + except (OSError, IOError, Exception) as e: + print_substep(f"Could not probe audio duration: {e}", "yellow") self.length = 0 def create_silence_mp3(self): diff --git a/main.py b/main.py index 3fcdcae..042099a 100755 --- a/main.py +++ b/main.py @@ -1,9 +1,9 @@ #!/usr/bin/env python import math +import subprocess import sys from os import name from pathlib import Path -from subprocess import Popen from typing import Dict, NoReturn, Union from platforms import get_content_object, get_screenshot_fn @@ -110,7 +110,7 @@ def run_many(times) -> None: f'on the {x}{("th", "st", "nd", "rd", "th", "th", "th", "th", "th", "th")[x % 10]} iteration of {times}' ) main() - Popen("cls" if name == "nt" else "clear", shell=True).wait() + subprocess.run(["cls" if name == "nt" else "clear"], shell=(name == "nt")) def shutdown() -> NoReturn: @@ -158,7 +158,7 @@ if __name__ == "__main__": f'on the {index}{("st" if index % 10 == 1 else ("nd" if index % 10 == 2 else ("rd" if index % 10 == 3 else "th")))} post of {num_posts}' ) main(post_id) - Popen("cls" if name == "nt" else "clear", shell=True).wait() + subprocess.run(["cls" if name == "nt" else "clear"], shell=(name == "nt")) elif config["settings"]["times_to_run"]: run_many(config["settings"]["times_to_run"]) else: @@ -171,14 +171,22 @@ if __name__ == "__main__": print_markdown("## Invalid Reddit credentials") print_markdown("Please check your credentials in the config.toml file") shutdown() - # Generic error handling for all other exceptions - config["settings"]["tts"]["tiktok_sessionid"] = "REDACTED" - config["settings"]["tts"]["elevenlabs_api_key"] = "REDACTED" - config["settings"]["tts"]["openai_api_key"] = "REDACTED" + # Generic error handling — redact secrets before printing + import copy + safe_config = copy.deepcopy(config) + for key in ("tiktok_sessionid", "elevenlabs_api_key", "openai_api_key", + "client_id", "client_secret", "access_token", "password", + "2fa_secret"): + safe_config.setdefault("settings", {}).setdefault("tts", {})[key] = "REDACTED" + for section in ("reddit", "threads"): + creds = safe_config.get(section, {}).get("creds", {}) + for cred_key in ("client_id", "client_secret", "password", "access_token", "2fa_secret"): + if cred_key in creds: + creds[cred_key] = "REDACTED" print_step( f"Sorry, something went wrong with this version! Try again, and feel free to report this issue at GitHub or the Discord community.\n" f"Version: {__VERSION__} \n" f"Error: {err} \n" - f'Config: {config["settings"]}' + f'Config: {safe_config.get("settings", {})}' ) - raise err + raise diff --git a/platforms/threads/screenshot.py b/platforms/threads/screenshot.py index 9d31749..2115b0f 100644 --- a/platforms/threads/screenshot.py +++ b/platforms/threads/screenshot.py @@ -92,55 +92,66 @@ def get_screenshots_of_threads_posts(content_object: dict, screenshot_num: int) print_substep(f"Failed to screenshot main post: {e}", style="red") raise - # Screenshots of replies + # Screenshots of replies — capture all from the main post page (single navigation) if not storymode: - for idx in range(min(screenshot_num, len(content_object["comments"]))): - comment = content_object["comments"][idx] - try: - page.goto(comment["comment_url"], timeout=0) - page.wait_for_load_state("networkidle") - page.wait_for_timeout(2000) - - # Threads.net uses div-based cards for replies too. - # Target the specific reply by its comment_id in the URL. - # Using .first would pick the main post (appears first in DOM). - reply_id = comment["comment_id"] - reply_link = page.locator(f'a[href*="/{reply_id}"]').first - if reply_link.count() and reply_link.is_visible(): - card = reply_link.locator('xpath=ancestor::div[contains(@class, "x1a2a7pz")][1]') - reply_locator = card.first if card.count() else reply_link + num_replies = min(screenshot_num, len(content_object["comments"])) + if num_replies > 0: + # Scroll to load all replies inline on the post page + print_substep("Loading replies on post page...", style="dim") + last_count = 0 + stable_count = 0 + for _ in range(20): + page.evaluate("window.scrollBy(0, document.body.scrollHeight)") + page.wait_for_timeout(1000) + current = page.locator('a[href*="/post/"]').count() + if current == last_count: + stable_count += 1 + if stable_count >= 3: + break else: - reply_locator = page.locator("article").first - if not reply_locator.count() or not reply_locator.is_visible(): - print_substep(f"Reply {idx} not found. Skipping...", style="yellow") - continue - - if settings.config["settings"].get("zoom", 1) != 1: - zoom = settings.config["settings"]["zoom"] - page.evaluate(f"document.body.style.zoom={zoom}") - location = reply_locator.bounding_box() - if location: - for k in location: - location[k] = float("{:.2f}".format(location[k] * zoom)) - page.screenshot( - clip=location, - path=f"assets/temp/{thread_id}/png/comment_{idx}.png", - ) + stable_count = 0 + last_count = current + + for idx in range(num_replies): + comment = content_object["comments"][idx] + try: + reply_id = comment["comment_id"] + reply_link = page.locator(f'a[href*="/{reply_id}"]').first + if reply_link.count() and reply_link.is_visible(): + card = reply_link.locator('xpath=ancestor::div[contains(@class, "x1a2a7pz")][1]') + reply_locator = card.first if card.count() else reply_link + # Scroll element into view so bounding_box works + reply_locator.scroll_into_view_if_needed() + page.wait_for_timeout(300) + else: + print_substep(f"Reply {idx} not found on post page. Skipping...", style="yellow") + continue + + if settings.config["settings"].get("zoom", 1) != 1: + zoom = settings.config["settings"]["zoom"] + page.evaluate(f"document.body.style.zoom={zoom}") + location = reply_locator.bounding_box() + if location: + for k in location: + location[k] = float("{:.2f}".format(location[k] * zoom)) + page.screenshot( + clip=location, + path=f"assets/temp/{thread_id}/png/comment_{idx}.png", + ) + else: + reply_locator.screenshot( + path=f"assets/temp/{thread_id}/png/comment_{idx}.png" + ) else: reply_locator.screenshot( path=f"assets/temp/{thread_id}/png/comment_{idx}.png" ) - else: - reply_locator.screenshot( - path=f"assets/temp/{thread_id}/png/comment_{idx}.png" - ) - except Exception as e: - print_substep(f"Error capturing reply {idx}: {e}. Skipping...", style="yellow") - # Don't crash; just skip this reply - continue + except Exception as e: + print_substep(f"Error capturing reply {idx}: {e}. Skipping...", style="yellow") + continue - print_substep(f"Reply screenshots captured ({min(screenshot_num, len(content_object['comments']))} total).", style="bold green") + print_substep(f"Reply screenshots captured ({num_replies} total).", style="bold green") browser.close() diff --git a/reddit/subreddit.py b/reddit/subreddit.py index aa34f84..25427c5 100644 --- a/reddit/subreddit.py +++ b/reddit/subreddit.py @@ -56,8 +56,8 @@ def get_subreddit_threads(POST_ID: str): except ResponseException as e: if e.response.status_code == 401: print("Invalid credentials - please check them in config.toml") - except: - print("Something went wrong...") + except Exception as e: + print(f"Something went wrong: {e}") # Ask user for subreddit input print_step("Getting subreddit threads...") diff --git a/utils/console.py b/utils/console.py index 1090421..795a27d 100644 --- a/utils/console.py +++ b/utils/console.py @@ -127,9 +127,9 @@ def handle_input( user_input = input("").strip() if check_type is not False: try: - isinstance(eval(user_input), check_type) # fixme: remove eval - return check_type(user_input) - except: + check_type(user_input) + return user_input + except (ValueError, TypeError): console.print( "[red bold]" + err_message diff --git a/utils/ffmpeg_install.py b/utils/ffmpeg_install.py index b41bad6..3f34ba1 100644 --- a/utils/ffmpeg_install.py +++ b/utils/ffmpeg_install.py @@ -1,5 +1,6 @@ import os import subprocess +import sys import zipfile import requests @@ -69,8 +70,7 @@ def ffmpeg_install_windows(): def ffmpeg_install_linux(): try: subprocess.run( - "sudo apt install ffmpeg", - shell=True, + ["sudo", "apt", "install", "-y", "ffmpeg"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) @@ -87,8 +87,7 @@ def ffmpeg_install_linux(): def ffmpeg_install_mac(): try: subprocess.run( - "brew install ffmpeg", - shell=True, + ["brew", "install", "ffmpeg"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) @@ -124,10 +123,10 @@ def ffmpeg_install(): print("Installing FFmpeg...") if os.name == "nt": ffmpeg_install_windows() + elif sys.platform == "darwin": + ffmpeg_install_mac() elif os.name == "posix": ffmpeg_install_linux() - elif os.name == "mac": - ffmpeg_install_mac() else: print("Your OS is not supported. Please install FFmpeg manually and try again.") exit() diff --git a/utils/gui_utils.py b/utils/gui_utils.py index 5b2b2bd..426d7c7 100644 --- a/utils/gui_utils.py +++ b/utils/gui_utils.py @@ -51,8 +51,8 @@ def check(value, checks): if not incorrect and "type" in checks: try: - value = eval(checks["type"])(value) # fixme remove eval - except Exception: + value = {"int": int, "float": float, "bool": bool, "str": str}.get(checks["type"], str)(value) + except (ValueError, TypeError): incorrect = True if ( @@ -178,7 +178,8 @@ def add_background(youtube_uri, filename, citation, position): flash('Position is invalid! It can be "center" or decimal number.', "error") return - # Sanitize filename + # Sanitize citation to prevent path traversal + citation = re.sub(r"[./\\]", "_", citation) regex = re.compile(r"^([a-zA-Z0-9\s_-]{1,100})$").match(filename) if not regex: diff --git a/utils/settings.py b/utils/settings.py index 6b8242b..45ef34f 100755 --- a/utils/settings.py +++ b/utils/settings.py @@ -10,6 +10,8 @@ from utils.console import handle_input console = Console() config = dict # autocomplete +_TYPE_COERCION = {"int": int, "float": float, "bool": bool, "str": str} + def crawl(obj: dict, func=lambda x, y: print(x, y, end="\n"), path=None): if path is None: # path Default argument value is mutable @@ -30,8 +32,8 @@ def check(value, checks, name): incorrect = True if not incorrect and "type" in checks: try: - value = eval(checks["type"])(value) # fixme remove eval - except: + value = _TYPE_COERCION.get(checks["type"], str)(value) + except (ValueError, TypeError): incorrect = True if ( @@ -78,7 +80,7 @@ def check(value, checks, name): + str(name) + "[#F7768E bold]=", extra_info=get_check_value("explanation", ""), - check_type=eval(get_check_value("type", "False")), # fixme remove eval + check_type=_TYPE_COERCION.get(get_check_value("type", ""), False), default=get_check_value("default", NotImplemented), match=get_check_value("regex", ""), err_message=get_check_value("input_error", "Incorrect input"), @@ -129,7 +131,8 @@ Overwrite it?(y/n)""" try: with open(config_file, "w") as f: f.write("") - except: + config = {} + except (OSError, IOError): console.print( f"[red bold]Failed to overwrite {config_file}. Giving up.\nSuggestion: check {config_file} permissions for the user." ) @@ -143,7 +146,7 @@ Creating it now.""" with open(config_file, "x") as f: f.write("") config = {} - except: + except (OSError, IOError): console.print( f"[red bold]Failed to write to {config_file}. Giving up.\nSuggestion: check the folder's permissions for the user." ) diff --git a/video_creation/background.py b/video_creation/background.py index 125374c..7cbf13b 100644 --- a/video_creation/background.py +++ b/video_creation/background.py @@ -39,7 +39,7 @@ def load_background_options(): pos = _background_options["video"][name][3] if pos != "center": - _background_options["video"][name][3] = lambda t: ("center", pos + t) + _background_options["video"][name][3] = lambda t, p=pos: ("center", p + t) return _background_options @@ -74,9 +74,25 @@ def get_background_config(mode: str): choice = None # Handle default / not supported background using default option. - # Default : pick random from supported background. + # Default : pick random from already-downloaded backgrounds if available, + # otherwise pick from all supported backgrounds. if not choice or choice not in background_options[mode]: - choice = random.choice(list(background_options[mode].keys())) + if mode == "video": + available = [ + k for k, v in background_options[mode].items() + if Path(f"assets/backgrounds/video/{v[2]}-{v[1]}").is_file() + ] + else: + available = [ + k for k, v in background_options[mode].items() + if Path(f"assets/backgrounds/audio/{v[2]}-{v[1]}").is_file() + ] + if available: + choice = random.choice(available) + print_substep(f"Picked random {mode} from downloaded: {choice}") + else: + choice = random.choice(list(background_options[mode].keys())) + print_substep(f"No downloaded {mode}s found. Picked: {choice} (will download)") return background_options[mode][choice] diff --git a/video_creation/final_video.py b/video_creation/final_video.py index a99b362..c255825 100644 --- a/video_creation/final_video.py +++ b/video_creation/final_video.py @@ -86,6 +86,10 @@ class ProgressFfmpeg(threading.Thread): def __exit__(self, *args, **kwargs): self.stop() + try: + os.unlink(self.output_file.name) + except OSError: + pass def name_normalize(name: str) -> str: @@ -277,6 +281,8 @@ def make_final_video( os.unlink(concat_list_path) # Probe durations + if not existing: + raise RuntimeError("No audio clips generated — all TTS segments failed to produce output") audio_clips_durations = [_probe_duration(p) for p in existing] # --- Step 3: Mix background audio --- @@ -326,20 +332,19 @@ def make_final_video( }) elif settings.config["settings"]["storymodemethod"] == 1: for i in range(number_of_clips): - img_path = f"assets/temp/{reddit_id}/png/img{i}.png" - if not os.path.exists(img_path): - continue dur_idx = i + 1 if dur_idx >= len(audio_clips_durations): break - overlay_items.append({ - "path": img_path, - "start_time": current_time, - "duration": audio_clips_durations[dur_idx], - "opacity": opacity, - "scale_w": screenshot_width, - "scale_h": -1, - }) + img_path = f"assets/temp/{reddit_id}/png/img{i}.png" + if os.path.exists(img_path): + overlay_items.append({ + "path": img_path, + "start_time": current_time, + "duration": audio_clips_durations[dur_idx], + "opacity": opacity, + "scale_w": screenshot_width, + "scale_h": -1, + }) current_time += audio_clips_durations[dur_idx] else: for i in range(number_of_clips): diff --git a/video_creation/screenshot_downloader.py b/video_creation/screenshot_downloader.py index 8dafaf6..96f8b26 100644 --- a/video_creation/screenshot_downloader.py +++ b/video_creation/screenshot_downloader.py @@ -4,7 +4,7 @@ from pathlib import Path from typing import Dict, Final import translators -from playwright.sync_api import ViewportSize, sync_playwright +from playwright.sync_api import TimeoutError as PlaywrightTimeoutError, ViewportSize, sync_playwright from rich.progress import track from utils import settings @@ -133,16 +133,12 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int): page.wait_for_load_state() page.wait_for_timeout(5000) - if page.locator( - "#t3_12hmbug > div > div._3xX726aBn29LDbsDtzr_6E._1Ap4F5maDtT1E1YuCiaO0r.D3IL3FD0RFy_mkKLPwL4 > div > div > button" - ).is_visible(): - # This means the post is NSFW and requires to click the proceed button. - - print_substep("Post is NSFW. You are spicy...") - page.locator( - "#t3_12hmbug > div > div._3xX726aBn29LDbsDtzr_6E._1Ap4F5maDtT1E1YuCiaO0r.D3IL3FD0RFy_mkKLPwL4 > div > div > button" - ).click() - page.wait_for_load_state() # Wait for page to fully load + # Dismiss NSFW warning overlay if present (generic: finds button in NSFW overlay by role) + nsfw_button = page.get_by_role("button", name="yes", exact=False).first + if nsfw_button.is_visible(): + print_substep("Post is NSFW. Attempting to proceed...") + nsfw_button.click() + page.wait_for_load_state() # translate code if page.locator( @@ -252,7 +248,7 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int): page.locator(f"#t1_{comment['comment_id']}").screenshot( path=f"assets/temp/{reddit_id}/png/comment_{idx}.png" ) - except TimeoutError: + except PlaywrightTimeoutError: del reddit_object["comments"] screenshot_num += 1 print("TimeoutError: Skipping screenshot...")