# RedditVideoMakerBot/manual/tts_processor.py

"""
TTS Processor for the manual pipeline.
Takes a post_object (built by scanner.py), generates MP3 audio files
for each screenshot's text using the existing TTS engines, and updates
the post_object with audio paths and durations.
Reuses TTS engines from TTS/ module — no code duplication.
"""
import re
import time
from pathlib import Path
from typing import Tuple
from moviepy import AudioFileClip
from utils import settings
from utils.console import print_step, print_substep
from utils.voice import sanitize_text
class ManualTTSProcessor:
    """Produce and measure the audio for every screenshot of a manual post.

    Each entry of ``post_object["screenshots"]`` either already carries an
    ``audio_path`` (only its duration is measured) or carries ``text`` that is
    cleaned, sent to the configured TTS engine and written to
    ``assets/temp/<post_id>/mp3/<index>.mp3``.
    """

    # Characters after which a long text may be cut into separate TTS requests.
    _SENTENCE_ENDINGS = ".!?"

    def __init__(self, post_object: dict, max_length: int = 120, tts_delay: float = 10):
        """Store the post and derive the output directory.

        Args:
            post_object: Post data from scanner.py; must contain ``"post_id"``
                (``process`` additionally reads ``"screenshots"``).
            max_length: Maximum total audio length in seconds
                (default: 120s = 2 min).
            tts_delay: Seconds to pause after each generated clip. The default
                keeps the previous fixed pause of 10s; a value <= 0 disables it.
        """
        self.post = post_object
        self.post_id = post_object["post_id"]
        self.max_length = max_length
        self.tts_delay = tts_delay
        self.mp3_dir = Path(f"assets/temp/{self.post_id}/mp3")
        # Created lazily by process() the first time a screenshot needs TTS.
        self.tts_module = None

    def process(self) -> dict:
        """Process audio for all screenshots.

        For each screenshot:
        - ``audio_path`` already set -> skip TTS, just measure the duration
        - only ``text`` set -> run TTS to generate ``<index>.mp3``
        - neither -> skip the entry

        Returns:
            The same post_object, with ``audio_path`` / ``audio_duration``
            filled in per screenshot and ``total_duration`` set on the post.

        Raises:
            Exception: whatever the TTS engine raises while generating a clip
                (re-raised by ``_generate_audio``).
        """
        self.mp3_dir.mkdir(parents=True, exist_ok=True)
        print_step("🔊 Processing audio files...")

        total_duration = 0
        processed_count = 0
        engine_ready = False

        for screenshot in self.post["screenshots"]:
            idx = screenshot["index"]

            # Case 1: .mp3 already provided — just measure duration
            if screenshot.get("audio_path"):
                duration = self._measure_duration(screenshot["audio_path"], idx)
                screenshot["audio_duration"] = duration
                total_duration += duration
                processed_count += 1
                print_substep(
                    f" ✓ #{idx}: {duration:.1f}s (pre-recorded .mp3)",
                    style="green",
                )
                # NOTE(review): the max-length check below is not applied to
                # pre-recorded clips (unchanged behaviour) — confirm that
                # user-supplied audio is meant to be exempt from the limit.
                continue

            # Case 2: Only .txt provided — need TTS
            # `or ""` guards against an explicit None stored under "text".
            text = (screenshot.get("text") or "").strip()
            if not text:
                print_substep(
                    f" ⚠ Screenshot #{idx} has no audio or text, skipping.",
                    style="yellow",
                )
                continue

            # Initialize TTS engine only when needed (lazy)
            if not engine_ready:
                print_substep(" 📝 Some entries need TTS generation...")
                self.tts_module = self._get_tts_engine()
                engine_ready = True

            mp3_path = str(self.mp3_dir / f"{idx}.mp3")

            clean_text = self._process_text(text)
            if not clean_text or clean_text.isspace():
                print_substep(
                    f" ⚠ Screenshot #{idx} text is empty after sanitization, skipping.",
                    style="yellow",
                )
                continue

            # Texts above the engine's limit are generated in parts and joined.
            if len(clean_text) > self.tts_module.max_chars:
                self._generate_split_audio(clean_text, idx, mp3_path)
            else:
                self._generate_audio(clean_text, mp3_path)

            duration = self._measure_duration(mp3_path, idx)

            screenshot["audio_path"] = mp3_path
            screenshot["audio_duration"] = duration
            total_duration += duration
            processed_count += 1

            # Pause between TTS requests (10s unless configured otherwise).
            if self.tts_delay > 0:
                print_substep(f" 💤 Sleeping {self.tts_delay:g}s...", style="dim")
                time.sleep(self.tts_delay)

            print_substep(
                f" ✓ #{idx}: {duration:.1f}s (TTS generated, {len(clean_text)} chars)",
                style="green",
            )

            # Stop once the limit is exceeded; the clip that crossed the limit
            # is kept, and a single clip is never rejected on its own.
            if total_duration > self.max_length and processed_count > 1:
                print_substep(
                    f" ⚠ Total duration ({total_duration:.1f}s) exceeds max ({self.max_length}s). "
                    f"Stopping at {processed_count} clips.",
                    style="yellow",
                )
                break

        self.post["total_duration"] = total_duration
        print_substep(
            f"{processed_count} audio clips ready, total: {total_duration:.1f}s",
            style="bold green",
        )
        return self.post

    def _measure_duration(self, audio_path: str, idx) -> float:
        """Return the duration in seconds of ``audio_path``, or 0 if unreadable.

        A read failure is reported for screenshot ``idx`` and never raised, so
        one broken file does not abort the whole run.
        """
        try:
            clip = AudioFileClip(audio_path)
            try:
                # A missing duration is treated as 0 so later formatting works.
                return clip.duration or 0
            finally:
                # Close the reader even when reading the duration fails.
                clip.close()
        except Exception as e:
            print_substep(f" ✗ Failed to read audio #{idx}: {e}", style="red")
            return 0

    def _get_tts_engine(self):
        """Instantiate the TTS engine selected in the config.

        Reads ``settings.config["settings"]["tts"]["voice_choice"]``
        (case-insensitive) and maps it to one of the engine classes of the
        ``TTS`` package. An unknown value falls back to ``GTTS`` with a warning.

        Returns:
            A new instance of the selected engine class.
        """
        from TTS.GTTS import GTTS
        from TTS.OhFreeMe import OhFreeMe
        from TTS.TikTok import TikTok
        from TTS.aws_polly import AWSPolly
        from TTS.elevenlabs import elevenlabs
        from TTS.openai_tts import OpenAITTS
        from TTS.pyttsx import pyttsx
        from TTS.streamlabs_polly import StreamlabsPolly

        providers = {
            "googletranslate": GTTS,
            "ohfreeme": OhFreeMe,
            "awspolly": AWSPolly,
            "streamlabspolly": StreamlabsPolly,
            "tiktok": TikTok,
            "pyttsx": pyttsx,
            "elevenlabs": elevenlabs,
            "openai": OpenAITTS,
        }

        voice_choice = settings.config["settings"]["tts"]["voice_choice"]
        engine_class = providers.get(str(voice_choice).lower())
        if engine_class is None:
            print_substep(
                f"Unknown TTS provider: {voice_choice}. Falling back to GoogleTranslate.",
                style="yellow",
            )
            engine_class = GTTS

        print_substep(f"Using TTS engine: {engine_class.__name__}")
        return engine_class()

    def _engine_accepts_random_voice(self) -> bool:
        """Tell whether ``self.tts_module.run`` can take a ``random_voice`` keyword.

        The decision is based on the engine actually in use rather than on the
        configured provider name, so it stays correct when ``_get_tts_engine``
        fell back to a different engine than the one configured.
        """
        import inspect

        try:
            parameters = inspect.signature(self.tts_module.run).parameters.values()
        except (TypeError, ValueError):
            # Signature not introspectable: pass the keyword, as was done for
            # every engine other than the Google one before.
            return True
        return any(
            p.name == "random_voice" or p.kind is inspect.Parameter.VAR_KEYWORD
            for p in parameters
        )

    def _generate_audio(self, text: str, filepath: str):
        """Generate a single audio file from ``text`` at ``filepath``.

        ``random_voice`` (config ``settings.tts.random_voice``, default False)
        is forwarded only to engines whose ``run`` accepts it.

        Raises:
            Exception: any engine error, re-raised after being reported.
        """
        try:
            random_voice = settings.config["settings"]["tts"].get("random_voice", False)
            if self._engine_accepts_random_voice():
                self.tts_module.run(text, filepath=filepath, random_voice=random_voice)
            else:
                self.tts_module.run(text, filepath=filepath)
        except Exception as e:
            print_substep(f" ✗ TTS generation failed: {e}", style="red")
            raise

    @staticmethod
    def _split_text(text: str, max_chars: int) -> list[str]:
        """Cut ``text`` into pieces of at most ``max_chars`` characters.

        Each cut is made after the last sentence ending (``.``, ``!``, ``?``)
        inside the allowed window; if there is none, at the last space; if
        there is none either, exactly at ``max_chars``. No text is discarded
        apart from surrounding whitespace.

        Raises:
            ValueError: if ``max_chars`` is smaller than 1.
        """
        if max_chars < 1:
            raise ValueError("max_chars must be at least 1")

        pieces = []
        remaining = text.strip()
        while len(remaining) > max_chars:
            window = remaining[:max_chars]
            # rfind() yields -1 when absent, so `cut` is 0 without any ending.
            cut = max(window.rfind(mark) for mark in ManualTTSProcessor._SENTENCE_ENDINGS) + 1
            if cut <= 0:
                cut = window.rfind(" ")
            if cut <= 0:
                cut = max_chars
            pieces.append(remaining[:cut].strip())
            remaining = remaining[cut:].strip()
        if remaining:
            pieces.append(remaining)
        return [piece for piece in pieces if piece]

    def _generate_split_audio(self, text: str, idx: int, final_path: str):
        """Generate ``text`` in several TTS requests and join them into one file.

        For texts longer than the TTS engine's ``max_chars`` limit. Each piece
        is written to ``<idx>-<n>.part.mp3`` in the mp3 directory, then ffmpeg's
        concat demuxer copies the parts into ``final_path``. The part files and
        the ffmpeg list file are removed afterwards, also when a step fails.

        A failing ffmpeg run is reported but not raised; ``final_path`` is then
        missing and the caller's duration measurement reports it as unreadable.

        Raises:
            Exception: any engine error re-raised by ``_generate_audio``.
        """
        import os
        import subprocess

        chunks = self._split_text(text, self.tts_module.max_chars)
        list_path = str(self.mp3_dir / f"{idx}_list.txt")
        part_files = []

        try:
            for part_idx, chunk in enumerate(chunks):
                part_path = str(self.mp3_dir / f"{idx}-{part_idx}.part.mp3")
                # Registered before generation so a half-written part is removed too.
                part_files.append(part_path)
                self._generate_audio(chunk, part_path)

            if not part_files:
                return

            # The list holds bare file names: it is written next to the parts.
            with open(list_path, "w", encoding="utf-8") as f:
                f.writelines(f"file '{Path(part).name}'\n" for part in part_files)

            # Argument list instead of a shell string: paths containing spaces
            # or shell metacharacters are passed through untouched.
            command = [
                "ffmpeg", "-f", "concat", "-y", "-hide_banner",
                "-loglevel", "panic", "-safe", "0",
                "-i", list_path, "-c", "copy", final_path,
            ]
            try:
                result = subprocess.run(command, check=False)
            except OSError as e:
                print_substep(f" ✗ Could not run ffmpeg for #{idx}: {e}", style="red")
            else:
                if result.returncode != 0:
                    print_substep(
                        f" ✗ ffmpeg failed to join audio parts for #{idx} "
                        f"(exit code {result.returncode})",
                        style="red",
                    )
        finally:
            # Best-effort cleanup: a file that is already gone is not an error.
            for leftover in (*part_files, list_path):
                try:
                    os.unlink(leftover)
                except OSError:
                    pass

    def _process_text(self, text: str) -> str:
        """Clean and sanitize text for TTS.

        - Drops lines starting with ``#`` (comments in txt files)
        - Joins the remaining lines with single spaces
        - Removes URL-like tokens
        - Ensures the text ends with ``.``, ``!`` or ``?`` and collapses
          repeated dots
        - Passes the result through the existing ``sanitize_text()``
        """
        # splitlines() also consumes "\r\n", so no stray "\r" reaches the engine.
        kept_lines = [
            line for line in text.splitlines() if not line.strip().startswith("#")
        ]
        # Line breaks become spaces here, so no newline is left to handle later.
        text = " ".join(kept_lines).strip()

        # Remove URLs
        regex_urls = r"((http|https)\:\/\/)?[a-zA-Z0-9\.\/\?\:@\-_=#]+\.([a-zA-Z]){2,6}([a-zA-Z0-9\.\&\/\?\:@\-_=#])*"
        # Strip again: a URL removed at either end leaves a blank behind, which
        # would otherwise end up in front of the closing period.
        text = re.sub(regex_urls, " ", text).strip()

        # Add period at end if missing
        if text and text[-1] not in ".!?":
            text += "."

        # Clean repeated dots ("..." and ". .")
        text = re.sub(r"\.{2,}", ".", text)
        text = re.sub(r"\.\s*\.", ".", text)

        # Use existing sanitize_text for final cleanup
        return sanitize_text(text)