Refactor: Comprehensive improvements to logging, error handling, and code quality

This commit introduces a series of improvements across the codebase:

1.  **Logging:**
    *   Integrated Python's `logging` module throughout the application.
    *   Replaced custom `print_step/print_substep` functions with standard logging calls (INFO, WARNING, ERROR, DEBUG).
    *   Configured file logging (rotating logs) and console logging (using RichHandler if available).
    *   Enhanced FFmpeg error logs to include commands and stderr.

2.  **Error Handling:**
    *   Made error handling more robust in TTS modules, Playwright interactions (screenshotting), API calls, and file operations.
    *   Replaced `sys.exit()` calls in modules with exceptions to allow better control by the caller.
    *   Improved reporting of unhandled exceptions in `main.py` with tracebacks to the log file.

3.  **Code Quality & Modularity:**
    *   Refactored `main.py` and `video_creation/final_video.py` into smaller, more manageable functions, reducing global state and improving readability.
    *   Replaced `eval()` in configuration parsing (`utils/settings.py`, `utils/gui_utils.py`) with safer type conversion methods.
    *   Decoupled background choice management from the static TOML template, now dynamically loaded from `backgrounds.json`.
    *   Standardized path handling using `pathlib.Path` across numerous modules, replacing `os.path` and manual string concatenations.
    *   Corrected OS dispatcher logic in `utils/ffmpeg_install.py`.
    *   Clarified voice selection logic in `TTS/pyttsx.py`.

4.  **FFmpeg Interaction:**
    *   Reviewed and refined the `ProgressFfmpeg` class for more robust progress parsing and thread management.

5.  **Testing Groundwork:**
    *   Created `tests/` directory and added initial unit tests for utility functions `name_normalize` and `_safe_str_to_bool` using `unittest`.

6.  **Documentation:**
    *   Added/updated module-level and key function docstrings in `main.py` and `video_creation/final_video.py`.
pull/2364/head
google-labs-jules[bot] 2 months ago
parent aa15172430
commit 47612a09ac

274
GUI.py

@ -1,116 +1,158 @@
import webbrowser
from pathlib import Path
# Used "tomlkit" instead of "toml" because it doesn't change formatting on "dump"
import tomlkit
from flask import (
Flask,
redirect,
render_template,
request,
send_from_directory,
url_for,
)
import utils.gui_utils as gui
# Set the hostname
HOST = "localhost"
# Set the port number
PORT = 4000
# Configure application
app = Flask(__name__, template_folder="GUI")
# Configure secret key only to use 'flash'
app.secret_key = b'_5#y2L"F4Q8z\n\xec]/'
# Ensure responses aren't cached
@app.after_request
def after_request(response):
response.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
response.headers["Expires"] = 0
response.headers["Pragma"] = "no-cache"
return response
# Display index.html
@app.route("/")
def index():
return render_template("index.html", file="videos.json")
@app.route("/backgrounds", methods=["GET"])
def backgrounds():
return render_template("backgrounds.html", file="backgrounds.json")
@app.route("/background/add", methods=["POST"])
def background_add():
# Get form values
youtube_uri = request.form.get("youtube_uri").strip()
filename = request.form.get("filename").strip()
citation = request.form.get("citation").strip()
position = request.form.get("position").strip()
gui.add_background(youtube_uri, filename, citation, position)
return redirect(url_for("backgrounds"))
@app.route("/background/delete", methods=["POST"])
def background_delete():
key = request.form.get("background-key")
gui.delete_background(key)
return redirect(url_for("backgrounds"))
@app.route("/settings", methods=["GET", "POST"])
def settings():
config_load = tomlkit.loads(Path("config.toml").read_text())
config = gui.get_config(config_load)
# Get checks for all values
checks = gui.get_checks()
if request.method == "POST":
# Get data from form as dict
data = request.form.to_dict()
# Change settings
config = gui.modify_settings(data, config_load, checks)
return render_template("settings.html", file="config.toml", data=config, checks=checks)
# Make videos.json accessible
@app.route("/videos.json")
def videos_json():
return send_from_directory("video_creation/data", "videos.json")
# Make backgrounds.json accessible
@app.route("/backgrounds.json")
def backgrounds_json():
return send_from_directory("utils", "backgrounds.json")
# Make videos in results folder accessible
@app.route("/results/<path:name>")
def results(name):
return send_from_directory("results", name, as_attachment=True)
# Make voices samples in voices folder accessible
@app.route("/voices/<path:name>")
def voices(name):
return send_from_directory("GUI/voices", name, as_attachment=True)
# Run browser and start the app
if __name__ == "__main__":
webbrowser.open(f"http://{HOST}:{PORT}", new=2)
print("Website opened in new tab. Refresh if it didn't load.")
app.run(port=PORT)
import json # Added import
import webbrowser
from pathlib import Path
# Used "tomlkit" instead of "toml" because it doesn't change formatting on "dump"
import tomlkit
from flask import (
Flask,
redirect,
render_template,
request,
send_from_directory,
url_for,
)
import utils.gui_utils as gui
# Set the hostname
HOST = "localhost"
# Set the port number
PORT = 4000
# Configure application
app = Flask(__name__, template_folder="GUI")
# Configure secret key only to use 'flash'
app.secret_key = b'_5#y2L"F4Q8z\n\xec]/'
# Ensure responses aren't cached
@app.after_request
def after_request(response):
response.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
response.headers["Expires"] = 0
response.headers["Pragma"] = "no-cache"
return response
# Display index.html
@app.route("/")
def index():
return render_template("index.html", file="videos.json")
@app.route("/backgrounds", methods=["GET"])
def backgrounds():
return render_template("backgrounds.html", file="backgrounds.json")
@app.route("/background/add", methods=["POST"])
def background_add():
# Get form values
youtube_uri = request.form.get("youtube_uri").strip()
filename = request.form.get("filename").strip()
citation = request.form.get("citation").strip()
position = request.form.get("position").strip()
gui.add_background(youtube_uri, filename, citation, position)
return redirect(url_for("backgrounds"))
@app.route("/background/delete", methods=["POST"])
def background_delete():
key = request.form.get("background-key")
gui.delete_background(key)
return redirect(url_for("backgrounds"))
@app.route("/settings", methods=["GET", "POST"])
def settings():
config_load = tomlkit.loads(Path("config.toml").read_text())
config = gui.get_config(config_load)
# Get checks for all values
checks = gui.get_checks()
# Dynamically load background choices for the settings page
available_backgrounds = []
backgrounds_json_path = Path("utils/backgrounds.json")
if backgrounds_json_path.exists():
try:
with open(backgrounds_json_path, "r", encoding="utf-8") as f:
background_data = json.load(f)
available_backgrounds = sorted(list(background_data.keys())) # Sort for consistent order
except (json.JSONDecodeError, IOError) as e:
# Log this error or flash a message if persistent issues occur
app.logger.warning(f"Could not load background choices from {backgrounds_json_path}: {e}")
pass # Keep available_backgrounds empty
if request.method == "POST":
# Get data from form as dict
data = request.form.to_dict()
# Change settings
# The gui.modify_settings function will internally use gui.check,
# which now uses safe type conversion.
# Validation of 'background_choice' against available_backgrounds
# should ideally happen within gui.check if 'options' were dynamic,
# or here before calling gui.modify_settings.
# For now, relying on utils.settings.py to do the final validation run
# when the main script loads the config.
config = gui.modify_settings(data, config_load, checks)
# It's good practice to redirect after a POST to prevent re-submission
# However, the current structure re-renders. If issues arise, consider redirect:
# return redirect(url_for('settings'))
# For now, we need to re-fetch the (potentially modified) flat config for rendering
config = gui.get_config(config_load)
# Add available_backgrounds to the template context.
# The settings.html template will need to be updated to use this.
# Example for the dropdown in settings.html:
#
# <label for="background_choice">Background Choice:</label>
# <select name="background_choice" id="background_choice">
# {% for bg_name in available_backgrounds %}
# <option value="{{ bg_name }}" {% if bg_name == data.get('background_choice') %}selected{% endif %}>
# {{ bg_name }}
# </option>
# {% endfor %}
# </select>
#
# Note: `data.get('background_choice')` refers to the current config value for background_choice.
return render_template("settings.html", file="config.toml", data=config, checks=checks, available_backgrounds=available_backgrounds)
# Make videos.json accessible
@app.route("/videos.json")
def videos_json():
return send_from_directory("video_creation/data", "videos.json")
# Make backgrounds.json accessible
@app.route("/backgrounds.json")
def backgrounds_json():
return send_from_directory("utils", "backgrounds.json")
# Make videos in results folder accessible
@app.route("/results/<path:name>")
def results(name):
return send_from_directory("results", name, as_attachment=True)
# Make voices samples in voices folder accessible
@app.route("/voices/<path:name>")
def voices(name):
return send_from_directory("GUI/voices", name, as_attachment=True)
# Run browser and start the app
if __name__ == "__main__":
webbrowser.open(f"http://{HOST}:{PORT}", new=2)
print("Website opened in new tab. Refresh if it didn't load.")
app.run(port=PORT)

@ -1,22 +1,41 @@
import random
from gtts import gTTS
import logging # Added for logging
from gtts import gTTS, gTTSError
from utils import settings
logger = logging.getLogger(__name__)
class GTTS:
def __init__(self):
self.max_chars = 5000
self.voices = []
logger.debug("Initializing GTTS engine.")
self.max_chars = 5000 # gTTS has its own limits, but this is for consistency if we pre-validate.
# self.voices = [] # gTTS doesn't have selectable voices in the same way as pyttsx or TikTok; lang is the main variant.
def run(self, text: str, filepath: str):
language = settings.config["reddit"]["thread"]["post_lang"] or "en"
logger.info(f"Requesting GTTS for text: '{text[:30]}...' using lang: '{language}'. Output: {filepath}")
def run(self, text, filepath):
tts = gTTS(
text=text,
lang=settings.config["reddit"]["thread"]["post_lang"] or "en",
slow=False,
)
tts.save(filepath)
try:
tts = gTTS(
text=text,
lang=language,
slow=False, # Speed is not highly configurable; 'slow' is the only option.
)
logger.debug(f"Saving GTTS audio to {filepath}")
tts.save(filepath)
logger.info(f"Successfully saved GTTS audio to {filepath}")
except gTTSError as e: # Catch specific gTTS errors
logger.error(f"gTTS API error: {e}", exc_info=True)
# Decide if to raise a custom exception or re-raise
raise RuntimeError(f"gTTS failed: {e}")
except Exception as e: # Catch any other unexpected errors during gTTS processing
logger.error(f"An unexpected error occurred with GTTS: {e}", exc_info=True)
raise RuntimeError(f"Unexpected GTTS failure: {e}")
def randomvoice(self):
return random.choice(self.voices)
# gTTS language is the primary "voice" variant. No list of voices to pick from.
# This method might be redundant for GTTS or could return a random language if desired.
# For now, it's not actively used by the engine_wrapper for GTTS in a meaningful way.
logger.debug("randomvoice called for GTTS, but GTTS primarily uses language codes, not distinct voices.")
return settings.config["reddit"]["thread"]["post_lang"] or "en" # Return current lang as a placeholder

@ -1,5 +1,6 @@
# documentation for tiktok api: https://github.com/oscie57/tiktok-voice/wiki
import base64
import logging # Added for logging
import random
import time
from typing import Final, Optional
@ -75,11 +76,14 @@ vocals: Final[tuple] = (
"en_female_ht_f08_wonderful_world", # Dramatic
)
logger = logging.getLogger(__name__)
class TikTok:
"""TikTok Text-to-Speech Wrapper"""
def __init__(self):
logger.debug("Initializing TikTok TTS session.")
headers = {
"User-Agent": "com.zhiliaoapp.musically/2022600030 (Linux; U; Android 7.1.2; es_ES; SM-G988N; "
"Build/NRD90M;tt-ok/3.12.13.1)",
@ -94,53 +98,87 @@ class TikTok:
self._session.headers = headers
def run(self, text: str, filepath: str, random_voice: bool = False):
logger.info(f"Requesting TikTok TTS for text: '{text[:30]}...' Output: {filepath}")
if random_voice:
voice = self.random_voice()
logger.debug(f"Using random TikTok voice: {voice}")
else:
# if tiktok_voice is not set in the config file, then use a random voice
voice = settings.config["settings"]["tts"].get("tiktok_voice", None)
if voice:
logger.debug(f"Using configured TikTok voice: {voice}")
else:
logger.debug("No specific TikTok voice configured, API will choose.")
# get the audio from the TikTok API
data = self.get_voices(voice=voice, text=text)
# check if there was an error in the request
status_code = data["status_code"]
status_code = data.get("status_code") # Use .get for safer access
if status_code != 0:
raise TikTokTTSException(status_code, data["message"])
message = data.get("message", "Unknown error from TikTok API")
logger.error(f"TikTok TTS API error. Status: {status_code}, Message: {message}")
raise TikTokTTSException(status_code, message)
# decode data from base64 to binary
try:
raw_voices = data["data"]["v_str"]
except:
print(
"The TikTok TTS returned an invalid response. Please try again later, and report this bug."
)
raise TikTokTTSException(0, "Invalid response")
except KeyError: # More specific exception
logger.error("TikTok TTS returned an invalid response: 'data' or 'v_str' key missing. Full response: %s", data)
raise TikTokTTSException(0, "Invalid response structure from TikTok API")
logger.debug("Decoding base64 audio data.")
decoded_voices = base64.b64decode(raw_voices)
# write voices to specified filepath
with open(filepath, "wb") as out:
out.write(decoded_voices)
try:
with open(filepath, "wb") as out:
out.write(decoded_voices)
logger.info(f"Successfully saved TikTok TTS audio to {filepath}")
except IOError as e:
logger.error(f"Failed to write TikTok TTS audio to {filepath}: {e}", exc_info=True)
raise # Re-raise the IOError
def get_voices(self, text: str, voice: Optional[str] = None) -> dict:
"""If voice is not passed, the API will try to use the most fitting voice"""
# sanitize text
text = text.replace("+", "plus").replace("&", "and").replace("r/", "")
sanitized_text = text.replace("+", "plus").replace("&", "and").replace("r/", "")
logger.debug(f"Sanitized text for TikTok API: '{sanitized_text[:50]}...'")
# prepare url request
params = {"req_text": text, "speaker_map_type": 0, "aid": 1233}
params = {"req_text": sanitized_text, "speaker_map_type": 0, "aid": 1233}
if voice is not None:
params["text_speaker"] = voice
# send request
logger.debug(f"Sending POST request to TikTok TTS API: {self.URI_BASE} with params: {params}")
try:
response = self._session.post(self.URI_BASE, params=params, timeout=10) # Added timeout
response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
except requests.exceptions.ConnectionError as e:
logger.warning(f"Connection error during TikTok TTS request: {e}. Retrying after delay...")
time.sleep(random.uniform(1, 5)) # Use uniform for float sleep times
try:
response = self._session.post(self.URI_BASE, params=params, timeout=15) # Longer timeout for retry
response.raise_for_status()
except requests.exceptions.RequestException as retry_e: # Catch any request exception on retry
logger.error(f"TikTok TTS request failed after retry: {retry_e}", exc_info=True)
# Return a dict that mimics an error response from the API
return {"status_code": -1, "message": f"Request failed after retry: {retry_e}"}
except requests.exceptions.HTTPError as e: # Handle HTTP errors (4xx, 5xx)
logger.error(f"TikTok TTS API returned HTTP error: {e.response.status_code} {e.response.reason}. Response: {e.response.text[:200]}")
# Try to parse JSON even on HTTP error, as API might still return JSON error message
try:
return e.response.json()
except ValueError: # If response is not JSON
return {"status_code": e.response.status_code, "message": e.response.reason}
except requests.exceptions.Timeout as e:
logger.error(f"TikTok TTS request timed out: {e}")
return {"status_code": -2, "message": f"Request timed out: {e}"}
except requests.exceptions.RequestException as e: # Catch other request-related errors
logger.error(f"TikTok TTS request failed: {e}", exc_info=True)
return {"status_code": -3, "message": f"Request failed: {e}"}
try:
response = self._session.post(self.URI_BASE, params=params)
except ConnectionError:
time.sleep(random.randrange(1, 7))
response = self._session.post(self.URI_BASE, params=params)
return response.json()
except ValueError as e: # If response is not JSON
logger.error(f"TikTok TTS API did not return valid JSON. Status: {response.status_code}, Response: {response.text[:200]}. Error: {e}")
return {"status_code": -4, "message": "Invalid JSON response from API"}
return response.json()
@staticmethod
def random_voice() -> str:

@ -1,5 +1,6 @@
import random
import sys
import logging # Added for logging
from boto3 import Session
from botocore.exceptions import BotoCoreError, ClientError, ProfileNotFound
@ -24,54 +25,77 @@ voices = [
"Raveena",
]
logger = logging.getLogger(__name__)
class AWSPolly:
def __init__(self):
self.max_chars = 3000
self.voices = voices
logger.debug("Initializing AWS Polly TTS engine.")
self.max_chars = 3000 # Max characters for Polly synthesize_speech if not using SSML.
self.voices = voices # Keep this list for random selection and validation.
def run(self, text, filepath, random_voice: bool = False):
def run(self, text: str, filepath: str, random_voice: bool = False):
logger.info(f"Requesting AWS Polly TTS for text: '{text[:30]}...' Output: {filepath}")
try:
session = Session(profile_name="polly")
# It's good practice to fetch profile from config or environment variables
# rather than hardcoding "polly" if flexibility is needed.
# For now, assuming "polly" profile is standard for this app.
profile_name = settings.config["settings"]["tts"].get("aws_profile_name") or "polly"
logger.debug(f"Attempting to create AWS session with profile: {profile_name}")
session = Session(profile_name=profile_name)
polly = session.client("polly")
logger.debug("AWS session and Polly client created successfully.")
selected_voice_id = ""
if random_voice:
voice = self.randomvoice()
selected_voice_id = self.randomvoice()
logger.debug(f"Using random AWS Polly voice: {selected_voice_id}")
else:
if not settings.config["settings"]["tts"]["aws_polly_voice"]:
raise ValueError(
f"Please set the TOML variable AWS_VOICE to a valid voice. options are: {voices}"
)
voice = str(settings.config["settings"]["tts"]["aws_polly_voice"]).capitalize()
try:
# Request speech synthesis
response = polly.synthesize_speech(
Text=text, OutputFormat="mp3", VoiceId=voice, Engine="neural"
)
except (BotoCoreError, ClientError) as error:
# The service returned an error, exit gracefully
print(error)
sys.exit(-1)
selected_voice_id = settings.config["settings"]["tts"].get("aws_polly_voice")
if not selected_voice_id:
logger.error(f"AWS Polly voice not set in config. Available options: {self.voices}")
raise ValueError(f"AWS_VOICE not set. Options: {self.voices}")
selected_voice_id = selected_voice_id.capitalize()
if selected_voice_id not in self.voices:
logger.error(f"Invalid AWS Polly voice '{selected_voice_id}' in config. Available: {self.voices}")
raise ValueError(f"Invalid AWS_VOICE '{selected_voice_id}'. Options: {self.voices}")
logger.debug(f"Using configured AWS Polly voice: {selected_voice_id}")
# Request speech synthesis
logger.debug(f"Synthesizing speech with Polly. VoiceId: {selected_voice_id}, Engine: neural")
response = polly.synthesize_speech(
Text=text, OutputFormat="mp3", VoiceId=selected_voice_id, Engine="neural" # Consider making Engine configurable
)
# Access the audio stream from the response
if "AudioStream" in response:
file = open(filepath, "wb")
file.write(response["AudioStream"].read())
file.close()
# print_substep(f"Saved Text {idx} to MP3 files successfully.", style="bold green")
logger.debug("AudioStream received from Polly. Writing to file.")
with open(filepath, "wb") as audio_file:
audio_file.write(response["AudioStream"].read())
logger.info(f"Successfully saved AWS Polly TTS audio to {filepath}")
else:
# The response didn't contain audio data, exit gracefully
print("Could not stream audio")
sys.exit(-1)
except ProfileNotFound:
print("You need to install the AWS CLI and configure your profile")
print(
"""
Linux: https://docs.aws.amazon.com/polly/latest/dg/setup-aws-cli.html
Windows: https://docs.aws.amazon.com/polly/latest/dg/install-voice-plugin2.html
"""
)
sys.exit(-1)
logger.error("Could not stream audio from Polly response. 'AudioStream' not in response.")
# Log part of the response if it's small enough and doesn't contain sensitive info
logger.debug(f"Polly response without AudioStream: {str(response)[:200]}")
raise RuntimeError("AWS Polly: Could not stream audio, 'AudioStream' missing from response.")
except ProfileNotFound as e:
logger.error(f"AWS profile '{profile_name}' not found: {e}. Please configure AWS CLI.")
logger.error("Refer to AWS documentation for setup: "
"Linux: https://docs.aws.amazon.com/polly/latest/dg/setup-aws-cli.html, "
"Windows: https://docs.aws.amazon.com/polly/latest/dg/install-voice-plugin2.html")
# sys.exit(-1) is too abrupt for a library. Raise an exception.
raise RuntimeError(f"AWS Profile '{profile_name}' not found. Configure AWS CLI.")
except (BotoCoreError, ClientError) as error:
logger.error(f"AWS Polly API error: {error}", exc_info=True)
raise RuntimeError(f"AWS Polly API error: {error}")
except ValueError as e: # Catch voice configuration errors
logger.error(f"Configuration error for AWS Polly: {e}")
raise # Re-raise to be handled by calling code
except Exception as e: # Catch any other unexpected errors
logger.error(f"An unexpected error occurred with AWS Polly: {e}", exc_info=True)
raise RuntimeError(f"Unexpected AWS Polly failure: {e}")
def randomvoice(self):
return random.choice(self.voices)
def randomvoice(self) -> str:
choice = random.choice(self.voices)
logger.debug(f"Randomly selected AWS Polly voice: {choice}")
return choice

@ -1,38 +1,96 @@
import random
from elevenlabs import save
import logging # Added for logging
from elevenlabs import save, APIError # Import APIError for specific exception handling
from elevenlabs.client import ElevenLabs
from utils import settings
logger = logging.getLogger(__name__)
class elevenlabs:
def __init__(self):
self.max_chars = 2500
logger.debug("Initializing ElevenLabs TTS engine (client will be created on first run or randomvoice call).")
self.max_chars = 2500 # Character limit for ElevenLabs (check their current limits)
self.client: ElevenLabs = None
self.available_voices = [] # To store fetched voice names
def run(self, text, filepath, random_voice: bool = False):
def _ensure_client_initialized(self):
"""Initializes the ElevenLabs client if not already done."""
if self.client is None:
self.initialize()
logger.info("ElevenLabs client not initialized. Initializing now...")
api_key = settings.config["settings"]["tts"].get("elevenlabs_api_key")
if not api_key:
logger.error("ElevenLabs API key is not set in config (ELEVENLABS_API_KEY).")
raise ValueError("ElevenLabs API key is missing. Please set ELEVENLABS_API_KEY in config.")
try:
self.client = ElevenLabs(api_key=api_key)
# Fetch and store available voices upon successful initialization
all_voices_response = self.client.voices.get_all()
self.available_voices = [v.name for v in all_voices_response.voices if v.name]
if not self.available_voices:
logger.warning("No voices returned from ElevenLabs API after initialization.")
else:
logger.debug(f"Fetched {len(self.available_voices)} voices from ElevenLabs: {self.available_voices}")
logger.info("ElevenLabs client initialized successfully.")
except APIError as e:
logger.error(f"Failed to initialize ElevenLabs client due to API error: {e}", exc_info=True)
raise RuntimeError(f"ElevenLabs API initialization failed: {e}")
except Exception as e: # Catch other potential errors during client init
logger.error(f"An unexpected error occurred during ElevenLabs client initialization: {e}", exc_info=True)
raise RuntimeError(f"Unexpected error initializing ElevenLabs client: {e}")
def run(self, text: str, filepath: str, random_voice: bool = False):
self._ensure_client_initialized()
selected_voice_name = ""
if random_voice:
voice = self.randomvoice()
selected_voice_name = self.randomvoice() # randomvoice now also ensures client init
logger.debug(f"Using random ElevenLabs voice: {selected_voice_name}")
else:
voice = str(settings.config["settings"]["tts"]["elevenlabs_voice_name"]).capitalize()
selected_voice_name = settings.config["settings"]["tts"].get("elevenlabs_voice_name")
if not selected_voice_name:
logger.error("ElevenLabs voice name (elevenlabs_voice_name) not set in config.")
# Fallback to a random voice if no specific voice is set, or raise error
# For now, let's try a random voice as a fallback.
logger.warning("elevenlabs_voice_name not set. Attempting to use a random voice.")
selected_voice_name = self.randomvoice()
if not selected_voice_name: # If randomvoice also fails to find one
logger.error("No ElevenLabs voice configured and no random voice available.")
raise ValueError("ElevenLabs voice not configured and no random voice found.")
else:
# Check if configured voice is in available list (case-sensitive for ElevenLabs names usually)
if self.available_voices and selected_voice_name not in self.available_voices:
logger.warning(f"Configured ElevenLabs voice '{selected_voice_name}' not found in fetched available voices. "
f"Available: {self.available_voices}. Attempting to use it anyway.")
logger.debug(f"Using configured ElevenLabs voice: {selected_voice_name}")
audio = self.client.generate(text=text, voice=voice, model="eleven_multilingual_v1")
save(audio=audio, filename=filepath)
logger.info(f"Requesting ElevenLabs TTS for text: '{text[:30]}...' Voice: {selected_voice_name}. Output: {filepath}")
def initialize(self):
if settings.config["settings"]["tts"]["elevenlabs_api_key"]:
api_key = settings.config["settings"]["tts"]["elevenlabs_api_key"]
else:
raise ValueError(
"You didn't set an Elevenlabs API key! Please set the config variable ELEVENLABS_API_KEY to a valid API key."
)
try:
# Consider making model configurable e.g. "eleven_multilingual_v2"
audio = self.client.generate(text=text, voice=selected_voice_name, model="eleven_multilingual_v1")
logger.debug(f"Saving ElevenLabs audio to {filepath}")
save(audio=audio, filename=filepath)
logger.info(f"Successfully saved ElevenLabs TTS audio to {filepath}")
except APIError as e:
logger.error(f"ElevenLabs API error during audio generation or save: {e}", exc_info=True)
raise RuntimeError(f"ElevenLabs API operation failed: {e}")
except Exception as e:
logger.error(f"An unexpected error occurred with ElevenLabs processing: {e}", exc_info=True)
raise RuntimeError(f"Unexpected ElevenLabs failure: {e}")
self.client = ElevenLabs(api_key=api_key)
def randomvoice(self):
if self.client is None:
self.initialize()
return random.choice(self.client.voices.get_all().voices).voice_name
def randomvoice(self) -> str:
self._ensure_client_initialized() # Ensure client and self.available_voices are populated
if not self.available_voices:
logger.error("No voices available from ElevenLabs to choose randomly.")
# This could raise an error or return a default/empty string depending on desired strictness
raise RuntimeError("ElevenLabs: No voices available for random selection.")
choice = random.choice(self.available_voices)
logger.debug(f"Randomly selected ElevenLabs voice: {choice}")
return choice

@ -1,14 +1,17 @@
import random
import logging # Added for logging
import pyttsx3
from utils import settings
logger = logging.getLogger(__name__)
class pyttsx:
def __init__(self):
self.max_chars = 5000
self.voices = []
logger.debug("Initializing pyttsx TTS engine.")
self.max_chars = 5000 # Max characters, not currently enforced by pyttsx3 directly but good for consistency
self.available_voice_indices = [] # Store available voice indices
def run(
self,
@ -16,27 +19,73 @@ class pyttsx:
filepath: str,
random_voice=False,
):
voice_id = settings.config["settings"]["tts"]["python_voice"]
voice_num = settings.config["settings"]["tts"]["py_voice_num"]
if voice_id == "" or voice_num == "":
voice_id = 2
voice_num = 3
raise ValueError("set pyttsx values to a valid value, switching to defaults")
else:
voice_id = int(voice_id)
voice_num = int(voice_num)
for i in range(voice_num):
self.voices.append(i)
i = +1
voice_id_str = settings.config["settings"]["tts"].get("python_voice", "0") # Default to "0" if not set
# py_voice_num seems to indicate the number of voices to consider, not directly used for selection by ID.
# The old logic for py_voice_num was confusing. We'll rely on pyttsx3 to list available voices.
try:
selected_voice_idx = int(voice_id_str)
except ValueError:
logger.warning(f"Invalid pyttsx voice ID '{voice_id_str}' in config. Defaulting to voice index 0.")
selected_voice_idx = 0
logger.info(f"Requesting pyttsx TTS for text: '{text[:30]}...' Output: {filepath}")
try:
engine = pyttsx3.init()
except Exception as e:
logger.error(f"Failed to initialize pyttsx3 engine: {e}", exc_info=True)
raise RuntimeError(f"pyttsx3 engine initialization failed: {e}")
available_voices = engine.getProperty("voices")
if not available_voices:
logger.error("No voices found by pyttsx3 engine.")
raise RuntimeError("pyttsx3 found no available voices.")
self.available_voice_indices = list(range(len(available_voices)))
if random_voice:
voice_id = self.randomvoice()
engine = pyttsx3.init()
voices = engine.getProperty("voices")
engine.setProperty(
"voice", voices[voice_id].id
) # changing index changes voices but ony 0 and 1 are working here
engine.save_to_file(text, f"{filepath}")
engine.runAndWait()
def randomvoice(self):
return random.choice(self.voices)
if not self.available_voice_indices:
logger.warning("No available voices for random selection in pyttsx. Using default index 0.")
final_voice_to_use_idx = 0
else:
final_voice_to_use_idx = self.randomvoice()
logger.debug(f"Using random pyttsx voice index: {final_voice_to_use_idx}")
else:
final_voice_to_use_idx = selected_voice_idx
logger.debug(f"Using configured pyttsx voice index: {final_voice_to_use_idx}")
if not (0 <= final_voice_to_use_idx < len(available_voices)):
logger.warning(
f"Selected pyttsx voice index {final_voice_to_use_idx} is out of range (0-{len(available_voices)-1}). "
f"Falling back to voice index 0."
)
final_voice_to_use_idx = 0
if not available_voices: # Should be caught earlier, but as a safeguard
logger.error("Critical: No voices available even for fallback.")
raise RuntimeError("No pyttsx voices available for fallback.")
try:
voice_to_set = available_voices[final_voice_to_use_idx].id
logger.debug(f"Setting pyttsx voice to ID: {voice_to_set} (Index: {final_voice_to_use_idx}, Name: {available_voices[final_voice_to_use_idx].name})")
engine.setProperty("voice", voice_to_set)
logger.debug(f"Saving pyttsx TTS audio to {filepath} for text: '{text[:50]}...'")
engine.save_to_file(text, filepath) # Corrected filepath variable
engine.runAndWait()
logger.info(f"Successfully saved pyttsx TTS audio to {filepath}")
except IndexError: # Should be caught by above checks, but good safeguard
logger.error(f"Internal error: pyttsx voice index {final_voice_to_use_idx} became invalid.", exc_info=True)
raise RuntimeError("Failed to set pyttsx voice due to an internal indexing error.")
except Exception as e: # Catch other pyttsx3 errors
logger.error(f"Error during pyttsx3 operation (setProperty, save_to_file, runAndWait): {e}", exc_info=True)
raise RuntimeError(f"pyttsx3 operation failed: {e}")
def randomvoice(self) -> int:
"""Returns a random valid voice index."""
if not self.available_voice_indices:
logger.warning("Attempted to get random pyttsx voice, but no voices seem available. Defaulting to index 0.")
return 0 # Fallback, though this case should ideally be handled before calling
return random.choice(self.available_voice_indices)

@ -1,10 +1,12 @@
import random
import logging # Added for logging
import time # For potential sleep on rate limit
import requests
from requests.exceptions import JSONDecodeError
from requests.exceptions import JSONDecodeError, RequestException # Import base RequestException
from utils import settings
from utils.voice import check_ratelimit
from utils.voice import check_ratelimit # This function likely needs logging too
voices = [
"Brian",
@ -27,41 +29,99 @@ voices = [
# valid voices https://lazypy.ro/tts/
logger = logging.getLogger(__name__)
class StreamlabsPolly:
def __init__(self):
logger.debug("Initializing Streamlabs Polly TTS engine.")
self.url = "https://streamlabs.com/polly/speak"
self.max_chars = 550
self.voices = voices
self.voices = voices # Keep for validation and random selection
def run(self, text, filepath, random_voice: bool = False):
def run(self, text: str, filepath: str, random_voice: bool = False, retry_count=0):
max_retries = 3 # Max retries for rate limiting or transient errors
logger.info(f"Requesting Streamlabs Polly TTS for text: '{text[:30]}...' Output: {filepath}")
selected_voice = ""
if random_voice:
voice = self.randomvoice()
selected_voice = self.randomvoice()
logger.debug(f"Using random Streamlabs Polly voice: {selected_voice}")
else:
if not settings.config["settings"]["tts"]["streamlabs_polly_voice"]:
raise ValueError(
f"Please set the config variable STREAMLABS_POLLY_VOICE to a valid voice. options are: {voices}"
)
voice = str(settings.config["settings"]["tts"]["streamlabs_polly_voice"]).capitalize()
selected_voice = settings.config["settings"]["tts"].get("streamlabs_polly_voice")
if not selected_voice:
logger.error(f"Streamlabs Polly voice not set. Available: {self.voices}")
raise ValueError(f"STREAMLABS_POLLY_VOICE not set. Options: {self.voices}")
selected_voice = selected_voice.capitalize()
if selected_voice not in self.voices:
logger.error(f"Invalid Streamlabs Polly voice '{selected_voice}' in config. Available: {self.voices}")
raise ValueError(f"Invalid STREAMLABS_POLLY_VOICE '{selected_voice}'. Options: {self.voices}")
logger.debug(f"Using configured Streamlabs Polly voice: {selected_voice}")
body = {"voice": voice, "text": text, "service": "polly"}
headers = {"Referer": "https://streamlabs.com/"}
response = requests.post(self.url, headers=headers, data=body)
body = {"voice": selected_voice, "text": text, "service": "polly"}
headers = {"Referer": "https://streamlabs.com/"} # Important for this unofficial API
if not check_ratelimit(response):
self.run(text, filepath, random_voice)
try:
logger.debug(f"Posting to Streamlabs Polly API: {self.url} with voice: {selected_voice}")
response = requests.post(self.url, headers=headers, data=body, timeout=10)
response.raise_for_status() # Check for HTTP errors
except RequestException as e:
logger.error(f"Streamlabs Polly request failed: {e}", exc_info=True)
if retry_count < max_retries:
logger.info(f"Retrying Streamlabs Polly request ({retry_count+1}/{max_retries})...")
time.sleep(2 ** retry_count) # Exponential backoff
return self.run(text, filepath, random_voice, retry_count + 1)
raise RuntimeError(f"Streamlabs Polly request failed after {max_retries} retries: {e}")
else:
try:
voice_data = requests.get(response.json()["speak_url"])
with open(filepath, "wb") as f:
f.write(voice_data.content)
except (KeyError, JSONDecodeError):
try:
if response.json()["error"] == "No text specified!":
raise ValueError("Please specify a text to convert to speech.")
except (KeyError, JSONDecodeError):
print("Error occurred calling Streamlabs Polly")
def randomvoice(self):
return random.choice(self.voices)
# check_ratelimit likely prints and might call sys.exit or recurse.
# This needs to be handled better. For now, assume it returns True if okay.
if not check_ratelimit(response): # Assuming check_ratelimit returns True if NOT rate limited
logger.warning("Streamlabs Polly rate limit hit or other issue indicated by check_ratelimit.")
if retry_count < max_retries:
logger.info(f"Retrying Streamlabs Polly due to rate limit ({retry_count+1}/{max_retries})...")
time.sleep(random.uniform(5, 10) * (retry_count + 1)) # Longer, randomized sleep for rate limits
return self.run(text, filepath, random_voice, retry_count + 1)
logger.error("Streamlabs Polly rate limit persists after retries.")
raise RuntimeError("Streamlabs Polly rate limited after multiple retries.")
try:
response_json = response.json()
speak_url = response_json.get("speak_url")
if not speak_url:
error_message = response_json.get("error", "Unknown error from Streamlabs Polly (speak_url missing).")
logger.error(f"Streamlabs Polly API error: {error_message}. Full response: {response_json}")
if error_message == "No text specified!": # Specific known error
raise ValueError("Streamlabs Polly: No text specified to convert to speech.")
raise RuntimeError(f"Streamlabs Polly API error: {error_message}")
logger.debug(f"Fetching audio from speak_url: {speak_url}")
voice_data_response = requests.get(speak_url, timeout=15)
voice_data_response.raise_for_status() # Check for HTTP errors on speak_url
with open(filepath, "wb") as f:
f.write(voice_data_response.content)
logger.info(f"Successfully saved Streamlabs Polly TTS audio to {filepath}")
except JSONDecodeError as e:
logger.error(f"Failed to decode JSON response from Streamlabs Polly: {e}. Response text: {response.text[:200]}", exc_info=True)
raise RuntimeError(f"Streamlabs Polly returned non-JSON response: {e}")
except KeyError : # Should be caught by speak_url check now
logger.error(f"Unexpected response structure from Streamlabs Polly (KeyError). Response: {response.text[:200]}", exc_info=True)
raise RuntimeError("Streamlabs Polly: Unexpected response structure.")
except RequestException as e: # For the GET request to speak_url
logger.error(f"Failed to fetch audio from Streamlabs Polly speak_url: {e}", exc_info=True)
raise RuntimeError(f"Streamlabs Polly audio fetch failed: {e}")
except IOError as e:
logger.error(f"Failed to write Streamlabs Polly audio to {filepath}: {e}", exc_info=True)
raise # Re-raise IOError
except ValueError as e: # Re-raise specific ValueErrors
raise
except Exception as e: # Catch-all for other unexpected errors
logger.error(f"An unexpected error occurred with Streamlabs Polly processing: {e}", exc_info=True)
raise RuntimeError(f"Unexpected Streamlabs Polly failure: {e}")
def randomvoice(self) -> str:
choice = random.choice(self.voices)
logger.debug(f"Randomly selected Streamlabs Polly voice: {choice}")
return choice

@ -1,10 +1,20 @@
#!/usr/bin/env python
"""
Main script for the Reddit Video Maker Bot.
This script orchestrates the process of fetching Reddit content,
generating audio and video components, and compiling them into a final video.
It handles configuration loading, application initialization, and error management.
"""
import math
import sys
import re # Added for reddit_id extraction
import logging # Added for logging
import logging.handlers # Added for logging
from os import name
from pathlib import Path
from subprocess import Popen
from typing import NoReturn
from typing import NoReturn, Dict, Any, Tuple
import argparse
from prawcore import ResponseException
@ -14,7 +24,10 @@ from utils import settings
from utils.cleanup import cleanup
from utils.console import print_markdown, print_step, print_substep
from utils.ffmpeg_install import ffmpeg_install
from utils.id import id
# from utils.id import id # This import seems unused and id is a python built-in.
# If utils.id.id() was intended, it was shadowed by the global redditid.
# Assuming it was for generating a unique ID from the reddit object,
# this functionality will be implicitly handled by using reddit_object["thread_id"]
from utils.version import checkversion
from video_creation.background import (
chop_background,
@ -28,8 +41,13 @@ from video_creation.voices import save_text_to_mp3, TTSProviders
__VERSION__ = "3.3.0"
print(
"""
# Store the original reddit_id for cleanup at shutdown
_current_reddit_id_for_cleanup = None
def display_banner_and_initial_message():
"""Prints the welcome banner and initial informational message."""
print(
"""
@ -37,49 +55,15 @@ print(
"""
)
print_markdown(
"### Thanks for using this tool! Feel free to contribute to this project on GitHub! If you have any questions, feel free to join my Discord server or submit a GitHub issue. You can find solutions to many common problems in the documentation: https://reddit-video-maker-bot.netlify.app/"
)
checkversion(__VERSION__)
def main(POST_ID=None) -> None:
global redditid, reddit_object
reddit_object = get_subreddit_threads(POST_ID)
redditid = id(reddit_object)
length, number_of_comments = save_text_to_mp3(reddit_object)
length = math.ceil(length)
get_screenshots_of_reddit_posts(reddit_object, number_of_comments)
bg_config = {
"video": get_background_config("video"),
"audio": get_background_config("audio"),
}
download_background_video(bg_config["video"])
download_background_audio(bg_config["audio"])
chop_background(bg_config, length, reddit_object)
make_final_video(number_of_comments, length, reddit_object, bg_config)
def run_many(times) -> None:
for x in range(1, times + 1):
print_step(
f'on the {x}{("th", "st", "nd", "rd", "th", "th", "th", "th", "th", "th")[x % 10]} iteration of {times}'
) # correct 1st 2nd 3rd 4th 5th....
main()
Popen("cls" if name == "nt" else "clear", shell=True).wait()
def shutdown() -> NoReturn:
if "redditid" in globals():
print_markdown("## Clearing temp files")
cleanup(redditid)
print("Exiting...")
sys.exit()
)
print_markdown(
"### Thanks for using this tool! Feel free to contribute to this project on GitHub! If you have any questions, feel free to join my Discord server or submit a GitHub issue. You can find solutions to many common problems in the documentation: https://reddit-video-maker-bot.netlify.app/"
)
def initialize_app_checks_and_config():
"""Handles initial application setup including version checks, argument parsing, and configuration loading."""
checkversion(__VERSION__) # This might print, consider replacing if it does. For now, assume it's a simple check or uses logging.
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Reddit Video Maker Bot")
parser.add_argument(
"--list-tts",
@ -89,58 +73,243 @@ if __name__ == "__main__":
args = parser.parse_args()
if args.list_tts:
print_step("Available TTS Providers:")
logging.info("Available TTS Providers:")
for provider in TTSProviders:
print_substep(f"- {provider}")
logging.info(f"- {provider}") # Simple info log for list items
sys.exit()
if sys.version_info.major != 3 or sys.version_info.minor not in [10, 11]:
print(
"Hey! Congratulations, you've made it so far (which is pretty rare with no Python 3.10). Unfortunately, this program only works on Python 3.10. Please install Python 3.10 and try again."
logging.error(
"Hey! Congratulations, you've made it so far (which is pretty rare with Python 3.10/3.11). "
"Unfortunately, this program primarily supports Python 3.10 and 3.11. "
"Please install one of these versions and try again."
)
sys.exit()
ffmpeg_install()
ffmpeg_install() # This function might print, review separately.
directory = Path().absolute()
logging.info("Checking TOML configuration...")
config = settings.check_toml(
f"{directory}/utils/.config.template.toml", f"{directory}/config.toml"
directory / "utils" / ".config.template.toml", directory / "config.toml"
)
config is False and sys.exit()
if not config: # check_toml returns False on failure
logging.error("Failed to load or create configuration. Exiting.")
sys.exit()
logging.info("TOML configuration check complete.")
if (
not settings.config["settings"]["tts"]["tiktok_sessionid"]
or settings.config["settings"]["tts"]["tiktok_sessionid"] == ""
) and config["settings"]["tts"]["voice_choice"] == "tiktok":
print_substep(
"TikTok voice requires a sessionid! Check our documentation on how to obtain one.",
"bold red",
) and settings.config["settings"]["tts"]["voice_choice"] == "tiktok":
logging.error(
"TikTok voice requires a sessionid! Check our documentation on how to obtain one."
)
sys.exit()
return config
def get_reddit_data(post_id_override: str = None) -> Dict[str, Any]:
"""
Fetches and processes Reddit thread data using praw.
It retrieves submission details and comments. A 'safe_thread_id' is generated
by sanitizing the original thread_id for filesystem compatibility and stored
in the returned dictionary. This safe ID is also stored globally for cleanup operations.
Args:
post_id_override (Optional[str]): Specific Reddit post ID to fetch.
If None, fetches based on subreddit config.
Returns:
Dict[str, Any]: A dictionary containing the processed Reddit thread data,
including the 'safe_thread_id'.
"""
logging.info("Fetching Reddit data...")
reddit_object = get_subreddit_threads(post_id_override)
if "thread_id" in reddit_object:
reddit_object["safe_thread_id"] = re.sub(r"[^\w\s-]", "", reddit_object["thread_id"])
logging.debug(f"Reddit thread ID: {reddit_object['thread_id']}, Safe ID: {reddit_object['safe_thread_id']}")
else:
logging.error("Critical: thread_id missing from Reddit object.")
reddit_object["safe_thread_id"] = "unknown_thread_" + str(math.floor(time.time())) # Ensure unique unknown ID
logging.warning(f"Assigned fallback safe_thread_id: {reddit_object['safe_thread_id']}")
global _current_reddit_id_for_cleanup
_current_reddit_id_for_cleanup = reddit_object["safe_thread_id"]
return reddit_object
def generate_audio_and_screenshots(reddit_object: Dict[str, Any]) -> Tuple[int, int]:
"""Generates TTS audio for the reddit content and takes screenshots."""
logging.info("Generating audio and screenshots...")
length, number_of_comments = save_text_to_mp3(reddit_object)
final_length = math.ceil(length)
get_screenshots_of_reddit_posts(reddit_object, number_of_comments)
logging.info("Audio and screenshots generated.")
return final_length, number_of_comments
def prepare_background_assets(length: int, reddit_object: Dict[str, Any]) -> Dict[str, Any]:
"""Prepares background video and audio assets."""
logging.info("Preparing background assets...")
bg_config = {
"video": get_background_config("video"),
"audio": get_background_config("audio"),
}
download_background_video(bg_config["video"])
download_background_audio(bg_config["audio"])
chop_background(bg_config, length, reddit_object)
logging.info("Background assets prepared.")
return bg_config
def create_video_from_assets(number_of_comments: int, length: int, reddit_object: Dict[str, Any], bg_config: Dict[str, Any]) -> None:
"""Compiles the final video from all generated assets."""
logging.info("Compiling final video...")
make_final_video(number_of_comments, length, reddit_object, bg_config)
logging.info("Final video compilation complete.")
def process_single_submission(post_id_override: str = None) -> None:
"""Main workflow to process a single Reddit submission into a video."""
logging.info(f"Starting processing for submission ID: {post_id_override if post_id_override else 'random'}")
reddit_object = get_reddit_data(post_id_override)
length, num_comments = generate_audio_and_screenshots(reddit_object)
background_config = prepare_background_assets(length, reddit_object)
create_video_from_assets(num_comments, length, reddit_object, background_config)
def run_many(times: int, config: Dict[str, Any]) -> None:
"""Runs the video creation process multiple times for random submissions."""
for x in range(1, times + 1):
logging.info(
f'On iteration {x}{("th", "st", "nd", "rd", "th", "th", "th", "th", "th", "th")[x % 10 if x % 10 < 4 and x // 10 != 1 else 0]} of {times}'
)
process_single_submission() # For random posts, no specific post_id
if x < times : # Don't clear after the last run
Popen("cls" if name == "nt" else "clear", shell=True).wait()
def shutdown_app() -> NoReturn:
"""Handles application shutdown, including cleanup."""
global _current_reddit_id_for_cleanup
if _current_reddit_id_for_cleanup:
logging.info(f"Clearing temp files for ID: {_current_reddit_id_for_cleanup}")
cleanup(_current_reddit_id_for_cleanup)
_current_reddit_id_for_cleanup = None
logging.info("Exiting Reddit Video Maker Bot.")
sys.exit()
if __name__ == "__main__":
display_banner_and_initial_message() # This function still uses print and print_markdown
# --- Logging Setup ---
log_file_path = Path("reddit_video_bot.log")
# Max log file size 5MB, keep 3 backup logs
file_handler = logging.handlers.RotatingFileHandler(log_file_path, maxBytes=5*1024*1024, backupCount=3, encoding='utf-8')
file_handler.setLevel(logging.DEBUG) # Log everything to file
console_handler = logging.StreamHandler() # Defaults to stderr
console_handler.setLevel(logging.INFO) # Log INFO and above to console
# Rich console handler for better formatting, if rich is available and preferred
try:
if config["reddit"]["thread"]["post_id"]:
for index, post_id in enumerate(config["reddit"]["thread"]["post_id"].split("+")):
index += 1
print_step(
f'on the {index}{("st" if index % 10 == 1 else ("nd" if index % 10 == 2 else ("rd" if index % 10 == 3 else "th")))} post of {len(config["reddit"]["thread"]["post_id"].split("+"))}'
from rich.logging import RichHandler
console_handler = RichHandler(rich_tracebacks=True, show_path=False, show_time=False, markup=True) # markup=True for rich styles
console_handler.setLevel(logging.INFO)
log_formatter = logging.Formatter("%(message)s")
detailed_log_formatter = logging.Formatter('%(asctime)s - %(levelname)s - [%(module)s.%(funcName)s:%(lineno)d] - %(message)s')
except ImportError:
log_formatter = logging.Formatter('[%(levelname)s] %(message)s')
detailed_log_formatter = logging.Formatter('%(asctime)s - %(levelname)s - [%(module)s.%(funcName)s:%(lineno)d] - %(message)s')
console_handler.setFormatter(log_formatter)
file_handler.setFormatter(detailed_log_formatter)
# Configure the root logger
# logging.basicConfig(level=logging.DEBUG, handlers=[file_handler, console_handler]) # This is one way
# Or, get the root logger and add handlers:
logger = logging.getLogger()
logger.setLevel(logging.DEBUG) # Set root logger level
logger.addHandler(file_handler)
logger.addHandler(console_handler)
# --- End Logging Setup ---
logging.info(f"Reddit Video Maker Bot Version: {__VERSION__}")
logging.debug("Logging initialized.")
app_config = initialize_app_checks_and_config()
try:
post_ids_str = app_config.get("reddit", {}).get("thread", {}).get("post_id")
times_to_run = app_config.get("settings", {}).get("times_to_run")
# Determine execution mode based on configuration
if post_ids_str:
# Mode 1: Process a specific list of post IDs
logging.info(f"Processing specific Reddit post IDs from config: {post_ids_str}")
post_id_list = post_ids_str.split("+")
for index, p_id in enumerate(post_id_list):
logging.info(
f'Processing post {index + 1}{("st", "nd", "rd", "th")[min(index % 10, 3) if (index + 1) % 100 // 10 != 1 else 3]} of {len(post_id_list)} (ID: {p_id.strip()})'
)
main(post_id)
Popen("cls" if name == "nt" else "clear", shell=True).wait()
elif config["settings"]["times_to_run"]:
run_many(config["settings"]["times_to_run"])
process_single_submission(p_id.strip())
if index < len(post_id_list) -1 :
# Clear console between processing multiple specified posts (except for the last one)
Popen("cls" if name == "nt" else "clear", shell=True).wait()
elif times_to_run:
# Mode 2: Run for a configured number of times (fetches random posts)
logging.info(f"Running Reddit Video Maker Bot {times_to_run} times for random posts.")
run_many(times_to_run, app_config)
else:
main()
# Mode 3: Default single run for a random post
logging.info("No specific post IDs or multiple runs configured. Running once for a random post.")
process_single_submission()
except KeyboardInterrupt:
shutdown()
except ResponseException:
print_markdown("## Invalid credentials")
print_markdown("Please check your credentials in the config.toml file")
shutdown()
logging.warning("Keyboard interrupt detected!")
shutdown_app()
except ResponseException as e:
logging.error(f"Reddit API Error: {e}")
logging.error("Please check your credentials in the config.toml file and your internet connection.")
shutdown_app()
except Exception as err:
config["settings"]["tts"]["tiktok_sessionid"] = "REDACTED"
config["settings"]["tts"]["elevenlabs_api_key"] = "REDACTED"
print_step(
f"Sorry, something went wrong with this version! Try again, and feel free to report this issue at GitHub or the Discord community.\n"
f"Version: {__VERSION__} \n"
f"Error: {err} \n"
f'Config: {config["settings"]}'
logging.error(f"An unexpected error occurred: {type(err).__name__} - {err}", exc_info=True) # Log traceback to file
# Redact sensitive info before showing to console (if error is printed to console by RichHandler)
# This part is more for if we were constructing the console message manually here.
# RichHandler with exc_info=True will show traceback, which might contain sensitive data from locals.
# For now, the detailed log goes to file, console gets a simpler message.
# Simplified console error message:
error_details_for_console = (
f"Version: {__VERSION__}\n"
f"Error Type: {type(err).__name__}\n"
"Details have been logged to reddit_video_bot.log.\n"
"Please report this issue at GitHub or the Discord community if it persists."
)
raise err
# If not using RichHandler or if more control is needed for console:
# console.print(Panel(Text(error_details_for_console, style="bold red"), title="Unhandled Exception"))
# Since RichHandler is used, logging.error will display it.
# The main `logging.error` call above with `exc_info=True` handles file logging.
# For console, RichHandler will format the exception. We might want a less verbose console output.
# The below is a more controlled message for console if the above logging.error is too verbose for console.
# For now, rely on RichHandler's traceback formatting for console errors.
# The original print_step for error:
# print_step(error_message, style="bold red") # This would now be logging.error(...)
# The error_message variable construction from original code:
# config_settings_str = str(settings.config.get('settings')) # Simplified for this example
# if "tts" in settings.config.get("settings", {}):
# if "tiktok_sessionid" in settings.config["settings"]["tts"]:
# config_settings_str = config_settings_str.replace(settings.config["settings"]["tts"]["tiktok_sessionid"], "REDACTED")
# if "elevenlabs_api_key" in settings.config["settings"]["tts"]:
# config_settings_str = config_settings_str.replace(settings.config["settings"]["tts"]["elevenlabs_api_key"], "REDACTED")
# logging.error(f"Sorry, something went wrong with this version!\nVersion: {__VERSION__}\nError: {err}\nConfig (sensitive fields redacted): {config_settings_str}")
# Re-raise if you want Python's default exception printing to also occur,
# or if something else higher up should handle it.
# For a CLI app, often we log and then exit.
shutdown_app() # Ensure cleanup and exit
finally:
if _current_reddit_id_for_cleanup:
logging.info(f"Performing final cleanup for ID: {_current_reddit_id_for_cleanup}")
cleanup(_current_reddit_id_for_cleanup)

@ -0,0 +1,75 @@
import unittest
import sys
from pathlib import Path
# Add project root to sys.path to allow importing project modules
# Assuming 'tests' directory is at the root of the project, or one level down.
# If tests/ is in root, then Path(__file__).parent.parent should be project root.
# If tests/ is utils/tests/, then Path(__file__).parent.parent.parent
# For now, let's assume tests/ is at the project root.
# If this script is run from the project root (e.g., python -m unittest discover),
# then imports might work without sys.path modification if modules are packaged or top-level.
# However, to be safe for direct execution of this test file or discovery from tests/ dir:
PROJECT_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
from video_creation.final_video import name_normalize
class TestNameNormalize(unittest.TestCase):
def test_empty_string(self):
self.assertEqual(name_normalize(""), "")
def test_no_special_chars(self):
self.assertEqual(name_normalize("Simple Name 123"), "Simple Name 123")
def test_forbidden_chars(self):
self.assertEqual(name_normalize('Test? \\ " * : | < > Name'), "Test Name")
def test_slash_variants_word_or(self):
self.assertEqual(name_normalize("cat/dog"), "cat or dog")
self.assertEqual(name_normalize("cat / dog"), "cat or dog")
self.assertEqual(name_normalize("cat /dog"), "cat or dog")
self.assertEqual(name_normalize("cat/ dog"), "cat or dog")
def test_slash_variants_numbers_of(self):
self.assertEqual(name_normalize("1/2"), "1 of 2")
self.assertEqual(name_normalize("1 / 2"), "1 of 2")
self.assertEqual(name_normalize("10 / 20"), "10 of 20")
def test_slash_variants_with_without(self):
self.assertEqual(name_normalize("test w/ feature"), "test with feature")
self.assertEqual(name_normalize("test W / feature"), "test with feature")
self.assertEqual(name_normalize("test w/o feature"), "test without feature")
self.assertEqual(name_normalize("test W / O feature"), "test without feature")
self.assertEqual(name_normalize("test W / 0 feature"), "test without feature") # '0' for 'o'
def test_remove_remaining_slashes(self):
self.assertEqual(name_normalize("a/b/c"), "a or b or c") # First pass
self.assertEqual(name_normalize("leading/trailing/"), "leading or trailing") # after or, / is removed
# The function applies rules sequentially. "a/b/c" -> "a or b/c" -> "a or b or c"
# "test / only / remove" -> "test or only or remove"
# A single remaining slash after other rules: "path/to/file" -> "path or to or file"
# If a literal single slash needs removing without 'or', 'of', 'with', 'without' interpretation:
# e.g. "text/ single" -> "text single" (this is what it currently does due to the final re.sub(r"\/", r"", name))
self.assertEqual(name_normalize("text/ single"), "text single")
def test_combined_rules(self):
self.assertEqual(name_normalize('File <1>/<2> w/ option?'), "File 1 of 2 with option")
def test_translation_skip(self):
# This test assumes 'post_lang' is not set in settings for name_normalize to skip translation.
# To properly test translation, we'd need to mock settings.config or have a way to set it.
# For now, testing the non-translation path.
# If settings.config["reddit"]["thread"]["post_lang"] is None or empty, it should not translate.
# This requires settings to be loaded; for a unit test, it's better if name_normalize
# can take lang as a parameter or if settings is easily mockable.
# For now, we assume the default state or test its behavior when lang is None.
# To test this properly, name_normalize might need a slight refactor
# to accept 'lang' as an argument, or we mock 'settings.config'.
# Given current structure, we just test a name that would be unchanged if no translation.
self.assertEqual(name_normalize("A simple name"), "A simple name")
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,77 @@
import unittest
import sys
from pathlib import Path
# Add project root to sys.path
PROJECT_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
# Assuming _safe_str_to_bool is accessible for testing.
# If it's meant to be private, this import might be an issue,
# but for unit testing helpers, it's often practical.
from utils.settings import _safe_str_to_bool
class TestSafeStrToBool(unittest.TestCase):
def test_true_strings(self):
self.assertTrue(_safe_str_to_bool("true"))
self.assertTrue(_safe_str_to_bool("True"))
self.assertTrue(_safe_str_to_bool("TRUE"))
self.assertTrue(_safe_str_to_bool("yes"))
self.assertTrue(_safe_str_to_bool("Yes"))
self.assertTrue(_safe_str_to_bool("1"))
self.assertTrue(_safe_str_to_bool("on"))
self.assertTrue(_safe_str_to_bool("On"))
def test_false_strings(self):
self.assertFalse(_safe_str_to_bool("false"))
self.assertFalse(_safe_str_to_bool("False"))
self.assertFalse(_safe_str_to_bool("FALSE"))
self.assertFalse(_safe_str_to_bool("no"))
self.assertFalse(_safe_str_to_bool("No"))
self.assertFalse(_safe_str_to_bool("0"))
self.assertFalse(_safe_str_to_bool("off"))
self.assertFalse(_safe_str_to_bool("Off"))
def test_boolean_input(self):
self.assertTrue(_safe_str_to_bool(True))
self.assertFalse(_safe_str_to_bool(False))
def test_integer_input(self):
# Note: The function converts input to str, so int 1 becomes "1" -> True
self.assertTrue(_safe_str_to_bool(1))
self.assertFalse(_safe_str_to_bool(0))
# Other integers will raise ValueError as they don't match "true"/"false" strings
with self.assertRaises(ValueError):
_safe_str_to_bool(2)
with self.assertRaises(ValueError):
_safe_str_to_bool(-1)
def test_invalid_strings(self):
with self.assertRaises(ValueError):
_safe_str_to_bool("T")
with self.assertRaises(ValueError):
_safe_str_to_bool("F")
with self.assertRaises(ValueError):
_safe_str_to_bool("Y")
with self.assertRaises(ValueError):
_safe_str_to_bool("N")
with self.assertRaises(ValueError):
_safe_str_to_bool("maybe")
with self.assertRaises(ValueError):
_safe_str_to_bool("") # Empty string
with self.assertRaises(ValueError):
_safe_str_to_bool(" true ") # Contains spaces, current impl fails
def test_string_with_spaces_strict(self):
# Current implementation is strict about surrounding spaces.
# If " true ".strip() was used, this would pass.
# Testing current behavior.
with self.assertRaises(ValueError):
_safe_str_to_bool(" true ")
with self.assertRaises(ValueError):
_safe_str_to_bool("false ")
if __name__ == '__main__':
unittest.main()

@ -1,20 +1,62 @@
import os
import shutil
from os.path import exists
from pathlib import Path
import logging
logger = logging.getLogger(__name__)
def _listdir(d): # listdir with full path
return [os.path.join(d, f) for f in os.listdir(d)]
def cleanup(reddit_id) -> int:
"""Deletes all temporary assets in assets/temp
# The _listdir function is no longer needed with pathlib's rglob or iterdir.
def cleanup(reddit_id_or_path: str) -> int:
"""
Deletes the specified temporary assets directory.
The input can be just the reddit_id, or a full path to the directory.
Returns:
int: How many files were deleted
int: 1 if directory was found and removed, 0 otherwise.
"""
directory = f"../assets/temp/{reddit_id}/"
if exists(directory):
shutil.rmtree(directory)
# Determine if the input is a full path or just an ID
# This makes the function more flexible if a direct path is ever passed.
if Path(reddit_id_or_path).is_absolute() or Path(reddit_id_or_path).parent != Path("."):
# Looks like a full or relative path with parent components
temp_dir_to_delete = Path(reddit_id_or_path)
else:
# Assume it's just the reddit_id, construct path relative to expected structure
# The original path "../assets/temp/" implies this script might be run from a different CWD.
# For robustness, let's define base path relative to this script file's location or a well-known project root.
# Assuming this script is in `utils/` and assets is `../assets/` from there.
# A more robust way would be to have a global constant for project root or assets root.
# For now, mimicking original relative path logic but with pathlib:
# current_script_dir = Path(__file__).parent
# temp_base_dir = current_script_dir.parent / "assets" / "temp"
# For simplicity and consistency with other path constructions, let's assume a base assets path.
# Let's use a path relative to a potential project root if run from there.
# Or, more simply, the original relative path.
# The original `../assets/temp/` suggests it's being called from a script one level down from project root.
# e.g. if project_root/main.py calls it.
# Let's make it relative to CWD for now as `Path()` defaults to that.
# The original path was "../assets/temp/{reddit_id}/"
# If main.py is in root, and it calls something in utils which calls this,
# then Path("assets/temp") would be more appropriate from root.
# The `../` is concerning. Let's assume this is called from a script within `utils` or similar.
# For now, to match original intent:
# If reddit_id_or_path is just an ID, it implies `assets/temp/{ID}` from some root.
# The original path `../assets/temp/{reddit_id}/` means from where `cleanup.py` is, go up one, then to assets.
# This means project_root/assets/temp/{reddit_id} if cleanup.py is in project_root/utils/
# Safest assumption: the caller (main.py) provides the `safe_thread_id`.
# `main.py` is in the root. `assets` is also in the root.
# So, the path should be `assets/temp/{reddit_id_or_path}`.
temp_dir_to_delete = Path("assets") / "temp" / reddit_id_or_path
logger.info(f"Attempting to cleanup temporary directory: {temp_dir_to_delete}")
return 1
if temp_dir_to_delete.exists() and temp_dir_to_delete.is_dir():
try:
shutil.rmtree(temp_dir_to_delete)
logger.info(f"Successfully removed directory: {temp_dir_to_delete}")
return 1 # Indicate one directory tree was removed
except OSError as e:
logger.error(f"Error removing directory {temp_dir_to_delete}: {e}", exc_info=True)
return 0 # Indicate failure or partial success
else:
logger.warning(f"Temporary directory {temp_dir_to_delete} not found or is not a directory. Skipping cleanup for it.")
return 0

@ -1,104 +1,132 @@
import os
import os # Keep for os.name and os.walk for now, will replace parts
import subprocess
import zipfile
import logging # Added for logging
from pathlib import Path # Added for pathlib
import shutil # For rmtree
import requests
logger = logging.getLogger(__name__)
def ffmpeg_install_windows():
logger.info("Attempting to install FFmpeg for Windows...")
try:
ffmpeg_url = (
"https://github.com/GyanD/codexffmpeg/releases/download/6.0/ffmpeg-6.0-full_build.zip"
)
ffmpeg_zip_filename = "ffmpeg.zip"
ffmpeg_extracted_folder = "ffmpeg"
# Check if ffmpeg.zip already exists
if os.path.exists(ffmpeg_zip_filename):
os.remove(ffmpeg_zip_filename)
# Download FFmpeg
r = requests.get(ffmpeg_url)
with open(ffmpeg_zip_filename, "wb") as f:
f.write(r.content)
# Check if the extracted folder already exists
if os.path.exists(ffmpeg_extracted_folder):
# Remove existing extracted folder and its contents
for root, dirs, files in os.walk(ffmpeg_extracted_folder, topdown=False):
for file in files:
os.remove(os.path.join(root, file))
for dir in dirs:
os.rmdir(os.path.join(root, dir))
os.rmdir(ffmpeg_extracted_folder)
# Extract FFmpeg
with zipfile.ZipFile(ffmpeg_zip_filename, "r") as zip_ref:
zip_ref.extractall()
os.remove("ffmpeg.zip")
# Rename and move files
os.rename(f"{ffmpeg_extracted_folder}-6.0-full_build", ffmpeg_extracted_folder)
for file in os.listdir(os.path.join(ffmpeg_extracted_folder, "bin")):
os.rename(
os.path.join(ffmpeg_extracted_folder, "bin", file),
os.path.join(".", file),
)
os.rmdir(os.path.join(ffmpeg_extracted_folder, "bin"))
for file in os.listdir(os.path.join(ffmpeg_extracted_folder, "doc")):
os.remove(os.path.join(ffmpeg_extracted_folder, "doc", file))
for file in os.listdir(os.path.join(ffmpeg_extracted_folder, "presets")):
os.remove(os.path.join(ffmpeg_extracted_folder, "presets", file))
os.rmdir(os.path.join(ffmpeg_extracted_folder, "presets"))
os.rmdir(os.path.join(ffmpeg_extracted_folder, "doc"))
os.remove(os.path.join(ffmpeg_extracted_folder, "LICENSE"))
os.remove(os.path.join(ffmpeg_extracted_folder, "README.txt"))
os.rmdir(ffmpeg_extracted_folder)
print(
"FFmpeg installed successfully! Please restart your computer and then re-run the program."
)
ffmpeg_url = "https://github.com/GyanD/codexffmpeg/releases/download/6.0/ffmpeg-6.0-full_build.zip"
ffmpeg_zip_path = Path("ffmpeg.zip")
ffmpeg_extracted_base_dir_name = "ffmpeg-6.0-full_build" # Name of dir inside zip
ffmpeg_final_dir = Path("ffmpeg_gyan") # Temp dir for extraction and manipulation
if ffmpeg_zip_path.exists():
logger.debug(f"Removing existing FFmpeg zip file: {ffmpeg_zip_path}")
ffmpeg_zip_path.unlink()
logger.info(f"Downloading FFmpeg from {ffmpeg_url}...")
r = requests.get(ffmpeg_url, stream=True)
r.raise_for_status() # Check for download errors
with open(ffmpeg_zip_path, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
logger.info("FFmpeg downloaded successfully.")
if ffmpeg_final_dir.exists():
logger.debug(f"Removing existing FFmpeg extracted directory: {ffmpeg_final_dir}")
shutil.rmtree(ffmpeg_final_dir)
logger.info(f"Extracting {ffmpeg_zip_path}...")
with zipfile.ZipFile(ffmpeg_zip_path, "r") as zip_ref:
zip_ref.extractall(ffmpeg_final_dir) # Extract into a specific folder first
# The actual binaries are in ffmpeg-6.0-full_build/bin/
extracted_ffmpeg_path = ffmpeg_final_dir / ffmpeg_extracted_base_dir_name / "bin"
target_install_dir = Path(".") # Current directory
if not extracted_ffmpeg_path.is_dir():
logger.error(f"FFmpeg binaries not found at expected path: {extracted_ffmpeg_path}")
raise FileNotFoundError(f"FFmpeg binaries not found after extraction at {extracted_ffmpeg_path}")
logger.info(f"Moving FFmpeg binaries from {extracted_ffmpeg_path} to {target_install_dir}...")
for item in extracted_ffmpeg_path.iterdir():
if item.is_file() and item.name.startswith("ffmpeg") or item.name.startswith("ffprobe"): #or item.name.startswith("ffplay")
target_file = target_install_dir / item.name
logger.debug(f"Moving {item} to {target_file}")
item.rename(target_file)
logger.debug(f"Cleaning up temporary files: {ffmpeg_zip_path}, {ffmpeg_final_dir}")
ffmpeg_zip_path.unlink() # Remove zip file
shutil.rmtree(ffmpeg_final_dir) # Remove the whole temp extraction folder
logger.info("FFmpeg installed successfully for Windows! Please restart your computer and then re-run the program.")
# No exit() here, let the caller decide.
except requests.exceptions.RequestException as e:
logger.error(f"Failed to download FFmpeg: {e}", exc_info=True)
raise RuntimeError(f"FFmpeg download failed: {e}")
except zipfile.BadZipFile as e:
logger.error(f"Failed to extract FFmpeg zip file (it might be corrupted): {e}", exc_info=True)
raise RuntimeError(f"FFmpeg extraction failed (BadZipFile): {e}")
except Exception as e:
print(
"An error occurred while trying to install FFmpeg. Please try again. Otherwise, please install FFmpeg manually and try again."
)
print(e)
exit()
logger.error(f"An error occurred during Windows FFmpeg installation: {e}", exc_info=True)
logger.error("Please try installing FFmpeg manually and try again.")
raise RuntimeError(f"Windows FFmpeg installation error: {e}")
def ffmpeg_install_linux():
logger.info("Attempting to install FFmpeg for Linux using apt...")
try:
subprocess.run(
"sudo apt install ffmpeg",
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
except Exception as e:
print(
"An error occurred while trying to install FFmpeg. Please try again. Otherwise, please install FFmpeg manually and try again."
# Using check=True will raise CalledProcessError if apt fails
result = subprocess.run(
"sudo apt update && sudo apt install -y ffmpeg", # Added -y for non-interactive
shell=True, # shell=True is a security risk if command is from variable
check=True, # Raise exception on non-zero exit
capture_output=True, text=True # Capture output
)
print(e)
exit()
print("FFmpeg installed successfully! Please re-run the program.")
exit()
logger.info("FFmpeg installation via apt completed.")
logger.debug(f"apt stdout: {result.stdout}")
logger.debug(f"apt stderr: {result.stderr}")
logger.info("FFmpeg (Linux) installed successfully! Please re-run the program if this was the first time.")
# No exit() here
except subprocess.CalledProcessError as e:
logger.error(f"Failed to install FFmpeg using apt. Return code: {e.returncode}")
logger.error(f"apt stdout: {e.stdout}")
logger.error(f"apt stderr: {e.stderr}")
logger.error("Please try installing FFmpeg manually (e.g., 'sudo apt install ffmpeg') and try again.")
raise RuntimeError(f"Linux FFmpeg installation via apt failed: {e}")
except Exception as e: # Catch other errors like permissions if sudo is not passwordless
logger.error(f"An unexpected error occurred during Linux FFmpeg installation: {e}", exc_info=True)
raise RuntimeError(f"Unexpected Linux FFmpeg installation error: {e}")
def ffmpeg_install_mac():
logger.info("Attempting to install FFmpeg for macOS using Homebrew...")
try:
subprocess.run(
# Check if Homebrew is installed first
subprocess.run(["brew", "--version"], check=True, capture_output=True)
logger.debug("Homebrew found.")
except (FileNotFoundError, subprocess.CalledProcessError):
logger.error("Homebrew is not installed or not in PATH. Please install Homebrew first (see https://brew.sh/).")
raise EnvironmentError("Homebrew not found. FFmpeg installation via Homebrew requires Homebrew.")
try:
result = subprocess.run(
"brew install ffmpeg",
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
except FileNotFoundError:
print(
"Homebrew is not installed. Please install it and try again. Otherwise, please install FFmpeg manually and try again."
shell=True, # shell=True for brew install command might be okay but direct execution is safer if possible
check=True,
capture_output=True, text=True
)
exit()
print("FFmpeg installed successfully! Please re-run the program.")
exit()
logger.info("FFmpeg installation via Homebrew completed.")
logger.debug(f"brew stdout: {result.stdout}")
logger.debug(f"brew stderr: {result.stderr}")
logger.info("FFmpeg (macOS) installed successfully! Please re-run the program if this was the first time.")
# No exit()
except subprocess.CalledProcessError as e:
logger.error(f"Failed to install FFmpeg using Homebrew. Return code: {e.returncode}")
logger.error(f"brew stdout: {e.stdout}")
logger.error(f"brew stderr: {e.stderr}")
logger.error("Please try installing FFmpeg manually (e.g., 'brew install ffmpeg') and try again.")
raise RuntimeError(f"macOS FFmpeg installation via Homebrew failed: {e}")
except Exception as e: # Catch other unexpected errors
logger.error(f"An unexpected error occurred during macOS FFmpeg installation: {e}", exc_info=True)
raise RuntimeError(f"Unexpected macOS FFmpeg installation error: {e}")
def ffmpeg_install():
@ -108,35 +136,73 @@ def ffmpeg_install():
["ffmpeg", "-version"],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stderr=subprocess.PIPE, # Keep PIPE to avoid printing to console unless check=True fails
)
except FileNotFoundError as e:
# Check if there's ffmpeg.exe in the current directory
if os.path.exists("./ffmpeg.exe"):
print(
"FFmpeg is installed on this system! If you are seeing this error for the second time, restart your computer."
logger.info("FFmpeg is already installed and accessible in PATH.")
except FileNotFoundError:
logger.warning("FFmpeg not found in PATH.")
# Check if there's ffmpeg.exe in the current directory (Windows specific check)
if os.name == "nt" and Path("./ffmpeg.exe").exists():
logger.info(
"ffmpeg.exe found in the current directory. Consider adding it to PATH or ensuring it's used correctly."
)
print("FFmpeg is not installed on this system.")
resp = input(
"We can try to automatically install it for you. Would you like to do that? (y/n): "
)
if resp.lower() == "y":
print("Installing FFmpeg...")
if os.name == "nt":
ffmpeg_install_windows()
elif os.name == "posix":
ffmpeg_install_linux()
elif os.name == "mac":
ffmpeg_install_mac()
else:
print("Your OS is not supported. Please install FFmpeg manually and try again.")
exit()
# If this message appears again after restart, user might need to add CWD to PATH or handle it.
# For now, assume if it's there, it might be usable by the app if CWD is in PATH implicitly or explicitly.
return # Assume it's "installed" if present locally on Windows
logger.info("FFmpeg is not installed or not in PATH.")
# Use a local Rich Console for this interactive part, as logging handlers might be configured differently
from rich.console import Console as RichConsole
local_console = RichConsole()
try:
resp = local_console.input(
"[yellow]FFmpeg is not detected. Would you like to attempt automatic installation? (y/n):[/yellow] "
).strip().lower()
except Exception: # Catch potential errors if input is not from a real TTY
logger.warning("Could not get user input for FFmpeg installation. Assuming 'no'.")
resp = "n"
if resp == "y":
logger.info("Attempting to install FFmpeg automatically...")
try:
if os.name == "nt": # Windows
ffmpeg_install_windows()
elif sys.platform == "darwin": # macOS
ffmpeg_install_mac()
elif os.name == "posix": # Linux and other POSIX
ffmpeg_install_linux()
else:
logger.error(f"Automatic FFmpeg installation is not supported for your OS: {os.name} / {sys.platform}.")
raise EnvironmentError(f"Unsupported OS for automatic FFmpeg installation: {os.name}")
# After installation attempt, re-check
logger.info("Re-checking FFmpeg version after installation attempt...")
subprocess.run(["ffmpeg", "-version"], check=True, capture_output=True)
logger.info("FFmpeg successfully installed and verified.")
except (RuntimeError, EnvironmentError) as install_err: # Catch errors from install functions
logger.error(f"Automatic FFmpeg installation failed: {install_err}")
logger.info("Please install FFmpeg manually and add it to your system's PATH.")
# Do not exit here, let main.py handle if ffmpeg is critical
raise # Re-raise to indicate to main.py that ffmpeg is still not available.
except Exception as e:
logger.error(f"An unexpected error occurred during automatic FFmpeg installation process: {e}", exc_info=True)
logger.info("Please install FFmpeg manually and add it to your system's PATH.")
raise RuntimeError(f"Unexpected FFmpeg auto-install error: {e}")
else:
print("Please install FFmpeg manually and try again.")
exit()
except Exception as e:
print(
"Welcome fellow traveler! You're one of the few who have made it this far. We have no idea how you got at this error, but we're glad you're here. Please report this error to the developer, and we'll try to fix it as soon as possible. Thank you for your patience!"
)
print(e)
logger.info("User declined automatic FFmpeg installation. Please install FFmpeg manually.")
raise FileNotFoundError("FFmpeg not found and user declined installation.")
except subprocess.CalledProcessError as e:
# This means ffmpeg -version returned non-zero, which is unusual but possible.
logger.warning(f"FFmpeg check command 'ffmpeg -version' executed but returned an error (code {e.returncode}). FFmpeg might have issues.")
logger.debug(f"ffmpeg -version stdout: {e.stdout.decode(errors='ignore') if e.stdout else ''}")
logger.debug(f"ffmpeg -version stderr: {e.stderr.decode(errors='ignore') if e.stderr else ''}")
# Proceed cautiously, it might still work.
except Exception as e: # Catch any other unexpected error during initial check
logger.error(f"An unexpected error occurred while checking for FFmpeg: {e}", exc_info=True)
# This is a critical failure if we can't even check for ffmpeg.
raise RuntimeError(f"Failed to check for FFmpeg: {e}")
# Return None implicitly if execution reaches here without error
return None

@ -1,12 +1,41 @@
import json
import re
from pathlib import Path
from typing import Dict, Callable, Any # Added Callable and Any
import toml
import tomlkit
from flask import flash
# --- Helper for safe type conversion (copied from utils/settings.py) ---
def _safe_str_to_bool(val: Any) -> bool:
"""Converts a string to boolean in a case-insensitive way."""
if isinstance(val, bool):
return val
val_str = str(val).lower()
if val_str in ("true", "yes", "1", "on"):
return True
if val_str in ("false", "no", "0", "off"):
return False
raise ValueError(f"Cannot convert '{val}' to boolean.")
_TYPE_CONVERTERS: Dict[str, Callable[[Any], Any]] = {
"str": str,
"int": int,
"float": float,
"bool": _safe_str_to_bool,
}
def _get_safe_type_converter(type_str: str) -> Callable[[Any], Any]:
"""Returns a safe type conversion function based on a type string."""
converter = _TYPE_CONVERTERS.get(type_str)
if converter is None:
raise ValueError(f"Unsupported type string for conversion: {type_str}. Supported types: {list(_TYPE_CONVERTERS.keys())}")
return converter
# --- End of helper ---
# Get validation checks from template
def get_checks():
template = toml.load("utils/.config.template.toml")
@ -14,9 +43,9 @@ def get_checks():
def unpack_checks(obj: dict):
for key in obj.keys():
if "optional" in obj[key].keys():
if "optional" in obj[key].keys(): # Assuming "optional" key presence indicates a checkable item
checks[key] = obj[key]
else:
elif isinstance(obj[key], dict): # Recurse only if it's a dictionary
unpack_checks(obj[key])
unpack_checks(template)
@ -25,7 +54,9 @@ def get_checks():
# Get current config (from config.toml) as dict
def get_config(obj: dict, done={}):
def get_config(obj: dict, done=None): # Changed default for done to None
if done is None:
done = {}
for key in obj.keys():
if not isinstance(obj[key], dict):
done[key] = obj[key]
@ -36,17 +67,27 @@ def get_config(obj: dict, done={}):
# Checks if value is valid
def check(value, checks):
def check(value, checks): # `checks` here is the specific check dict for one item
incorrect = False
original_value = value
if value == "False":
value = ""
# The line `if value == "False": value = ""` was removed.
# _safe_str_to_bool will handle "False" string correctly for boolean conversions.
# If it was meant for string fields, that logic should be more explicit if needed.
if not incorrect and "type" in checks:
type_str = checks["type"]
try:
value = eval(checks["type"])(value)
except Exception:
converter = _get_safe_type_converter(type_str)
value = converter(value)
except (ValueError, TypeError) as e:
# In GUI, direct print might not be visible. Flash message is handled by modify_settings.
# For now, just mark as incorrect. Consider logging here.
# print(f"Debug: Conversion error for '{original_value}' to '{type_str}': {e}") # Debug print
incorrect = True
except Exception: # Catch any other unexpected errors
incorrect = True
if (
not incorrect and "options" in checks and value not in checks["options"]
@ -62,9 +103,11 @@ def check(value, checks):
): # FAILSTATE Value doesn't match regex, or has regex but is not a string.
incorrect = True
# Length/Value checks for non-iterables (int, float)
if (
not incorrect
and not hasattr(value, "__iter__")
and not isinstance(value, str) # Explicitly exclude strings
and (
("nmin" in checks and checks["nmin"] is not None and value < checks["nmin"])
or ("nmax" in checks and checks["nmax"] is not None and value > checks["nmax"])
@ -72,6 +115,7 @@ def check(value, checks):
):
incorrect = True
# Length checks for iterables (str, list)
if (
not incorrect
and hasattr(value, "__iter__")
@ -83,7 +127,7 @@ def check(value, checks):
incorrect = True
if incorrect:
return "Error"
return "Error" # Special marker for modify_settings to flash an error
return value
@ -125,26 +169,28 @@ def modify_settings(data: dict, config_load, checks: dict):
# Delete background video
def delete_background(key):
# Read backgrounds.json
with open("utils/backgrounds.json", "r", encoding="utf-8") as backgrounds:
data = json.load(backgrounds)
# Remove background from backgrounds.json
with open("utils/backgrounds.json", "w", encoding="utf-8") as backgrounds:
if data.pop(key, None):
json.dump(data, backgrounds, ensure_ascii=False, indent=4)
else:
flash("Couldn't find this background. Try refreshing the page.", "error")
return
# Remove background video from ".config.template.toml"
config = tomlkit.loads(Path("utils/.config.template.toml").read_text())
config["settings"]["background"]["background_choice"]["options"].remove(key)
backgrounds_json_path = Path("utils/backgrounds.json")
try:
with open(backgrounds_json_path, "r", encoding="utf-8") as f:
data = json.load(f)
except (FileNotFoundError, json.JSONDecodeError) as e:
flash(f"Error reading backgrounds file: {e}", "error")
return
with Path("utils/.config.template.toml").open("w") as toml_file:
toml_file.write(tomlkit.dumps(config))
if key in data:
data.pop(key)
try:
with open(backgrounds_json_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=4)
flash(f'Successfully removed "{key}" background!')
except IOError as e:
flash(f"Error writing backgrounds file: {e}", "error")
else:
flash("Couldn't find this background. Try refreshing the page.", "error")
return
flash(f'Successfully removed "{key}" background!')
# The part modifying ".config.template.toml" is removed.
# The available choices will be dynamically loaded from backgrounds.json by the application.
# Add background video
@ -193,20 +239,25 @@ def add_background(youtube_uri, filename, citation, position):
return
# Add background video to json file
with open("utils/backgrounds.json", "r+", encoding="utf-8") as backgrounds:
data = json.load(backgrounds)
data[filename] = [youtube_uri, filename + ".mp4", citation, position]
backgrounds.seek(0)
json.dump(data, backgrounds, ensure_ascii=False, indent=4)
# Add background video to ".config.template.toml"
config = tomlkit.loads(Path("utils/.config.template.toml").read_text())
config["settings"]["background"]["background_choice"]["options"].append(filename)
with Path("utils/.config.template.toml").open("w") as toml_file:
toml_file.write(tomlkit.dumps(config))
backgrounds_json_path = Path("utils/backgrounds.json")
try:
with open(backgrounds_json_path, "r+", encoding="utf-8") as f:
# Load existing data, or initialize if file is empty/invalid
try:
data = json.load(f)
except json.JSONDecodeError:
data = {} # Initialize with empty dict if file is empty or malformed
data[filename] = [youtube_uri, filename + ".mp4", citation, position]
f.seek(0) # Rewind to the beginning of the file
f.truncate() # Clear the file content before writing new data
json.dump(data, f, ensure_ascii=False, indent=4)
flash(f'Added "{citation}-{filename}.mp4" as a new background video!')
except IOError as e:
flash(f"Error writing to backgrounds file: {e}", "error")
return
flash(f'Added "{citation}-{filename}.mp4" as a new background video!')
# The part modifying ".config.template.toml" is removed.
# The available choices will be dynamically loaded from backgrounds.json by the application.
return

@ -1,6 +1,7 @@
import os
# import os # No longer needed
import re
import textwrap
from pathlib import Path # Added pathlib
from PIL import Image, ImageDraw, ImageFont
from rich.progress import track
@ -58,18 +59,46 @@ def imagemaker(theme, reddit_obj: dict, txtclr, padding=5, transparent=False) ->
Render Images for video
"""
texts = reddit_obj["thread_post"]
id = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"])
# Use safe_thread_id if available from prior processing, otherwise sanitize
safe_id = reddit_obj.get("safe_thread_id", re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"]))
# Define font paths using pathlib for consistency, then convert to string for PIL
# Assuming a FONTS_DIR constant would be defined similarly to how it's done in final_video.py
# For now, let's define it locally or assume it's passed/configured.
# For this change, I'll define a local FONTS_DIR relative to this file's assumed location if not available globally.
# A better long-term solution is a shared constants/config for such paths.
# Assuming this utils/imagenarator.py is in utils/, and fonts/ is at project_root/fonts/
# So, Path(__file__).parent.parent / "fonts"
# For simplicity, let's use a relative path from CWD, assuming CWD is project root.
fonts_dir = Path("fonts")
roboto_bold_path = str(fonts_dir / "Roboto-Bold.ttf")
roboto_regular_path = str(fonts_dir / "Roboto-Regular.ttf")
if transparent:
font = ImageFont.truetype(os.path.join("fonts", "Roboto-Bold.ttf"), 100)
font = ImageFont.truetype(roboto_bold_path, 100)
else:
font = ImageFont.truetype(os.path.join("fonts", "Roboto-Regular.ttf"), 100)
size = (1920, 1080)
font = ImageFont.truetype(roboto_regular_path, 100)
image = Image.new("RGBA", size, theme)
size = (1920, 1080) # Consider making size configurable
for idx, text in track(enumerate(texts), "Rendering Image"):
image = Image.new("RGBA", size, theme)
text = process_text(text, False)
# Ensure output directory exists
output_dir = Path("assets") / "temp" / safe_id / "png"
output_dir.mkdir(parents=True, exist_ok=True)
for idx, text in track(enumerate(texts), "Rendering Images for Storymode"): # Changed description
image = Image.new("RGBA", size, theme) # Create a fresh image for each text segment
text = process_text(text, False) # Assuming process_text is defined elsewhere
draw_multiple_line_text(image, text, font, txtclr, padding, wrap=30, transparent=transparent)
image.save(f"assets/temp/{id}/png/img{idx}.png")
output_image_path = output_dir / f"img{idx}.png"
try:
image.save(output_image_path)
except Exception as e:
# Log error if imagemaker is integrated with logging
# For now, print to stderr or raise
print(f"Error saving image {output_image_path}: {e}") # Replace with logger.error if available
# Depending on desired behavior, either continue or raise e
# For now, let's continue to try and process other images.
# Consider adding `logger.error(..., exc_info=True)` here
pass

@ -1,16 +1,51 @@
import re
import json # Added import
from pathlib import Path
from typing import Dict, Tuple
from typing import Dict, Tuple, Callable, Any
import toml
from rich.console import Console
import logging # Added for logging
from rich.console import Console # Keep for rich formatting in handle_input if needed, but prefer logging for app messages
from utils.console import handle_input
from utils.console import handle_input # handle_input uses console.print, will need review
console = Console()
# console = Console() # Replaced by logger for general messages
logger = logging.getLogger(__name__)
config = dict # autocomplete
# --- Helper for safe type conversion ---
def _safe_str_to_bool(val: Any) -> bool:
"""Converts a string to boolean in a case-insensitive way."""
if isinstance(val, bool):
return val
val_str = str(val).lower()
if val_str in ("true", "yes", "1", "on"):
return True
if val_str in ("false", "no", "0", "off"):
return False
raise ValueError(f"Cannot convert '{val}' to boolean.")
_TYPE_CONVERTERS: Dict[str, Callable[[Any], Any]] = {
"str": str,
"int": int,
"float": float,
"bool": _safe_str_to_bool,
# Add other types here if needed, e.g., list, dict, but they might require more complex parsing
# For now, assuming basic types are used in the config template's "type" field.
}
def _get_safe_type_converter(type_str: str) -> Callable[[Any], Any]:
"""Returns a safe type conversion function based on a type string."""
converter = _TYPE_CONVERTERS.get(type_str)
if converter is None:
# Fallback or raise error if type_str is not supported
# For safety, let's raise an error if an unknown type string is provided.
raise ValueError(f"Unsupported type string for conversion: {type_str}. Supported types: {list(_TYPE_CONVERTERS.keys())}")
return converter
# --- End of helper ---
def crawl(obj: dict, func=lambda x, y: print(x, y, end="\n"), path=None):
if path is None: # path Default argument value is mutable
path = []
@ -26,40 +61,72 @@ def check(value, checks, name):
return checks[key] if key in checks else default_result
incorrect = False
if value == {}:
original_value = value # Keep original value for re-input if conversion fails
if value == {}: # Treat empty dict as incorrect for a setting expecting a value
incorrect = True
if not incorrect and "type" in checks:
type_str = checks["type"]
try:
value = eval(checks["type"])(value)
except:
converter = _get_safe_type_converter(type_str)
value = converter(value)
except (ValueError, TypeError) as e: # Catch conversion errors
logger.warning(f"Could not convert value '{original_value}' for '{name}' to type '{type_str}'. Error: {e}")
incorrect = True
except Exception as e: # Catch any other unexpected errors during conversion
logger.error(f"Unexpected error converting value for '{name}' to type '{type_str}'. Error: {e}", exc_info=True)
incorrect = True
# Dynamic options loading for background_choice
current_options = checks.get("options")
if name == "background_choice" and "options" in checks:
try:
with open(Path(__file__).parent / "backgrounds.json", "r", encoding="utf-8") as f:
background_data = json.load(f)
current_options = list(background_data.keys())
if not current_options:
logger.warning("No backgrounds found in backgrounds.json. Using fallback options if available from template.")
current_options = checks.get("options", ["DEFAULT_BACKGROUND_FALLBACK"])
except (FileNotFoundError, json.JSONDecodeError) as e:
logger.warning(f"Could not load backgrounds from backgrounds.json: {e}. Using template options if available.")
current_options = checks.get("options", ["DEFAULT_BACKGROUND_FALLBACK"])
if (
not incorrect and "options" in checks and value not in checks["options"]
not incorrect and current_options is not None and value not in current_options
): # FAILSTATE Value is not one of the options
incorrect = True
elif ( # Original check if not background_choice or if current_options remained None (should not happen with fallbacks)
not incorrect and name != "background_choice" and "options" in checks and value not in checks["options"]
):
incorrect = True
if (
not incorrect
and "regex" in checks
and (
(isinstance(value, str) and re.match(checks["regex"], value) is None)
or not isinstance(value, str)
or not isinstance(value, str) # Ensure value is string if regex is present
)
): # FAILSTATE Value doesn't match regex, or has regex but is not a string.
incorrect = True
# Length/Value checks for non-iterables (int, float)
if (
not incorrect
and not hasattr(value, "__iter__")
and not hasattr(value, "__iter__") # Ensure it's not a string or list here
and not isinstance(value, str) # Explicitly exclude strings from this numeric check
and (
("nmin" in checks and checks["nmin"] is not None and value < checks["nmin"])
or ("nmax" in checks and checks["nmax"] is not None and value > checks["nmax"])
)
):
incorrect = True
# Length checks for iterables (str, list)
if (
not incorrect
and hasattr(value, "__iter__")
and hasattr(value, "__iter__") # Applies to strings, lists, etc.
and (
("nmin" in checks and checks["nmin"] is not None and len(value) < checks["nmin"])
or ("nmax" in checks and checks["nmax"] is not None and len(value) > checks["nmax"])
@ -68,6 +135,23 @@ def check(value, checks, name):
incorrect = True
if incorrect:
# Get the type converter for handle_input
# If get_check_value("type", False) was intended to pass the type string itself,
# then we might not need _get_safe_type_converter here, but handle_input needs to be aware.
# Assuming handle_input expects a callable type constructor or our safe converter.
input_type_str = get_check_value("type", None)
input_type_callable = None
if input_type_str:
try:
input_type_callable = _get_safe_type_converter(input_type_str)
except ValueError as e:
logger.warning(f"Invalid type '{input_type_str}' in template for '{name}': {e}. Defaulting to string input for prompt.")
input_type_callable = str
else:
logger.debug(f"No type specified in template for '{name}'. Defaulting to string input for prompt.")
input_type_callable = str
value = handle_input(
message=(
(("[blue]Example: " + str(checks["example"]) + "\n") if "example" in checks else "")
@ -78,7 +162,7 @@ def check(value, checks, name):
+ str(name)
+ "[#F7768E bold]=",
extra_info=get_check_value("explanation", ""),
check_type=eval(get_check_value("type", "False")),
check_type=input_type_callable, # Pass the callable converter
default=get_check_value("default", NotImplemented),
match=get_check_value("regex", ""),
err_message=get_check_value("input_error", "Incorrect input"),
@ -112,46 +196,63 @@ def check_toml(template_file, config_file) -> Tuple[bool, Dict]:
config = None
try:
template = toml.load(template_file)
logger.debug(f"Successfully loaded template file: {template_file}")
except Exception as error:
console.print(f"[red bold]Encountered error when trying to to load {template_file}: {error}")
logger.error(f"Encountered error when trying to load template file {template_file}: {error}", exc_info=True)
return False
try:
config = toml.load(config_file)
except toml.TomlDecodeError:
console.print(
f"""[blue]Couldn't read {config_file}.
Overwrite it?(y/n)"""
)
if not input().startswith("y"):
print("Unable to read config, and not allowed to overwrite it. Giving up.")
logger.debug(f"Successfully loaded config file: {config_file}")
except toml.TomlDecodeError as e:
logger.error(f"Couldn't decode TOML from {config_file}: {e}")
# Rich print for interactive part, then log the choice
console = Console() # Local console for this interactive part
console.print(f"""[blue]Malformed configuration file detected at {config_file}.
It might be corrupted.
Overwrite with a fresh configuration based on the template? (y/n)[/blue]""")
choice = input().strip().lower()
logger.info(f"User choice for overwriting malformed config {config_file}: {choice}")
if not choice.startswith("y"):
logger.warning(f"User chose not to overwrite malformed config {config_file}. Cannot proceed.")
return False
else:
try:
with open(config_file, "w") as f:
f.write("")
except:
console.print(
f"[red bold]Failed to overwrite {config_file}. Giving up.\nSuggestion: check {config_file} permissions for the user."
)
with open(config_file, "w", encoding="utf-8") as f:
f.write("") # Create an empty file to be populated by template
config = {} # Start with an empty config dict
logger.info(f"Malformed config {config_file} cleared for fresh population.")
except IOError as ioe:
logger.error(f"Failed to clear/overwrite malformed config file {config_file}: {ioe}", exc_info=True)
return False
except FileNotFoundError:
console.print(
f"""[blue]Couldn't find {config_file}
Creating it now."""
)
logger.info(f"Config file {config_file} not found. Creating it now based on template.")
try:
with open(config_file, "x") as f:
f.write("")
# Create the file by opening in 'w' mode, then it will be populated by toml.dump later
# No need to write "" explicitly if we are going to dump template content or an empty dict.
# For safety, ensure parent directory exists if config_file includes directories.
Path(config_file).parent.mkdir(parents=True, exist_ok=True)
with open(config_file, "w", encoding="utf-8") as f:
# Start with an empty config, to be filled by crawling the template
toml.dump({}, f)
config = {}
except:
console.print(
f"[red bold]Failed to write to {config_file}. Giving up.\nSuggestion: check the folder's permissions for the user."
)
logger.info(f"New config file {config_file} created.")
except IOError as e:
logger.error(f"Failed to create new config file {config_file}: {e}", exc_info=True)
return False
console.print(
"""\
[blue bold]###############################
logger.info(
"Checking TOML configuration. User will be prompted for any missing/invalid essential values."
)
# The following banner is fine with print as it's a one-time display for interactive setup.
+ # However, for consistency, it could also be logged at INFO level if desired.
+ # For now, let's keep it as console.print for its specific formatting.
+ # If RichHandler is active for logging, logger.info would also use Rich.
+ # To ensure it uses the local `console` for this specific print:
+ local_console_for_banner = Console()
+ local_console_for_banner.print(
+ """\
+[blue bold]###############################
# #
# Checking TOML configuration #
# #

@ -1,76 +1,104 @@
import json
from os.path import exists
# from os.path import exists # Replaced by pathlib
from pathlib import Path # Added pathlib
import logging # Added logging
from utils import settings
from utils.ai_methods import sort_by_similarity
from utils.console import print_substep
# from utils.console import print_substep # Replaced by logger
logger = logging.getLogger(__name__)
def get_subreddit_undone(submissions: list, subreddit, times_checked=0, similarity_scores=None):
"""_summary_
"""
Finds a suitable Reddit submission that has not been processed yet.
Args:
submissions (list): List of posts that are going to potentially be generated into a video
subreddit (praw.Reddit.SubredditHelper): Chosen subreddit
submissions (list): List of PRAW submission objects.
subreddit (praw.Reddit.SubredditHelper): The subreddit object.
times_checked (int): Counter for recursion depth (related to time filters).
similarity_scores (Optional[list]): Scores if AI similarity is used.
Returns:
Any: The submission that has not been done
Union[praw.models.Submission, Tuple[praw.models.Submission, float], None]:
The suitable submission, or (submission, score) if scores provided, or None if no suitable post found.
"""
# Second try of getting a valid Submission
logger.info(f"Checking {len(submissions)} submissions for suitability (Attempt: {times_checked + 1}).")
if times_checked and settings.config["ai"]["ai_similarity_enabled"]:
print("Sorting based on similarity for a different date filter and thread limit..")
logger.info("AI similarity enabled. Sorting submissions for current batch...")
submissions = sort_by_similarity(
submissions, keywords=settings.config["ai"]["ai_similarity_enabled"]
)
# recursively checks if the top submission in the list was already done.
if not exists("./video_creation/data/videos.json"):
with open("./video_creation/data/videos.json", "w+") as f:
videos_json_path = Path("./video_creation/data/videos.json")
if not videos_json_path.exists():
logger.info(f"{videos_json_path} not found. Creating an empty list.")
videos_json_path.parent.mkdir(parents=True, exist_ok=True) # Ensure parent dir exists
with open(videos_json_path, "w+", encoding="utf-8") as f:
json.dump([], f)
with open("./video_creation/data/videos.json", "r", encoding="utf-8") as done_vids_raw:
done_videos = json.load(done_vids_raw)
try:
with open(videos_json_path, "r", encoding="utf-8") as done_vids_raw:
done_videos = json.load(done_vids_raw)
except (json.JSONDecodeError, FileNotFoundError) as e: # Added FileNotFoundError just in case
logger.error(f"Error reading or decoding {videos_json_path}: {e}. Assuming no videos are done.", exc_info=True)
done_videos = []
for i, submission in enumerate(submissions):
logger.debug(f"Checking submission: {submission.id} - '{submission.title[:50]}...'")
if already_done(done_videos, submission):
logger.debug(f"Submission {submission.id} already processed. Skipping.")
continue
if submission.over_18:
try:
if not settings.config["settings"]["allow_nsfw"]:
print_substep("NSFW Post Detected. Skipping...")
logger.info(f"NSFW Post {submission.id} detected and allow_nsfw is false. Skipping.")
continue
except AttributeError:
print_substep("NSFW settings not defined. Skipping NSFW post...")
except KeyError: # If allow_nsfw setting is missing
logger.warning(f"NSFW setting 'allow_nsfw' not defined in config. Skipping NSFW post {submission.id}.")
continue
if submission.stickied:
print_substep("This post was pinned by moderators. Skipping...")
logger.info(f"Submission {submission.id} is stickied. Skipping.")
continue
if (
submission.num_comments <= int(settings.config["reddit"]["thread"]["min_comments"])
and not settings.config["settings"]["storymode"]
):
print_substep(
f'This post has under the specified minimum of comments ({settings.config["reddit"]["thread"]["min_comments"]}). Skipping...'
min_comments = int(settings.config["reddit"]["thread"]["min_comments"])
if not settings.config["settings"]["storymode"] and submission.num_comments <= min_comments:
logger.info(
f"Submission {submission.id} has {submission.num_comments} comments (min: {min_comments}). Skipping."
)
continue
if settings.config["settings"]["storymode"]:
if not submission.selftext:
print_substep("You are trying to use story mode on post with no post text")
logger.info(f"Storymode enabled, but submission {submission.id} has no selftext. Skipping.")
continue
else:
# Check for the length of the post text
if len(submission.selftext) > (
settings.config["settings"]["storymode_max_length"] or 2000
):
print_substep(
f"Post is too long ({len(submission.selftext)}), try with a different post. ({settings.config['settings']['storymode_max_length']} character limit)"
story_max_len = settings.config["settings"].get("storymode_max_length", 2000) # Use .get for safety
story_min_len = 30 # Hardcoded in original, could be config
if len(submission.selftext) > story_max_len:
logger.info(
f"Storymode: Post {submission.id} selftext too long ({len(submission.selftext)} chars, limit: {story_max_len}). Skipping."
)
continue
elif len(submission.selftext) < 30:
elif len(submission.selftext) < story_min_len:
logger.info(
f"Storymode: Post {submission.id} selftext too short ({len(submission.selftext)} chars, min: {story_min_len}). Skipping."
)
continue
if settings.config["settings"]["storymode"] and not submission.is_self:
continue
if similarity_scores is not None:
if not submission.is_self: # Storymode usually implies self-posts
logger.info(f"Storymode enabled, but submission {submission.id} is not a self-post. Skipping.")
continue
logger.info(f"Found suitable submission: {submission.id} - '{submission.title[:50]}...'")
if similarity_scores is not None and i < len(similarity_scores): # Check index bounds for safety
return submission, similarity_scores[i].item()
return submission
print("all submissions have been done going by top submission order")
logger.warning("All submissions in the current batch were unsuitable or already processed.")
VALID_TIME_FILTERS = [
"day",
"hour",
@ -78,19 +106,35 @@ def get_subreddit_undone(submissions: list, subreddit, times_checked=0, similari
"week",
"year",
"all",
] # set doesn't have __getitem__
index = times_checked + 1
if index == len(VALID_TIME_FILTERS):
print("All submissions have been done.")
]
current_time_filter_index = times_checked # times_checked is 0-indexed for the list
if current_time_filter_index >= len(VALID_TIME_FILTERS) -1 : # -1 because we use current_time_filter_index for next
logger.info("All time filters exhausted. No more submissions to check.")
return None # Base case for recursion: no more filters to try
next_time_filter_index = current_time_filter_index + 1
next_time_filter = VALID_TIME_FILTERS[next_time_filter_index]
# Limit calculation: original was `(50 if int(index) == 0 else index + 1 * 50)`
# This seemed to try and increase limit. Let's use a simpler, potentially larger fixed limit for subsequent tries.
# Or keep it simple for now. The original logic for limit was a bit complex.
# Let's use a fixed limit for deeper searches for now.
next_limit = settings.config["reddit"]["thread"].get("thread_limit", 25) * (next_time_filter_index + 1) # Increase limit slightly
logger.info(f"Trying next time_filter '{next_time_filter}' with limit {next_limit} for subreddit '{subreddit.display_name}'.")
try:
next_submissions = list(subreddit.top(time_filter=next_time_filter, limit=next_limit))
except Exception as e:
logger.error(f"Error fetching submissions for subreddit '{subreddit.display_name}' with filter '{next_time_filter}': {e}", exc_info=True)
return None # Cannot proceed if fetching fails
return get_subreddit_undone(
subreddit.top(
time_filter=VALID_TIME_FILTERS[index],
limit=(50 if int(index) == 0 else index + 1 * 50),
),
next_submissions,
subreddit,
times_checked=index,
) # all the videos in hot have already been done
times_checked=next_time_filter_index, # Pass the new index
# similarity_scores are not passed for subsequent calls as they were for the initial batch
)
def already_done(done_videos: list, submission) -> bool:

@ -1,6 +1,8 @@
from __future__ import annotations
import os
import os # os.path will be replaced by pathlib
from pathlib import Path # Added for pathlib
import logging # Added for logging
from typing import List, Optional
from google.oauth2.credentials import Credentials
@ -11,23 +13,57 @@ from google.auth.transport.requests import Request
SCOPES = ["https://www.googleapis.com/auth/youtube.upload"]
CLIENT_SECRETS_FILE = "youtube_client_secrets.json"
TOKEN_FILE = "youtube_token.json"
CLIENT_SECRETS_FILE = Path("youtube_client_secrets.json") # Use Path
TOKEN_FILE = Path("youtube_token.json") # Use Path
logger = logging.getLogger(__name__)
def _get_service():
"""Return an authenticated YouTube service object."""
creds: Optional[Credentials] = None
if os.path.exists(TOKEN_FILE):
creds = Credentials.from_authorized_user_file(TOKEN_FILE, SCOPES)
logger.debug(f"Looking for existing token file at: {TOKEN_FILE}")
if TOKEN_FILE.exists():
try:
creds = Credentials.from_authorized_user_file(str(TOKEN_FILE), SCOPES)
logger.info(f"Loaded credentials from {TOKEN_FILE}")
except Exception as e:
logger.warning(f"Failed to load credentials from {TOKEN_FILE}: {e}. Will attempt re-authentication.")
creds = None # Ensure creds is None if loading failed
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRETS_FILE, SCOPES)
creds = flow.run_console()
with open(TOKEN_FILE, "w") as token:
token.write(creds.to_json())
logger.info("Credentials expired. Attempting to refresh token...")
try:
creds.refresh(Request())
logger.info("Credentials refreshed successfully.")
except Exception as e:
logger.error(f"Failed to refresh credentials: {e}. Will attempt re-authentication.", exc_info=True)
creds = None # Force re-authentication
if not creds or not creds.valid: # Check again if refresh failed or was not attempted
logger.info("No valid credentials found or refresh failed. Starting new OAuth2 flow...")
if not CLIENT_SECRETS_FILE.exists():
logger.error(f"Client secrets file '{CLIENT_SECRETS_FILE}' not found. Cannot authenticate.")
raise FileNotFoundError(f"YouTube client secrets file '{CLIENT_SECRETS_FILE}' is required for authentication.")
try:
flow = InstalledAppFlow.from_client_secrets_file(str(CLIENT_SECRETS_FILE), SCOPES)
# run_console will print to stdout and read from stdin.
logger.info("Please follow the instructions in your browser to authenticate.")
creds = flow.run_console()
logger.info("OAuth2 flow completed. Credentials obtained.")
except Exception as e:
logger.error(f"OAuth2 flow failed: {e}", exc_info=True)
raise RuntimeError(f"Failed to obtain YouTube credentials via OAuth2 flow: {e}")
try:
with open(TOKEN_FILE, "w", encoding="utf-8") as token_file_handle:
token_file_handle.write(creds.to_json())
logger.info(f"Credentials saved to {TOKEN_FILE}")
except IOError as e:
logger.error(f"Failed to save token file to {TOKEN_FILE}: {e}", exc_info=True)
# Not raising here as the service might still work with in-memory creds for this session.
logger.debug("Returning YouTube service object.")
return build("youtube", "v3", credentials=creds)

@ -8,12 +8,15 @@ from typing import Any, Dict, Tuple
import yt_dlp
from moviepy.editor import AudioFileClip, VideoFileClip
from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip
import logging # Added for logging
from utils import settings
from utils.console import print_step, print_substep
# from utils.console import print_step, print_substep # To be replaced by logging
logger = logging.getLogger(__name__)
def load_background_options():
logger.debug("Loading background options from JSON files...")
background_options = {}
# Load background videos
with open("./utils/background_videos.json") as json_file:
@ -31,8 +34,12 @@ def load_background_options():
pos = background_options["video"][name][3]
if pos != "center":
background_options["video"][name][3] = lambda t: ("center", pos + t)
# This lambda modification is tricky and might have unintended consequences if state is not handled carefully.
# For logging purposes, we'll assume it's correct.
logger.debug(f"Modifying position for background video '{name}' from '{pos}' to a lambda function.")
background_options["video"][name][3] = lambda t, p=pos: ("center", p + t) # Ensure pos is captured correctly
logger.info("Background options loaded and processed.")
return background_options
@ -61,16 +68,31 @@ def get_background_config(mode: str):
"""Fetch the background/s configuration"""
try:
choice = str(settings.config["settings"]["background"][f"background_{mode}"]).casefold()
except AttributeError:
print_substep("No background selected. Picking random background'")
logger.debug(f"User's configured background choice for {mode}: {choice}")
except KeyError: # More specific exception if the key itself is missing
logger.warning(f"Background setting for '{mode}' not found in config. Picking random background.")
choice = None
except AttributeError: # Should not happen if config structure is as expected
logger.warning(f"Attribute error accessing background setting for '{mode}'. Picking random background.")
choice = None
# Handle default / not supported background using default option.
# Default : pick random from supported background.
if not choice or choice not in background_options[mode]:
choice = random.choice(list(background_options[mode].keys()))
if not choice:
logger.info(f"No background {mode} explicitly chosen or found. Selecting a random one.")
else: # Choice was made but not found in available options
logger.warning(f"Chosen background {mode} '{choice}' not found in available options. Selecting a random one.")
available_keys = list(background_options[mode].keys())
if not available_keys:
logger.error(f"No background {mode} options available at all (e.g., from JSON). Cannot select a background.")
raise ValueError(f"No background {mode} options available. Check background JSON files.")
choice = random.choice(available_keys)
logger.info(f"Randomly selected background {mode}: {choice}")
return background_options[mode][choice]
selected_config = background_options[mode][choice]
logger.debug(f"Final selected background {mode} config: Name='{choice}', URI='{selected_config[0]}', Filename='{selected_config[1]}'")
return selected_config
def download_background_video(background_config: Tuple[str, str, str, Any]):
@ -78,90 +100,207 @@ def download_background_video(background_config: Tuple[str, str, str, Any]):
Path("./assets/backgrounds/video/").mkdir(parents=True, exist_ok=True)
# note: make sure the file name doesn't include an - in it
uri, filename, credit, _ = background_config
if Path(f"assets/backgrounds/video/{credit}-{filename}").is_file():
output_dir = Path("./assets/backgrounds/video/")
output_dir.mkdir(parents=True, exist_ok=True)
output_path = output_dir / f"{credit}-{filename}"
if output_path.is_file():
logger.info(f"Background video {output_path} already exists. Skipping download.")
return
print_step(
"We need to download the backgrounds videos. they are fairly large but it's only done once. 😎"
)
print_substep("Downloading the backgrounds videos... please be patient 🙏 ")
print_substep(f"Downloading {filename} from {uri}")
logger.info("Background video(s) need to be downloaded (only done once).")
logger.info(f"Downloading background video: {filename} from {uri} to {output_path}")
ydl_opts = {
"format": "bestvideo[height<=1080][ext=mp4]",
"outtmpl": f"assets/backgrounds/video/{credit}-{filename}",
"format": "bestvideo[height<=1080][ext=mp4]", # Ensure MP4 for compatibility
"outtmpl": str(output_path), # yt-dlp expects string path
"retries": 10,
"quiet": True, # Suppress yt-dlp console output, rely on our logging
"noplaylist": True, # Download only single video if URI is a playlist
"logger": logger, # Pass our logger to yt-dlp if it supports it (might not directly)
# Alternatively, capture its stdout/stderr if needed.
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download(uri)
print_substep("Background video downloaded successfully! 🎉", style="bold green")
try:
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([uri]) # Pass URI as a list
logger.info(f"Background video '{filename}' downloaded successfully!")
except yt_dlp.utils.DownloadError as e:
logger.error(f"Failed to download background video {filename} from {uri}: {e}")
# Consider raising an exception or specific error handling
raise RuntimeError(f"yt-dlp failed to download {uri}: {e}")
except Exception as e:
logger.error(f"An unexpected error occurred during video download for {filename}: {e}", exc_info=True)
raise
def download_background_audio(background_config: Tuple[str, str, str]):
"""Downloads the background/s audio from YouTube."""
Path("./assets/backgrounds/audio/").mkdir(parents=True, exist_ok=True)
# note: make sure the file name doesn't include an - in it
uri, filename, credit = background_config
if Path(f"assets/backgrounds/audio/{credit}-{filename}").is_file():
return
print_step(
"We need to download the backgrounds audio. they are fairly large but it's only done once. 😎"
)
print_substep("Downloading the backgrounds audio... please be patient 🙏 ")
print_substep(f"Downloading {filename} from {uri}")
output_dir = Path("./assets/backgrounds/audio/")
output_dir.mkdir(parents=True, exist_ok=True)
# yt-dlp will add the correct extension based on 'bestaudio' format.
# We'll save the path without extension in outtmpl, then find the downloaded file.
# For simplicity, let's assume it saves as {credit}-{filename}.mp3 or similar.
# A more robust way is to hook into yt-dlp's progress hooks to get the exact filename.
base_output_path_str = str(output_dir / f"{credit}-{filename}")
# Check if any audio file with this base name exists (e.g. .mp3, .m4a, .opus)
# This is a simple check; yt-dlp might choose different extensions.
# For now, we check for common ones or rely on re-download if specific extension is unknown.
# A better approach would be to not check and let yt-dlp handle "already downloaded".
# If we simply check for `output_dir / f"{credit}-{filename}.mp3"`, it might miss other formats.
# For now, let's assume we want to ensure an .mp3 for consistency if possible, or let yt-dlp choose.
# The current ydl_opts doesn't force mp3, it uses 'bestaudio/best'.
# Simplified check: if a file with the base name exists (regardless of common audio extensions), skip.
# This isn't perfect. yt-dlp's own download archive is better.
# For now, let's check for a common one like .mp3 for the skip logic.
# This means if it downloaded as .opus, it might re-download.
# The most robust way is to let yt-dlp manage this via its download archive or by checking its output.
# Given the current structure, we'll keep a simple check.
potential_output_file = output_dir / f"{credit}-{filename}.mp3" # Assuming mp3 for check
if potential_output_file.is_file(): # Simple check, might not cover all cases if format changes
logger.info(f"Background audio {potential_output_file} seems to exist. Skipping download.")
return
logger.info("Background audio(s) need to be downloaded (only done once).")
logger.info(f"Downloading background audio: {filename} from {uri} to {base_output_path_str} (extension auto-detected)")
ydl_opts = {
"outtmpl": f"./assets/backgrounds/audio/{credit}-{filename}",
"format": "bestaudio/best",
"extract_audio": True,
"outtmpl": base_output_path_str, # yt-dlp adds extension
"format": "bestaudio[ext=mp3]/bestaudio", # Prefer mp3, fallback to best audio
"extract_audio": True, # Ensure only audio is downloaded
"quiet": True,
"noplaylist": True,
"logger": logger, # Pass logger
# "postprocessors": [{ # Example to force mp3, requires ffmpeg
# 'key': 'FFmpegExtractAudio',
# 'preferredcodec': 'mp3',
# 'preferredquality': '192', # Bitrate
# }],
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([uri])
print_substep("Background audio downloaded successfully! 🎉", style="bold green")
try:
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([uri])
logger.info(f"Background audio '{filename}' downloaded successfully!")
except yt_dlp.utils.DownloadError as e:
logger.error(f"Failed to download background audio {filename} from {uri}: {e}")
raise RuntimeError(f"yt-dlp failed to download audio {uri}: {e}")
except Exception as e:
logger.error(f"An unexpected error occurred during audio download for {filename}: {e}", exc_info=True)
raise
def chop_background(background_config: Dict[str, Tuple], video_length: int, reddit_object: dict):
"""Generates the background audio and footage to be used in the video and writes it to assets/temp/background.mp3 and assets/temp/background.mp4
"""Generates the background audio and footage to be used in the video."""
# reddit_object["thread_id"] should be used if "safe_thread_id" is not reliably passed.
# Assuming "safe_thread_id" is available from the refactored main.py.
safe_id = reddit_object.get("safe_thread_id", re.sub(r"[^\w\s-]", "", reddit_object["thread_id"]))
temp_dir = Path(f"assets/temp/{safe_id}")
temp_dir.mkdir(parents=True, exist_ok=True) # Ensure temp dir exists
Args:
background_config (Dict[str,Tuple]]) : Current background configuration
video_length (int): Length of the clip where the background footage is to be taken out of
"""
id = re.sub(r"[^\w\s-]", "", reddit_object["thread_id"])
background_audio_volume = settings.config["settings"]["background"].get("background_audio_volume", 0)
if settings.config["settings"]["background"][f"background_audio_volume"] == 0:
print_step("Volume was set to 0. Skipping background audio creation . . .")
if background_audio_volume == 0:
logger.info("Background audio volume is 0. Skipping background audio chopping.")
else:
print_step("Finding a spot in the backgrounds audio to chop...✂️")
audio_choice = f"{background_config['audio'][2]}-{background_config['audio'][1]}"
background_audio = AudioFileClip(f"assets/backgrounds/audio/{audio_choice}")
logger.info("Processing background audio chop...")
# Ensure background_config['audio'] has enough elements
if len(background_config['audio']) < 3:
logger.error(f"Audio background config is malformed: {background_config['audio']}. Expected at least 3 elements (uri, filename, credit).")
raise ValueError("Malformed audio background configuration.")
audio_credit = background_config['audio'][2]
audio_filename_part = background_config['audio'][1]
# Try to find the downloaded audio file (yt-dlp might add various extensions)
# Common extensions: mp3, m4a, ogg, wav, opus
# This is a bit fragile; ideally, yt-dlp would report the exact output filename.
audio_base_path = Path(f"assets/backgrounds/audio/{audio_credit}-{audio_filename_part}")
actual_audio_file = None
for ext in [".mp3", ".m4a", ".ogg", ".wav", ".opus"]: # Common audio extensions
if (audio_base_path.with_suffix(ext)).exists():
actual_audio_file = audio_base_path.with_suffix(ext)
break
if not actual_audio_file:
logger.error(f"Downloaded background audio file not found for base: {audio_base_path}. Searched common extensions.")
# Fallback: try with original filename directly, maybe it had an extension already
if Path(f"assets/backgrounds/audio/{audio_credit}-{audio_filename_part}").exists():
actual_audio_file = Path(f"assets/backgrounds/audio/{audio_credit}-{audio_filename_part}")
else:
raise FileNotFoundError(f"Background audio {audio_base_path} with common extensions not found.")
logger.debug(f"Using background audio file: {actual_audio_file}")
background_audio_clip = AudioFileClip(str(actual_audio_file))
start_time_audio, end_time_audio = get_start_and_end_times(
video_length, background_audio.duration
video_length, background_audio_clip.duration
)
background_audio = background_audio.subclip(start_time_audio, end_time_audio)
background_audio.write_audiofile(f"assets/temp/{id}/background.mp3")
logger.debug(f"Chopping audio from {start_time_audio}s to {end_time_audio}s.")
chopped_audio = background_audio_clip.subclip(start_time_audio, end_time_audio)
chopped_audio.write_audiofile(str(temp_dir / "background.mp3"))
logger.info("Background audio chopped and saved successfully.")
background_audio_clip.close() # Release file handle
chopped_audio.close()
logger.info("Processing background video chop...")
if len(background_config['video']) < 2:
logger.error(f"Video background config is malformed: {background_config['video']}. Expected at least 2 elements (uri, filename).")
raise ValueError("Malformed video background configuration.")
video_credit = background_config['video'][2] # Credit is the 3rd element
video_filename_part = background_config['video'][1] # Filename is the 2nd element
# Assuming video is always mp4 due to ydl_opts format preference
video_source_path = Path(f"assets/backgrounds/video/{video_credit}-{video_filename_part}")
if not video_source_path.exists():
# This case should ideally be caught by download_background_video if it fails.
logger.error(f"Background video file {video_source_path} not found for chopping.")
raise FileNotFoundError(f"Background video {video_source_path} not found.")
logger.debug(f"Using background video file: {video_source_path}")
# Getting duration directly with moviepy can be slow for long videos if it re-scans.
# yt-dlp usually provides duration metadata. If not, moviepy will find it.
# For now, assume VideoFileClip is efficient enough or duration is known.
# If performance is an issue, get duration from yt-dlp metadata during download.
try:
video_clip_for_duration = VideoFileClip(str(video_source_path))
video_duration = video_clip_for_duration.duration
video_clip_for_duration.close() # Close after getting duration
except Exception as e:
logger.error(f"Could not read duration from video file {video_source_path} using MoviePy: {e}", exc_info=True)
raise RuntimeError(f"Failed to get duration for {video_source_path}")
print_step("Finding a spot in the backgrounds video to chop...✂️")
video_choice = f"{background_config['video'][2]}-{background_config['video'][1]}"
background_video = VideoFileClip(f"assets/backgrounds/video/{video_choice}")
start_time_video, end_time_video = get_start_and_end_times(
video_length, background_video.duration
video_length, video_duration
)
# Extract video subclip
logger.debug(f"Chopping video from {start_time_video}s to {end_time_video}s.")
target_video_path = str(temp_dir / "background.mp4")
try:
ffmpeg_extract_subclip(
f"assets/backgrounds/video/{video_choice}",
str(video_source_path),
start_time_video,
end_time_video,
targetname=f"assets/temp/{id}/background.mp4",
targetname=target_video_path,
)
except (OSError, IOError): # ffmpeg issue see #348
print_substep("FFMPEG issue. Trying again...")
with VideoFileClip(f"assets/backgrounds/video/{video_choice}") as video:
new = video.subclip(start_time_video, end_time_video)
new.write_videofile(f"assets/temp/{id}/background.mp4")
print_substep("Background video chopped successfully!", style="bold green")
return background_config["video"][2]
except (OSError, IOError) as e: # ffmpeg issue see #348
logger.warning(f"ffmpeg_extract_subclip failed ({e}). Retrying with MoviePy's subclip method...")
try:
with VideoFileClip(str(video_source_path)) as video_file_clip: # Ensure resources are closed
new_subclip = video_file_clip.subclip(start_time_video, end_time_video)
new_subclip.write_videofile(target_video_path, logger='bar' if settings.config['settings'].get('verbose_ffmpeg', False) else None) # MoviePy's own progress bar
except Exception as moviepy_e:
logger.error(f"MoviePy subclip method also failed: {moviepy_e}", exc_info=True)
raise RuntimeError(f"Both ffmpeg_extract_subclip and MoviePy subclip failed for {video_source_path}")
logger.info("Background video chopped successfully!")
return background_config["video"][2] # Return credit
# Create a tuple for downloads background (background_audio_options, background_video_options)

File diff suppressed because it is too large Load Diff

@ -5,16 +5,18 @@ from typing import Dict, Final
import translators
from playwright.sync_api import ViewportSize, sync_playwright
from rich.progress import track
from rich.progress import track # Keep for progress bar
import logging # Added for logging
from utils import settings
from utils.console import print_step, print_substep
# from utils.console import print_step, print_substep # To be replaced by logging
from utils.imagenarator import imagemaker
from utils.playwright import clear_cookie_by_name
from utils.videos import save_data
__all__ = ["get_screenshots_of_reddit_posts"]
logger = logging.getLogger(__name__)
def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
"""Downloads screenshots of reddit posts as seen on the web. Downloads to assets/temp/png
@ -26,13 +28,16 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
# settings values
W: Final[int] = int(settings.config["settings"]["resolution_w"])
H: Final[int] = int(settings.config["settings"]["resolution_h"])
lang: Final[str] = settings.config["reddit"]["thread"]["post_lang"]
lang: Final[str] = settings.config["reddit"]["thread"].get("post_lang") # Use .get for safety
storymode: Final[bool] = settings.config["settings"]["storymode"]
print_step("Downloading screenshots of reddit posts...")
reddit_id = re.sub(r"[^\w\s-]", "", reddit_object["thread_id"])
# ! Make sure the reddit screenshots folder exists
Path(f"assets/temp/{reddit_id}/png").mkdir(parents=True, exist_ok=True)
logger.info("Downloading screenshots of reddit posts...")
# Use safe_thread_id if available from prior processing, otherwise sanitize
reddit_id = reddit_object.get("safe_thread_id", re.sub(r"[^\w\s-]", "", reddit_object["thread_id"]))
screenshot_dir = Path(f"assets/temp/{reddit_id}/png")
screenshot_dir.mkdir(parents=True, exist_ok=True)
logger.debug(f"Ensured screenshot directory exists: {screenshot_dir}")
# set the theme and disable non-essential cookies
if settings.config["settings"]["theme"] == "dark":
@ -59,71 +64,92 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
txtcolor = (0, 0, 0)
transparent = False
logger.debug(f"Theme: {settings.config['settings']['theme']}, BGColor: {bgcolor}, TextColor: {txtcolor}, Transparent: {transparent}")
if storymode and settings.config["settings"]["storymodemethod"] == 1:
# for idx,item in enumerate(reddit_object["thread_post"]):
print_substep("Generating images...")
return imagemaker(
theme=bgcolor,
reddit_obj=reddit_object,
txtclr=txtcolor,
transparent=transparent,
)
screenshot_num: int
with sync_playwright() as p:
print_substep("Launching Headless Browser...")
browser = p.chromium.launch(
headless=True
) # headless=False will show the browser for debugging purposes
# Device scale factor (or dsf for short) allows us to increase the resolution of the screenshots
# When the dsf is 1, the width of the screenshot is 600 pixels
# so we need a dsf such that the width of the screenshot is greater than the final resolution of the video
dsf = (W // 600) + 1
context = browser.new_context(
locale=lang or "en-us",
color_scheme="dark",
viewport=ViewportSize(width=W, height=H),
device_scale_factor=dsf,
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
)
cookies = json.load(cookie_file)
cookie_file.close()
context.add_cookies(cookies) # load preference cookies
# Login to Reddit
print_substep("Logging in to Reddit...")
page = context.new_page()
page.goto("https://www.reddit.com/login", timeout=0)
page.set_viewport_size(ViewportSize(width=1920, height=1080))
page.wait_for_load_state()
logger.info("Storymode method 1 selected. Generating images directly using imagemaker.")
try:
imagemaker(
theme=bgcolor,
reddit_obj=reddit_object,
txtclr=txtcolor,
transparent=transparent,
)
logger.info("Imagemaker generation complete for storymode method 1.")
return # End of function for this storymode type
except Exception as e:
logger.error(f"Error during imagemaker generation for storymode: {e}", exc_info=True)
# Decide if to raise or handle. For now, re-raise to signal failure.
raise RuntimeError(f"Imagemaker failed for storymode: {e}")
# screenshot_num: int # Type hint already present in function signature
logger.info("Proceeding with Playwright for screenshot generation.")
try:
with sync_playwright() as p:
logger.info("Launching Headless Browser (Playwright)...")
browser_launch_options = {"headless": True}
# Example: Add proxy from settings if configured
# proxy_settings = settings.config["settings"].get("proxy")
# if proxy_settings and proxy_settings.get("server"):
# browser_launch_options["proxy"] = proxy_settings
# logger.info(f"Using proxy for Playwright: {proxy_settings.get('server')}")
browser = p.chromium.launch(**browser_launch_options)
dsf = (W // 600) + 1 # Ensure dsf is at least 1, even if W < 600
logger.debug(f"Device Scale Factor (DSF) calculated: {dsf} for width {W}")
context = browser.new_context(
locale=lang or "en-US", # Ensure valid locale format
color_scheme="dark" if settings.config["settings"]["theme"] in ["dark", "transparent"] else "light",
viewport=ViewportSize(width=W, height=H), # Using W, H for viewport
device_scale_factor=dsf,
# Consider making user_agent configurable or updating it periodically
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
)
cookies = json.load(cookie_file)
cookie_file.close() # Ensure file is closed after loading
context.add_cookies(cookies)
logger.debug("Cookies added to browser context.")
page = context.new_page()
logger.info("Logging in to Reddit via Playwright...")
page.goto("https://www.reddit.com/login", timeout=60000) # Increased timeout
# Using a more specific viewport for login page if needed, then change for screenshots
page.set_viewport_size(ViewportSize(width=1280, height=720)) # Standard size for login page interaction
page.wait_for_load_state("domcontentloaded") # Wait for DOM, not necessarily all resources
page.locator('input[name="username"]').fill(settings.config["reddit"]["creds"]["username"])
page.locator('input[name="password"]').fill(settings.config["reddit"]["creds"]["password"])
page.get_by_role("button", name="Log In").click()
try:
# Wait for either a successful navigation OR a login error message
# This makes it more robust than a fixed timeout.
page.wait_for_url("https://www.reddit.com/", timeout=15000) # Wait for redirect to main page
logger.info("Reddit login appears successful (navigated to main page).")
except Exception: # TimeoutError from Playwright if URL doesn't change
logger.debug("Did not navigate to main page after login attempt, checking for error messages.")
login_error_div = page.locator(".AnimatedForm__errorMessage").first
if login_error_div.is_visible(timeout=2000): # Brief check for error message
login_error_message = login_error_div.inner_text()
if login_error_message and login_error_message.strip():
logger.error(f"Reddit login failed. Error message: {login_error_message.strip()}")
# exit() is too abrupt. Raise an exception.
raise ConnectionRefusedError(f"Reddit login failed: {login_error_message.strip()}. Please check credentials.")
else:
logger.info("Login error div present but empty, assuming login was okay or redirected quickly.")
else:
logger.warning("Reddit login status unclear after timeout and no visible error message. Proceeding cautiously.")
page.locator(f'input[name="username"]').fill(settings.config["reddit"]["creds"]["username"])
page.locator(f'input[name="password"]').fill(settings.config["reddit"]["creds"]["password"])
page.get_by_role("button", name="Log In").click()
page.wait_for_timeout(5000)
login_error_div = page.locator(".AnimatedForm__errorMessage").first
if login_error_div.is_visible():
login_error_message = login_error_div.inner_text()
if login_error_message.strip() == "":
# The div element is empty, no error
pass
else:
# The div contains an error message
print_substep(
"Your reddit credentials are incorrect! Please modify them accordingly in the config.toml file.",
style="red",
)
exit()
else:
pass
page.wait_for_load_state()
# Handle the redesign
# Check if the redesign optout cookie is set
# Handle the redesign - this logic might be outdated for current Reddit
# It's often better to ensure the account used is set to the desired UI (old/new)
# or accept the default UI Reddit provides.
# For now, keeping it but logging.
logger.debug("Checking for redesign opt-out button...")
if page.locator("#redesign-beta-optin-btn").is_visible():
# Clear the redesign optout cookie
clear_cookie_by_name(context, "redesign_optout")
@ -139,128 +165,200 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
"#t3_12hmbug > div > div._3xX726aBn29LDbsDtzr_6E._1Ap4F5maDtT1E1YuCiaO0r.D3IL3FD0RFy_mkKLPwL4 > div > div > button"
).is_visible():
# This means the post is NSFW and requires to click the proceed button.
print_substep("Post is NSFW. You are spicy...")
page.locator(
"#t3_12hmbug > div > div._3xX726aBn29LDbsDtzr_6E._1Ap4F5maDtT1E1YuCiaO0r.D3IL3FD0RFy_mkKLPwL4 > div > div > button"
).click()
page.wait_for_load_state() # Wait for page to fully load
# translate code
if page.locator(
"#SHORTCUT_FOCUSABLE_DIV > div:nth-child(7) > div > div > div > header > div > div._1m0iFpls1wkPZJVo38-LSh > button > i"
).is_visible():
page.locator(
"#SHORTCUT_FOCUSABLE_DIV > div:nth-child(7) > div > div > div > header > div > div._1m0iFpls1wkPZJVo38-LSh > button > i"
).click() # Interest popup is showing, this code will close it
# The selector is very specific and might break easily.
nsfw_button_selector = "div > div._3xX726aBn29LDbsDtzr_6E._1Ap4F5maDtT1E1YuCiaO0r.D3IL3FD0RFy_mkKLPwL4 > div > div > button" # Simplified part of original
# A more robust selector might be based on text or a more stable attribute if available.
# Example: page.locator('button:has-text("Yes")') or similar for NSFW confirmation.
# For now, using a part of the original selector. This is fragile.
# This specific selector "#t3_12hmbug > ..." is tied to a post ID and will not work generally.
# A more general approach is needed, perhaps looking for buttons with "yes" or "proceed" text within a modal.
# For this refactor, I'll use a placeholder for a more general NSFW button.
# A better selector would be like: page.locator('[data-testid="content-gate"] button:has-text("View")')
# or page.get_by_role("button", name=re.compile(r"yes|view|proceed", re.IGNORECASE))
# The original selector was extremely brittle.
# Simplified NSFW check (this might need adjustment based on actual Reddit UI)
# Try to find a common NSFW confirmation button
# This is a guess, actual selector might be different:
nsfw_proceed_button = page.locator('button:has-text("View")').or_(page.locator('button:has-text("Yes, I am over 18")'))
if nsfw_proceed_button.first.is_visible(timeout=2000):
logger.info("Post is marked NSFW. Attempting to click proceed button.")
try:
nsfw_proceed_button.first.click()
page.wait_for_load_state("domcontentloaded", timeout=10000)
logger.info("Clicked NSFW proceed button.")
except Exception as e:
logger.warning(f"Could not click NSFW proceed button or page did not load: {e}")
else:
logger.debug("No obvious NSFW proceed button found, or post is not NSFW.")
# Handle interest popup - also uses a very specific selector that's likely to break.
# Example: page.locator('button[aria-label="Close"]') in a modal.
# The original selector was: "#SHORTCUT_FOCUSABLE_DIV > div:nth-child(7) > ... > button > i"
# This is too brittle. A more general approach for popups is needed.
# For now, we'll try to find a generic close button if a known popup structure appears.
# This is highly speculative.
# Example: page.get_by_label("Close").or_(page.get_by_title("Close"))
# For now, skipping this as the selector is too unreliable.
# logger.debug("Checking for interest popup...")
if lang:
print_substep("Translating post...")
texts_in_tl = translators.translate_text(
reddit_object["thread_title"],
to_language=lang,
translator="google",
)
page.evaluate(
"tl_content => document.querySelector('[data-adclicklocation=\"title\"] > div > div > h1').textContent = tl_content",
texts_in_tl,
)
logger.info(f"Translating post title to '{lang}'...")
try:
texts_in_tl = translators.translate_text(
reddit_object["thread_title"],
to_language=lang,
translator="google", # Consider making translator configurable
)
# This JS evaluation to change content is also brittle.
page.evaluate(
"tl_content => { try { document.querySelector('[data-adclicklocation=\"title\"] > div > div > h1').textContent = tl_content; } catch(e){ console.error('Failed to set title via JS:', e); } }",
texts_in_tl,
)
logger.info("Post title translation applied via JS (if element found).")
except Exception as e:
logger.warning(f"Failed to translate post title or apply it: {e}")
else:
print_substep("Skipping translation...")
logger.info("Skipping post title translation (no language specified).")
postcontentpath = f"assets/temp/{reddit_id}/png/title.png"
post_content_path = screenshot_dir / "title.png"
logger.info(f"Taking screenshot of post content to {post_content_path}...")
try:
if settings.config["settings"]["zoom"] != 1:
# store zoom settings
zoom = settings.config["settings"]["zoom"]
# zoom the body of the page
page.evaluate("document.body.style.zoom=" + str(zoom))
# as zooming the body doesn't change the properties of the divs, we need to adjust for the zoom
location = page.locator('[data-test-id="post-content"]').bounding_box()
for i in location:
location[i] = float("{:.2f}".format(location[i] * zoom))
page.screenshot(clip=location, path=postcontentpath)
zoom_level = settings.config["settings"].get("zoom", 1.0)
post_content_locator = page.locator('[data-test-id="post-content"]') # Standard Reddit test ID
if not post_content_locator.is_visible(timeout=10000):
logger.error("Post content area '[data-test-id=\"post-content\"]' not found or not visible.")
raise RuntimeError("Failed to find post content for screenshot.")
if zoom_level != 1.0:
logger.debug(f"Applying zoom level: {zoom_level}")
page.evaluate(f"document.body.style.zoom={zoom_level}")
# Bounding box might need adjustment after zoom, or screenshot entire viewport part.
# For simplicity, if zoom is used, screenshot might need manual verification.
# The original bounding_box manipulation after zoom was complex and error-prone.
# A simpler approach for zoom might be to adjust viewport, though dsf already handles resolution.
# For now, we'll screenshot the locator directly after zoom.
# The clip logic after zoom can be unreliable.
# Consider removing zoom or finding a more robust way if it's essential.
# For now, will attempt screenshot of the locator.
post_content_locator.screenshot(path=str(post_content_path))
else:
page.locator('[data-test-id="post-content"]').screenshot(path=postcontentpath)
post_content_locator.screenshot(path=str(post_content_path))
logger.info("Post content screenshot successful.")
except Exception as e:
print_substep("Something went wrong!", style="red")
resp = input(
"Something went wrong with making the screenshots! Do you want to skip the post? (y/n) "
)
if resp.casefold().startswith("y"):
save_data("", "", "skipped", reddit_id, "")
print_substep(
"The post is successfully skipped! You can now restart the program and this post will skipped.",
"green",
)
logger.error(f"Failed to take screenshot of post content: {e}", exc_info=True)
# The original code had interactive prompts here. For unattended operation, we should raise.
# If skipping is desired, it should be based on a config or be more robust.
# For now, re-raise to indicate failure.
# Consider saving data about skipped post here if that logic is to be kept.
# save_data("", "", "screenshot_failed", reddit_id, f"Post content screenshot error: {e}")
raise RuntimeError(f"Failed to take screenshot of post content: {e}")
resp = input("Do you want the error traceback for debugging purposes? (y/n)")
if not resp.casefold().startswith("y"):
exit()
raise e
if storymode:
page.locator('[data-click-id="text"]').first.screenshot(
path=f"assets/temp/{reddit_id}/png/story_content.png"
)
# For story mode, screenshot the main text content area.
# '[data-click-id="text"]' is a common locator for the main post body.
logger.info("Storymode: Taking screenshot of main text content...")
story_content_output_path = screenshot_dir / "story_content.png"
try:
page.locator('[data-click-id="text"]').first.screenshot(path=str(story_content_output_path))
logger.info(f"Story content screenshot saved to {story_content_output_path}")
except Exception as e:
logger.error(f"Failed to take screenshot of story content: {e}", exc_info=True)
# This might be critical for storymode; consider raising.
# For now, just log the error.
else:
# Comment Screenshots
logger.info(f"Preparing to take screenshots for up to {screenshot_num} comments.")
for idx, comment in enumerate(
track(
reddit_object["comments"][:screenshot_num],
"Downloading screenshots...",
reddit_object["comments"][:screenshot_num], # Slicing already handles if fewer comments
"Downloading comment screenshots...",
)
):
# Stop if we have reached the screenshot_num
if idx >= screenshot_num:
if idx >= screenshot_num: # Should not be needed due to slice, but good safeguard
logger.debug("Reached maximum number of comment screenshots.")
break
if page.locator('[data-testid="content-gate"]').is_visible():
page.locator('[data-testid="content-gate"] button').click()
page.goto(f"https://new.reddit.com/{comment['comment_url']}")
# translate code
if settings.config["reddit"]["thread"]["post_lang"]:
comment_tl = translators.translate_text(
comment["comment_body"],
translator="google",
to_language=settings.config["reddit"]["thread"]["post_lang"],
)
page.evaluate(
'([tl_content, tl_id]) => document.querySelector(`#t1_${tl_id} > div:nth-child(2) > div > div[data-testid="comment"] > div`).textContent = tl_content',
[comment_tl, comment["comment_id"]],
)
comment_url = f"https://new.reddit.com{comment['comment_url']}" # Ensure full URL
logger.debug(f"Navigating to comment: {comment_url}")
try:
if settings.config["settings"]["zoom"] != 1:
# store zoom settings
zoom = settings.config["settings"]["zoom"]
# zoom the body of the page
page.evaluate("document.body.style.zoom=" + str(zoom))
# scroll comment into view
page.locator(f"#t1_{comment['comment_id']}").scroll_into_view_if_needed()
# as zooming the body doesn't change the properties of the divs, we need to adjust for the zoom
location = page.locator(f"#t1_{comment['comment_id']}").bounding_box()
for i in location:
location[i] = float("{:.2f}".format(location[i] * zoom))
page.screenshot(
clip=location,
path=f"assets/temp/{reddit_id}/png/comment_{idx}.png",
page.goto(comment_url, timeout=30000, wait_until="domcontentloaded")
# page.wait_for_load_state("domcontentloaded", timeout=10000) # Redundant if in goto
except Exception as e: # Playwright TimeoutError etc.
logger.warning(f"Timeout or error navigating to comment {comment['comment_id']}: {e}. Skipping this comment.")
continue # Skip this comment
# Handle content gates (e.g. "continue_viewing" overlays)
# This is a common pattern, might need adjustments.
content_gate_button = page.locator('[data-testid="content-gate"] button').or_(page.get_by_role("button", name=re.compile(r"continue|view", re.IGNORECASE)))
if content_gate_button.first.is_visible(timeout=1000): # Quick check
try:
logger.debug("Content gate detected, attempting to click.")
content_gate_button.first.click(timeout=2000)
page.wait_for_timeout(500) # Brief pause for overlay to disappear
except Exception as e:
logger.warning(f"Could not click content gate button for comment {comment['comment_id']}: {e}")
if lang: # Assuming 'lang' is post_lang from settings
logger.debug(f"Translating comment {comment['comment_id']} to '{lang}'...")
try:
comment_tl = translators.translate_text(
comment["comment_body"],
translator="google",
to_language=lang,
)
# This JS evaluation is highly dependent on Reddit's DOM structure and likely to break.
# A more robust method would be to screenshot first, then overlay translated text if needed via PIL.
# For now, retaining original logic but with logging.
js_to_run = '([tl_content, tl_id]) => { try { document.querySelector(`#t1_${tl_id} > div:nth-child(2) > div > div[data-testid="comment"] > div`).textContent = tl_content; } catch(e) { console.error("Failed to set comment text via JS:", e); } }'
page.evaluate(js_to_run, [comment_tl, comment["comment_id"]])
logger.debug(f"Comment {comment['comment_id']} translation applied via JS (if element found).")
except Exception as e:
logger.warning(f"Failed to translate comment {comment['comment_id']} or apply it: {e}")
comment_ss_path = screenshot_dir / f"comment_{idx}.png"
comment_locator_id = f"#t1_{comment['comment_id']}"
logger.debug(f"Attempting screenshot for comment {comment['comment_id']} (locator: {comment_locator_id}) to {comment_ss_path}")
try:
comment_element = page.locator(comment_locator_id)
if not comment_element.is_visible(timeout=10000):
logger.warning(f"Comment element {comment_locator_id} not visible for screenshot. Skipping.")
continue
comment_element.scroll_into_view_if_needed() # Ensure it's in view
page.wait_for_timeout(200) # Small pause for scrolling to settle
if zoom_level != 1.0:
# As with post content, zoom can make locator.screenshot with clip unreliable.
# Best to avoid zoom or use full page screenshots and crop later if zoom is used.
# For now, attempting direct screenshot of the locator.
logger.debug(f"Applying zoom {zoom_level} for comment screenshot.")
page.evaluate(f"document.body.style.zoom={zoom_level}") # Re-apply zoom if page navigated
comment_element.screenshot(path=str(comment_ss_path))
else:
page.locator(f"#t1_{comment['comment_id']}").screenshot(
path=f"assets/temp/{reddit_id}/png/comment_{idx}.png"
)
except TimeoutError:
del reddit_object["comments"]
screenshot_num += 1
print("TimeoutError: Skipping screenshot...")
comment_element.screenshot(path=str(comment_ss_path))
logger.info(f"Screenshot for comment {idx} ({comment['comment_id']}) saved.")
except Exception as e: # Playwright TimeoutError, etc.
logger.warning(f"Failed to take screenshot for comment {comment['comment_id']}: {e}. Skipping.")
# Original code modified screenshot_num here, which is complex.
# Simpler to just skip and let it take fewer screenshots if some fail.
continue
# close browser instance when we are done using it
logger.info("Closing Playwright browser.")
browser.close()
except ConnectionRefusedError as e: # Catch the specific login error
logger.critical(f"Halting due to Reddit login failure: {e}")
raise # Re-raise to stop the process
except Exception as e:
logger.error(f"An error occurred during Playwright operations: {e}", exc_info=True)
if 'browser' in locals() and browser.is_connected():
browser.close()
raise RuntimeError(f"Playwright screenshot generation failed: {e}")
print_substep("Screenshots downloaded Successfully.", style="bold green")
logger.info("Screenshots downloaded successfully.")

@ -1,6 +1,8 @@
from typing import Tuple
import logging # Added for logging
from rich.console import Console
# from rich.console import Console # Keep if direct console interaction remains, otherwise remove
# For now, print_table is kept which uses Console.
from TTS.aws_polly import AWSPolly
from TTS.elevenlabs import elevenlabs
@ -10,9 +12,10 @@ from TTS.pyttsx import pyttsx
from TTS.streamlabs_polly import StreamlabsPolly
from TTS.TikTok import TikTok
from utils import settings
from utils.console import print_step, print_table
from utils.console import print_table # Keep print_table for now for interactive choice
console = Console()
# console = Console() # Replaced by logger for general messages
logger = logging.getLogger(__name__)
TTSProviders = {
"GoogleTranslate": GTTS,
@ -34,22 +37,56 @@ def save_text_to_mp3(reddit_obj) -> Tuple[int, int]:
tuple[int,int]: (total length of the audio, the number of comments audio was generated for)
"""
voice = settings.config["settings"]["tts"]["voice_choice"]
if str(voice).casefold() in map(lambda _: _.casefold(), TTSProviders):
text_to_mp3 = TTSEngine(get_case_insensitive_key_value(TTSProviders, voice), reddit_obj)
else:
selected_tts_provider_name = settings.config["settings"]["tts"]["voice_choice"]
tts_engine_class = None
if selected_tts_provider_name:
selected_tts_provider_name_lower = str(selected_tts_provider_name).casefold()
for provider_name, provider_class in TTSProviders.items():
if provider_name.casefold() == selected_tts_provider_name_lower:
tts_engine_class = provider_class
logger.info(f"Using configured TTS provider: {provider_name}")
break
if not tts_engine_class:
logger.warning(
f"Configured TTS provider '{selected_tts_provider_name}' not found or not set. Prompting user for selection."
)
# Interactive fallback - uses direct print/input via rich Console from print_table
# This part remains interactive as it's a fallback for misconfiguration.
from rich.console import Console as RichConsole # Local import for this specific interaction
local_console = RichConsole()
while True:
print_step("Please choose one of the following TTS providers: ")
print_table(TTSProviders)
choice = input("\n")
if choice.casefold() in map(lambda _: _.casefold(), TTSProviders):
local_console.print("[bold yellow]Please choose one of the following TTS providers:[/bold yellow]")
print_table(TTSProviders) # print_table uses rich.Console internally
choice = input("\nEnter your choice: ").strip()
choice_lower = choice.casefold()
for provider_name, provider_class in TTSProviders.items():
if provider_name.casefold() == choice_lower:
tts_engine_class = provider_class
logger.info(f"User selected TTS provider: {provider_name}")
# Optionally, offer to save this choice back to config? (Out of scope for now)
break
if tts_engine_class:
break
print("Unknown Choice")
text_to_mp3 = TTSEngine(get_case_insensitive_key_value(TTSProviders, choice), reddit_obj)
return text_to_mp3.run()
local_console.print("[bold red]Unknown TTS provider. Please try again.[/bold red]")
try:
text_to_mp3_engine = TTSEngine(tts_engine_class, reddit_obj)
return text_to_mp3_engine.run()
except Exception as e:
logger.error(f"Failed to initialize or run TTS engine {tts_engine_class.__name__ if tts_engine_class else 'N/A'}: {e}", exc_info=True)
# Depending on desired behavior, either re-raise or return a value indicating failure
# For now, re-raising to ensure the error is propagated.
raise
def get_case_insensitive_key_value(input_dict, key):
def get_case_insensitive_key_value(input_dict, key): # This function seems unused now.
# Retaining it for now in case it's used by other parts of the codebase not yet reviewed for logging.
# If confirmed unused later, it can be removed.
logger.debug(f"Performing case-insensitive key lookup for '{key}'")
return next(
(value for dict_key, value in input_dict.items() if dict_key.lower() == key.lower()),
None,

Loading…
Cancel
Save