fix: security hardening — remove eval/os.system/shell=True, fix 11 bugs

Replace eval() with safe type-coercion dicts in console/settings/gui_utils.
Replace os.system() with subprocess.run() in TTS engine_wrapper.
Remove shell=True from all subprocess/Popen calls in main + ffmpeg_install.
Redact credentials from error logs and settings page HTML.
Fix 6 bare except clauses across the codebase.

Bug fixes:
- Config overwrite crash: set config={} after writing empty file
- Playwright TimeoutError: import correct exception class
- Lambda closure: default arg captures loop variable value
- Redundant ffmpeg: single concat run after all segments generated
- Audio IndexError: explicit check before accessing clips_durations[0]
- NSFW selector: use generic role-based button instead of hardcoded post ID
- Dead macOS branch: sys.platform == "darwin" instead of os.name == "mac"

Hardening:
- Flask secret_key from env var, rotate per startup
- Docker non-root user (appuser)
- CSRF check via Origin header on mutating requests
- Security headers: X-Content-Type-Options, X-Frame-Options
- Citation path traversal sanitization
- Temp file cleanup in ProgressFfmpeg.__exit__

Co-Authored-By: RuFlo <ruv@ruv.net>
pull/2551/head
Hong Phuc 4 weeks ago
parent 263e2784f0
commit 77ed4e38da

@ -22,6 +22,11 @@ RUN python -m playwright install --with-deps chromium
COPY . .
RUN groupadd -r appuser && useradd -r -g appuser -d /app appuser \
&& chown -R appuser:appuser /app /ms-playwright
RUN chmod +x /app/docker-entrypoint.sh
USER appuser
ENTRYPOINT ["/bin/sh", "/app/docker-entrypoint.sh"]

@ -35,19 +35,35 @@ BROWSER_URL = os.environ.get("GUI_BROWSER_URL", f"http://localhost:{PORT}")
# Configure application
app = Flask(__name__, template_folder="GUI")
# Configure secret key only to use 'flash'
app.secret_key = b'_5#y2L"F4Q8z\n\xec]/'
# Configure secret key — env var for production, random per-startup for dev
app.secret_key = os.environ.get("FLASK_SECRET_KEY") or os.urandom(32)
# Ensure responses aren't cached
# Ensure responses aren't cached + security headers
@app.after_request
def after_request(response):
response.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
response.headers["Expires"] = 0
response.headers["Pragma"] = "no-cache"
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
return response
# Simple CSRF check: require same-origin for all mutating requests
@app.before_request
def csrf_check():
if request.method in ("POST", "PUT", "PATCH", "DELETE"):
origin = request.headers.get("Origin")
if origin:
# Allow same-origin only (localhost dev ports)
from urllib.parse import urlparse
origin_host = urlparse(origin).hostname
request_host = urlparse(request.host_url).hostname
if origin_host not in (request_host, "localhost", "127.0.0.1"):
return jsonify({"error": "CSRF check failed"}), 403
# Display index.html
@app.route("/")
def index():
@ -80,6 +96,18 @@ def background_delete():
return redirect(url_for("backgrounds"))
_SENSITIVE_KEYS = {"password", "client_secret", "access_token", "2fa_secret",
"tiktok_sessionid", "elevenlabs_api_key", "openai_api_key"}
def _redact_secrets(data: dict) -> dict:
"""Return a copy with sensitive values masked for safe HTML embedding."""
return {
k: ("********" if any(s in k for s in _SENSITIVE_KEYS) and v else v)
for k, v in data.items()
}
@app.route("/settings", methods=["GET", "POST"])
def settings():
config_load = tomlkit.loads(Path("config.toml").read_text())
@ -95,7 +123,7 @@ def settings():
# Change settings
config = gui.modify_settings(data, config_load, checks)
return render_template("settings.html", file="config.toml", data=config, checks=checks)
return render_template("settings.html", file="config.toml", data=_redact_secrets(config), checks=checks)
# Make videos.json accessible
@ -240,6 +268,17 @@ def _run_pipeline(search_queries=None):
uconsole.set_progress_callback(on_progress)
# Reload pipeline modules so code edits take effect without restart
import importlib
import video_creation.final_video
import video_creation.background
import platforms.threads.screenshot
import main
importlib.reload(video_creation.final_video)
importlib.reload(video_creation.background)
importlib.reload(platforms.threads.screenshot)
importlib.reload(main)
from main import main as run_pipeline
run_pipeline()

@ -1,5 +1,6 @@
import os
import re
import subprocess
from pathlib import Path
from typing import Tuple
@ -115,35 +116,32 @@ class TTSEngine:
]
self.create_silence_mp3()
# Generate all TTS segment files first
for idy, text_cut in enumerate(split_text):
newtext = process_text(text_cut)
# print(f"{idx}-{idy}: {newtext}\n")
if not newtext or newtext.isspace():
print("newtext was blank because sanitized split text resulted in none")
continue
else:
self.call_tts(f"{idx}-{idy}.part", newtext)
with open(f"{self.path}/list.txt", "w") as f:
for idz in range(0, len(split_text)):
f.write("file " + f"'{idx}-{idz}.part.mp3'" + "\n")
split_files.append(str(f"{self.path}/{idx}-{idy}.part.mp3"))
f.write("file " + f"'silence.mp3'" + "\n")
os.system(
"ffmpeg -f concat -y -hide_banner -loglevel panic -safe 0 "
+ "-i "
+ f"{self.path}/list.txt "
+ "-c copy "
+ f"{self.path}/{idx}.mp3"
)
try:
for i in range(0, len(split_files)):
os.unlink(split_files[i])
except FileNotFoundError as e:
print("File not found: " + e.filename)
except OSError:
print("OSError")
self.call_tts(f"{idx}-{idy}.part", newtext)
split_files.append(str(f"{self.path}/{idx}-{idy}.part.mp3"))
# Write concat list referencing only generated files, then run ffmpeg once
list_path = f"{self.path}/list.txt"
with open(list_path, "w") as f:
for idz in range(len(split_files)):
f.write(f"file '{idx}-{idz}.part.mp3'\n")
f.write("file 'silence.mp3'\n")
subprocess.run([
"ffmpeg", "-f", "concat", "-y", "-hide_banner", "-loglevel", "panic",
"-safe", "0", "-i", list_path, "-c", "copy",
f"{self.path}/{idx}.mp3",
], check=False)
for part_path in split_files:
try:
os.unlink(part_path)
except FileNotFoundError:
pass
def call_tts(self, filename: str, text: str):
try:
@ -180,7 +178,8 @@ class TTSEngine:
self.last_clip_length = clip.duration
self.length += clip.duration
clip.close()
except:
except (OSError, IOError, Exception) as e:
print_substep(f"Could not probe audio duration: {e}", "yellow")
self.length = 0
def create_silence_mp3(self):

@ -1,9 +1,9 @@
#!/usr/bin/env python
import math
import subprocess
import sys
from os import name
from pathlib import Path
from subprocess import Popen
from typing import Dict, NoReturn, Union
from platforms import get_content_object, get_screenshot_fn
@ -110,7 +110,7 @@ def run_many(times) -> None:
f'on the {x}{("th", "st", "nd", "rd", "th", "th", "th", "th", "th", "th")[x % 10]} iteration of {times}'
)
main()
Popen("cls" if name == "nt" else "clear", shell=True).wait()
subprocess.run(["cls" if name == "nt" else "clear"], shell=(name == "nt"))
def shutdown() -> NoReturn:
@ -158,7 +158,7 @@ if __name__ == "__main__":
f'on the {index}{("st" if index % 10 == 1 else ("nd" if index % 10 == 2 else ("rd" if index % 10 == 3 else "th")))} post of {num_posts}'
)
main(post_id)
Popen("cls" if name == "nt" else "clear", shell=True).wait()
subprocess.run(["cls" if name == "nt" else "clear"], shell=(name == "nt"))
elif config["settings"]["times_to_run"]:
run_many(config["settings"]["times_to_run"])
else:
@ -171,14 +171,22 @@ if __name__ == "__main__":
print_markdown("## Invalid Reddit credentials")
print_markdown("Please check your credentials in the config.toml file")
shutdown()
# Generic error handling for all other exceptions
config["settings"]["tts"]["tiktok_sessionid"] = "REDACTED"
config["settings"]["tts"]["elevenlabs_api_key"] = "REDACTED"
config["settings"]["tts"]["openai_api_key"] = "REDACTED"
# Generic error handling — redact secrets before printing
import copy
safe_config = copy.deepcopy(config)
for key in ("tiktok_sessionid", "elevenlabs_api_key", "openai_api_key",
"client_id", "client_secret", "access_token", "password",
"2fa_secret"):
safe_config.setdefault("settings", {}).setdefault("tts", {})[key] = "REDACTED"
for section in ("reddit", "threads"):
creds = safe_config.get(section, {}).get("creds", {})
for cred_key in ("client_id", "client_secret", "password", "access_token", "2fa_secret"):
if cred_key in creds:
creds[cred_key] = "REDACTED"
print_step(
f"Sorry, something went wrong with this version! Try again, and feel free to report this issue at GitHub or the Discord community.\n"
f"Version: {__VERSION__} \n"
f"Error: {err} \n"
f'Config: {config["settings"]}'
f'Config: {safe_config.get("settings", {})}'
)
raise err
raise

@ -92,55 +92,66 @@ def get_screenshots_of_threads_posts(content_object: dict, screenshot_num: int)
print_substep(f"Failed to screenshot main post: {e}", style="red")
raise
# Screenshots of replies
# Screenshots of replies — capture all from the main post page (single navigation)
if not storymode:
for idx in range(min(screenshot_num, len(content_object["comments"]))):
comment = content_object["comments"][idx]
try:
page.goto(comment["comment_url"], timeout=0)
page.wait_for_load_state("networkidle")
page.wait_for_timeout(2000)
# Threads.net uses div-based cards for replies too.
# Target the specific reply by its comment_id in the URL.
# Using .first would pick the main post (appears first in DOM).
reply_id = comment["comment_id"]
reply_link = page.locator(f'a[href*="/{reply_id}"]').first
if reply_link.count() and reply_link.is_visible():
card = reply_link.locator('xpath=ancestor::div[contains(@class, "x1a2a7pz")][1]')
reply_locator = card.first if card.count() else reply_link
num_replies = min(screenshot_num, len(content_object["comments"]))
if num_replies > 0:
# Scroll to load all replies inline on the post page
print_substep("Loading replies on post page...", style="dim")
last_count = 0
stable_count = 0
for _ in range(20):
page.evaluate("window.scrollBy(0, document.body.scrollHeight)")
page.wait_for_timeout(1000)
current = page.locator('a[href*="/post/"]').count()
if current == last_count:
stable_count += 1
if stable_count >= 3:
break
else:
reply_locator = page.locator("article").first
if not reply_locator.count() or not reply_locator.is_visible():
print_substep(f"Reply {idx} not found. Skipping...", style="yellow")
continue
if settings.config["settings"].get("zoom", 1) != 1:
zoom = settings.config["settings"]["zoom"]
page.evaluate(f"document.body.style.zoom={zoom}")
location = reply_locator.bounding_box()
if location:
for k in location:
location[k] = float("{:.2f}".format(location[k] * zoom))
page.screenshot(
clip=location,
path=f"assets/temp/{thread_id}/png/comment_{idx}.png",
)
stable_count = 0
last_count = current
for idx in range(num_replies):
comment = content_object["comments"][idx]
try:
reply_id = comment["comment_id"]
reply_link = page.locator(f'a[href*="/{reply_id}"]').first
if reply_link.count() and reply_link.is_visible():
card = reply_link.locator('xpath=ancestor::div[contains(@class, "x1a2a7pz")][1]')
reply_locator = card.first if card.count() else reply_link
# Scroll element into view so bounding_box works
reply_locator.scroll_into_view_if_needed()
page.wait_for_timeout(300)
else:
print_substep(f"Reply {idx} not found on post page. Skipping...", style="yellow")
continue
if settings.config["settings"].get("zoom", 1) != 1:
zoom = settings.config["settings"]["zoom"]
page.evaluate(f"document.body.style.zoom={zoom}")
location = reply_locator.bounding_box()
if location:
for k in location:
location[k] = float("{:.2f}".format(location[k] * zoom))
page.screenshot(
clip=location,
path=f"assets/temp/{thread_id}/png/comment_{idx}.png",
)
else:
reply_locator.screenshot(
path=f"assets/temp/{thread_id}/png/comment_{idx}.png"
)
else:
reply_locator.screenshot(
path=f"assets/temp/{thread_id}/png/comment_{idx}.png"
)
else:
reply_locator.screenshot(
path=f"assets/temp/{thread_id}/png/comment_{idx}.png"
)
except Exception as e:
print_substep(f"Error capturing reply {idx}: {e}. Skipping...", style="yellow")
# Don't crash; just skip this reply
continue
except Exception as e:
print_substep(f"Error capturing reply {idx}: {e}. Skipping...", style="yellow")
continue
print_substep(f"Reply screenshots captured ({min(screenshot_num, len(content_object['comments']))} total).", style="bold green")
print_substep(f"Reply screenshots captured ({num_replies} total).", style="bold green")
browser.close()

@ -56,8 +56,8 @@ def get_subreddit_threads(POST_ID: str):
except ResponseException as e:
if e.response.status_code == 401:
print("Invalid credentials - please check them in config.toml")
except:
print("Something went wrong...")
except Exception as e:
print(f"Something went wrong: {e}")
# Ask user for subreddit input
print_step("Getting subreddit threads...")

@ -127,9 +127,9 @@ def handle_input(
user_input = input("").strip()
if check_type is not False:
try:
isinstance(eval(user_input), check_type) # fixme: remove eval
return check_type(user_input)
except:
check_type(user_input)
return user_input
except (ValueError, TypeError):
console.print(
"[red bold]"
+ err_message

@ -1,5 +1,6 @@
import os
import subprocess
import sys
import zipfile
import requests
@ -69,8 +70,7 @@ def ffmpeg_install_windows():
def ffmpeg_install_linux():
try:
subprocess.run(
"sudo apt install ffmpeg",
shell=True,
["sudo", "apt", "install", "-y", "ffmpeg"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
@ -87,8 +87,7 @@ def ffmpeg_install_linux():
def ffmpeg_install_mac():
try:
subprocess.run(
"brew install ffmpeg",
shell=True,
["brew", "install", "ffmpeg"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
@ -124,10 +123,10 @@ def ffmpeg_install():
print("Installing FFmpeg...")
if os.name == "nt":
ffmpeg_install_windows()
elif sys.platform == "darwin":
ffmpeg_install_mac()
elif os.name == "posix":
ffmpeg_install_linux()
elif os.name == "mac":
ffmpeg_install_mac()
else:
print("Your OS is not supported. Please install FFmpeg manually and try again.")
exit()

@ -51,8 +51,8 @@ def check(value, checks):
if not incorrect and "type" in checks:
try:
value = eval(checks["type"])(value) # fixme remove eval
except Exception:
value = {"int": int, "float": float, "bool": bool, "str": str}.get(checks["type"], str)(value)
except (ValueError, TypeError):
incorrect = True
if (
@ -178,7 +178,8 @@ def add_background(youtube_uri, filename, citation, position):
flash('Position is invalid! It can be "center" or decimal number.', "error")
return
# Sanitize filename
# Sanitize citation to prevent path traversal
citation = re.sub(r"[./\\]", "_", citation)
regex = re.compile(r"^([a-zA-Z0-9\s_-]{1,100})$").match(filename)
if not regex:

@ -10,6 +10,8 @@ from utils.console import handle_input
console = Console()
config = dict # autocomplete
_TYPE_COERCION = {"int": int, "float": float, "bool": bool, "str": str}
def crawl(obj: dict, func=lambda x, y: print(x, y, end="\n"), path=None):
if path is None: # path Default argument value is mutable
@ -30,8 +32,8 @@ def check(value, checks, name):
incorrect = True
if not incorrect and "type" in checks:
try:
value = eval(checks["type"])(value) # fixme remove eval
except:
value = _TYPE_COERCION.get(checks["type"], str)(value)
except (ValueError, TypeError):
incorrect = True
if (
@ -78,7 +80,7 @@ def check(value, checks, name):
+ str(name)
+ "[#F7768E bold]=",
extra_info=get_check_value("explanation", ""),
check_type=eval(get_check_value("type", "False")), # fixme remove eval
check_type=_TYPE_COERCION.get(get_check_value("type", ""), False),
default=get_check_value("default", NotImplemented),
match=get_check_value("regex", ""),
err_message=get_check_value("input_error", "Incorrect input"),
@ -129,7 +131,8 @@ Overwrite it?(y/n)"""
try:
with open(config_file, "w") as f:
f.write("")
except:
config = {}
except (OSError, IOError):
console.print(
f"[red bold]Failed to overwrite {config_file}. Giving up.\nSuggestion: check {config_file} permissions for the user."
)
@ -143,7 +146,7 @@ Creating it now."""
with open(config_file, "x") as f:
f.write("")
config = {}
except:
except (OSError, IOError):
console.print(
f"[red bold]Failed to write to {config_file}. Giving up.\nSuggestion: check the folder's permissions for the user."
)

@ -39,7 +39,7 @@ def load_background_options():
pos = _background_options["video"][name][3]
if pos != "center":
_background_options["video"][name][3] = lambda t: ("center", pos + t)
_background_options["video"][name][3] = lambda t, p=pos: ("center", p + t)
return _background_options
@ -74,9 +74,25 @@ def get_background_config(mode: str):
choice = None
# Handle default / not supported background using default option.
# Default : pick random from supported background.
# Default : pick random from already-downloaded backgrounds if available,
# otherwise pick from all supported backgrounds.
if not choice or choice not in background_options[mode]:
choice = random.choice(list(background_options[mode].keys()))
if mode == "video":
available = [
k for k, v in background_options[mode].items()
if Path(f"assets/backgrounds/video/{v[2]}-{v[1]}").is_file()
]
else:
available = [
k for k, v in background_options[mode].items()
if Path(f"assets/backgrounds/audio/{v[2]}-{v[1]}").is_file()
]
if available:
choice = random.choice(available)
print_substep(f"Picked random {mode} from downloaded: {choice}")
else:
choice = random.choice(list(background_options[mode].keys()))
print_substep(f"No downloaded {mode}s found. Picked: {choice} (will download)")
return background_options[mode][choice]

@ -86,6 +86,10 @@ class ProgressFfmpeg(threading.Thread):
def __exit__(self, *args, **kwargs):
self.stop()
try:
os.unlink(self.output_file.name)
except OSError:
pass
def name_normalize(name: str) -> str:
@ -277,6 +281,8 @@ def make_final_video(
os.unlink(concat_list_path)
# Probe durations
if not existing:
raise RuntimeError("No audio clips generated — all TTS segments failed to produce output")
audio_clips_durations = [_probe_duration(p) for p in existing]
# --- Step 3: Mix background audio ---
@ -326,20 +332,19 @@ def make_final_video(
})
elif settings.config["settings"]["storymodemethod"] == 1:
for i in range(number_of_clips):
img_path = f"assets/temp/{reddit_id}/png/img{i}.png"
if not os.path.exists(img_path):
continue
dur_idx = i + 1
if dur_idx >= len(audio_clips_durations):
break
overlay_items.append({
"path": img_path,
"start_time": current_time,
"duration": audio_clips_durations[dur_idx],
"opacity": opacity,
"scale_w": screenshot_width,
"scale_h": -1,
})
img_path = f"assets/temp/{reddit_id}/png/img{i}.png"
if os.path.exists(img_path):
overlay_items.append({
"path": img_path,
"start_time": current_time,
"duration": audio_clips_durations[dur_idx],
"opacity": opacity,
"scale_w": screenshot_width,
"scale_h": -1,
})
current_time += audio_clips_durations[dur_idx]
else:
for i in range(number_of_clips):

@ -4,7 +4,7 @@ from pathlib import Path
from typing import Dict, Final
import translators
from playwright.sync_api import ViewportSize, sync_playwright
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError, ViewportSize, sync_playwright
from rich.progress import track
from utils import settings
@ -133,16 +133,12 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
page.wait_for_load_state()
page.wait_for_timeout(5000)
if page.locator(
"#t3_12hmbug > div > div._3xX726aBn29LDbsDtzr_6E._1Ap4F5maDtT1E1YuCiaO0r.D3IL3FD0RFy_mkKLPwL4 > div > div > button"
).is_visible():
# This means the post is NSFW and requires to click the proceed button.
print_substep("Post is NSFW. You are spicy...")
page.locator(
"#t3_12hmbug > div > div._3xX726aBn29LDbsDtzr_6E._1Ap4F5maDtT1E1YuCiaO0r.D3IL3FD0RFy_mkKLPwL4 > div > div > button"
).click()
page.wait_for_load_state() # Wait for page to fully load
# Dismiss NSFW warning overlay if present (generic: finds button in NSFW overlay by role)
nsfw_button = page.get_by_role("button", name="yes", exact=False).first
if nsfw_button.is_visible():
print_substep("Post is NSFW. Attempting to proceed...")
nsfw_button.click()
page.wait_for_load_state()
# translate code
if page.locator(
@ -252,7 +248,7 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
page.locator(f"#t1_{comment['comment_id']}").screenshot(
path=f"assets/temp/{reddit_id}/png/comment_{idx}.png"
)
except TimeoutError:
except PlaywrightTimeoutError:
del reddit_object["comments"]
screenshot_num += 1
print("TimeoutError: Skipping screenshot...")

Loading…
Cancel
Save