diff --git a/TTS/engine_wrapper.py b/TTS/engine_wrapper.py index 90fe45f..577b77b 100644 --- a/TTS/engine_wrapper.py +++ b/TTS/engine_wrapper.py @@ -186,6 +186,7 @@ def process_text(text: str, clean: bool = True): new_text = sanitize_text(text) if clean else text if lang: print_substep("Translating Text...") - translated_text = translators.google(text, to_language=lang) + #translated_text = translators.google(text, ) + translated_text = translators.translate_text(text, translator='google', to_language=lang) new_text = sanitize_text(translated_text) return new_text diff --git a/video_creation/final_video.py b/video_creation/final_video.py index 52ebe04..577b77b 100644 --- a/video_creation/final_video.py +++ b/video_creation/final_video.py @@ -1,431 +1,192 @@ -import multiprocessing import os import re -from os.path import exists # Needs to be imported specifically -from typing import Final -from typing import Tuple, Any, Dict +from pathlib import Path +from typing import Tuple -import ffmpeg +import numpy as np import translators -from PIL import Image -from rich.console import Console +from moviepy.audio.AudioClip import AudioClip +from moviepy.audio.fx.volumex import volumex +from moviepy.editor import AudioFileClip from rich.progress import track -from utils.cleanup import cleanup -from utils.console import print_step, print_substep -from utils.thumbnail import create_thumbnail -from utils.videos import save_data from utils import settings - -import tempfile -import threading -import time - -console = Console() - - -class ProgressFfmpeg(threading.Thread): - def __init__(self, vid_duration_seconds, progress_update_callback): - threading.Thread.__init__(self, name="ProgressFfmpeg") - self.stop_event = threading.Event() - self.output_file = tempfile.NamedTemporaryFile(mode="w+", delete=False) - self.vid_duration_seconds = vid_duration_seconds - self.progress_update_callback = progress_update_callback - - def run(self): - while not self.stop_event.is_set(): - 
latest_progress = self.get_latest_ms_progress() - if latest_progress is not None: - completed_percent = latest_progress / self.vid_duration_seconds - self.progress_update_callback(completed_percent) - time.sleep(1) - - def get_latest_ms_progress(self): - lines = self.output_file.readlines() - - if lines: - for line in lines: - if "out_time_ms" in line: - out_time_ms = line.split("=")[1] - return int(out_time_ms) / 1000000.0 - return None - - def stop(self): - self.stop_event.set() - - def __enter__(self): - self.start() - return self - - def __exit__(self, *args, **kwargs): - self.stop() +from utils.console import print_step, print_substep +from utils.voice import sanitize_text -def name_normalize(name: str) -> str: - name = re.sub(r'[?\\"%*:|<>]', "", name) - name = re.sub(r"( [w,W]\s?\/\s?[o,O,0])", r" without", name) - name = re.sub(r"( [w,W]\s?\/)", r" with", name) - name = re.sub(r"(\d+)\s?\/\s?(\d+)", r"\1 of \2", name) - name = re.sub(r"(\w+)\s?\/\s?(\w+)", r"\1 or \2", name) - name = re.sub(r"\/", r"", name) +DEFAULT_MAX_LENGTH: int = 50 # Video length variable, edit this on your own risk. 
It should work, but it's not supported - lang = settings.config["reddit"]["thread"]["post_lang"] - if lang: - print_substep("Translating filename...") - translated_name = translators.google(name, to_language=lang) - return translated_name - else: - return name -def prepare_background(reddit_id: str, W: int, H: int) -> str: - output_path = f"assets/temp/{reddit_id}/background_noaudio.mp4" - output = ( - ffmpeg.input(f"assets/temp/{reddit_id}/background.mp4") - .filter("crop", f"ih*({W}/{H})", "ih") - .output( - output_path, - an=None, - **{ - "c:v": "h264", - "b:v": "20M", - "b:a": "192k", - "threads": multiprocessing.cpu_count(), - }, - ) - .overwrite_output() - ) - try: - output.run(quiet=True) - except ffmpeg.Error as e: - print(e.stderr.decode("utf8")) - exit(1) - return output_path +class TTSEngine: + """Calls the given TTS engine to reduce code duplication and allow multiple TTS engines. -def merge_background_audio(audio: ffmpeg, reddit_id: str): - """Gather an audio and merge with assets/backgrounds/background.mp3 Args: - audio (ffmpeg): The TTS final audio but without background. - reddit_id (str): The ID of subreddit - """ - background_audio_volume = settings.config["settings"]["background"][ - "background_audio_volume" - ] - if background_audio_volume == 0: - return audio # Return the original audio - else: - # sets volume to config - bg_audio = ffmpeg.input(f"assets/temp/{reddit_id}/background.mp3").filter( - "volume", - background_audio_volume, - ) - # Merges audio and background_audio - merged_audio = ffmpeg.filter([audio, bg_audio], "amix", duration="longest") - return merged_audio # Return merged audio + tts_module : The TTS module. Your module should handle the TTS itself and saving to the given path under the run method. + reddit_object : The reddit object that contains the posts to read. + path (Optional) : The unix style path to save the mp3 files to. This must not have leading or trailing slashes. 
+ max_length (Optional) : The maximum length of the mp3 files in total. - -def make_final_video( - number_of_clips: int, - length: int, - reddit_obj: dict, - background_config: Dict[str, Tuple], -): - """Gathers audio clips, gathers all screenshots, stitches them together and saves the final video to assets/temp - Args: - number_of_clips (int): Index to end at when going through the screenshots' - length (int): Length of the video - reddit_obj (dict): The reddit object that contains the posts to read. - background_config (Tuple[str, str, str, Any]): The background config to use. + Notes: + tts_module must take the arguments text and filepath. """ - # settings values - W: Final[int] = int(settings.config["settings"]["resolution_w"]) - H: Final[int] = int(settings.config["settings"]["resolution_h"]) - - opacity = settings.config["settings"]["opacity"] - - reddit_id = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"]) - - allowOnlyTTSFolder: bool = ( - settings.config["settings"]["background"]["enable_extra_audio"] - and settings.config["settings"]["background"]["background_audio_volume"] != 0 - ) - - print_step("Creating the final video 🎥") - - background_clip = ffmpeg.input(prepare_background(reddit_id, W=W, H=H)) - - # Gather all audio clips - audio_clips = list() - if number_of_clips == 0 and settings.config["settings"]["storymode"] == "false": - print( - "No audio clips to gather. Please use a different TTS or post." 
- ) # This is to fix the TypeError: unsupported operand type(s) for +: 'int' and 'NoneType' - exit() - if settings.config["settings"]["storymode"]: - if settings.config["settings"]["storymodemethod"] == 0: - audio_clips = [ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3")] - audio_clips.insert( - 1, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/postaudio.mp3") - ) - elif settings.config["settings"]["storymodemethod"] == 1: - audio_clips = [ - ffmpeg.input(f"assets/temp/{reddit_id}/mp3/postaudio-{i}.mp3") - for i in track( - range(number_of_clips + 1), "Collecting the audio files..." - ) - ] - audio_clips.insert( - 0, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3") - ) - - else: - audio_clips = [ - ffmpeg.input(f"assets/temp/{reddit_id}/mp3/{i}.mp3") - for i in range(number_of_clips) - ] - audio_clips.insert(0, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3")) - audio_clips_durations = [ - float( - ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/{i}.mp3")["format"][ - "duration" - ] + def __init__( + self, + tts_module, + reddit_object: dict, + path: str = "assets/temp/", + max_length: int = DEFAULT_MAX_LENGTH, + last_clip_length: int = 0, + ): + self.tts_module = tts_module() + self.reddit_object = reddit_object + + self.redditid = re.sub(r"[^\w\s-]", "", reddit_object["thread_id"]) + self.path = path + self.redditid + "/mp3" + self.max_length = max_length + self.length = 0 + self.last_clip_length = last_clip_length + + def add_periods( + self, + ): # adds periods to the end of paragraphs (where people often forget to put them) so tts doesn't blend sentences + for comment in self.reddit_object["comments"]: + # remove links + regex_urls = r"((http|https)\:\/\/)?[a-zA-Z0-9\.\/\?\:@\-_=#]+\.([a-zA-Z]){2,6}([a-zA-Z0-9\.\&\/\?\:@\-_=#])*" + comment["comment_body"] = re.sub(regex_urls, " ", comment["comment_body"]) + comment["comment_body"] = comment["comment_body"].replace("\n", ". 
") + comment["comment_body"] = re.sub(r"\bAI\b", "A.I", comment["comment_body"]) + comment["comment_body"] = re.sub( + r"\bAGI\b", "A.G.I", comment["comment_body"] ) - for i in range(number_of_clips) - ] - audio_clips_durations.insert( - 0, - float( - ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/title.mp3")["format"][ - "duration" - ] - ), - ) - audio_concat = ffmpeg.concat(*audio_clips, a=1, v=0) - ffmpeg.output( - audio_concat, f"assets/temp/{reddit_id}/audio.mp3", **{"b:a": "192k"} - ).overwrite_output().run(quiet=True) - - console.log(f"[bold green] Video Will Be: {length} Seconds Long") - - screenshot_width = int((W * 45) // 100) - audio = ffmpeg.input(f"assets/temp/{reddit_id}/audio.mp3") - final_audio = merge_background_audio(audio, reddit_id) - - image_clips = list() - - image_clips.insert( - 0, - ffmpeg.input(f"assets/temp/{reddit_id}/png/title.png")["v"].filter( - "scale", screenshot_width, -1 - ), - ) + if comment["comment_body"][-1] != ".": + comment["comment_body"] += "." + comment["comment_body"] = comment["comment_body"].replace(". . .", ".") + comment["comment_body"] = comment["comment_body"].replace(".. . ", ".") + comment["comment_body"] = comment["comment_body"].replace(". . 
", ".") + comment["comment_body"] = re.sub(r'\."\.', '".', comment["comment_body"]) + + def run(self) -> Tuple[int, int]: + Path(self.path).mkdir(parents=True, exist_ok=True) + print_step("Saving Text to MP3 files...") + + self.add_periods() + self.call_tts("title", process_text(self.reddit_object["thread_title"])) + # processed_text = ##self.reddit_object["thread_post"] != "" + idx = 0 + + if settings.config["settings"]["storymode"]: + if settings.config["settings"]["storymodemethod"] == 0: + if len(self.reddit_object["thread_post"]) > self.tts_module.max_chars: + self.split_post(self.reddit_object["thread_post"], "postaudio") + else: + self.call_tts( + "postaudio", process_text(self.reddit_object["thread_post"]) + ) + elif settings.config["settings"]["storymodemethod"] == 1: + for idx, text in track(enumerate(self.reddit_object["thread_post"])): + self.call_tts(f"postaudio-{idx}", process_text(text)) - current_time = 0 - if settings.config["settings"]["storymode"]: - audio_clips_durations = [ - float( - ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/postaudio-{i}.mp3")[ - "format" - ]["duration"] + else: + for idx, comment in track( + enumerate(self.reddit_object["comments"]), "Saving..." + ): + # ! Stop creating mp3 files if the length is greater than max length. 
+ if self.length > self.max_length and idx > 1: + self.length -= self.last_clip_length + idx -= 1 + break + if ( + len(comment["comment_body"]) > self.tts_module.max_chars + ): # Split the comment if it is too long + self.split_post(comment["comment_body"], idx) # Split the comment + else: # If the comment is not too long, just call the tts engine + self.call_tts(f"{idx}", process_text(comment["comment_body"])) + + print_substep("Saved Text to MP3 files successfully.", style="bold green") + return self.length, idx + + def split_post(self, text: str, idx): + split_files = [] + split_text = [ + x.group().strip() + for x in re.finditer( + r" *(((.|\n){0," + str(self.tts_module.max_chars) + "})(\.|.$))", text ) - for i in range(number_of_clips) ] - audio_clips_durations.insert( - 0, - float( - ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/title.mp3")["format"][ - "duration" - ] - ), - ) - if settings.config["settings"]["storymodemethod"] == 0: - image_clips.insert( - 1, - ffmpeg.input(f"assets/temp/{reddit_id}/png/story_content.png").filter( - "scale", screenshot_width, -1 - ), - ) - background_clip = background_clip.overlay( - image_clips[1], - enable=f"between(t,{current_time},{current_time + audio_clips_durations[1]})", - x="(main_w-overlay_w)/2", - y="(main_h-overlay_h)/2", - ) - current_time += audio_clips_durations[1] - elif settings.config["settings"]["storymodemethod"] == 1: - for i in track( - range(0, number_of_clips + 1), "Collecting the image files..." 
- ): - image_clips.append( - ffmpeg.input(f"assets/temp/{reddit_id}/png/img{i}.png")["v"].filter( - "scale", screenshot_width, -1 - ) - ) - background_clip = background_clip.overlay( - image_clips[i], - enable=f"between(t,{current_time},{current_time + audio_clips_durations[i]})", - x="(main_w-overlay_w)/2", - y="(main_h-overlay_h)/2", + self.create_silence_mp3() + + idy = None + for idy, text_cut in enumerate(split_text): + newtext = process_text(text_cut) + # print(f"{idx}-{idy}: {newtext}\n") + + if not newtext or newtext.isspace(): + print("newtext was blank because sanitized split text resulted in none") + continue + else: + self.call_tts(f"{idx}-{idy}.part", newtext) + with open(f"{self.path}/list.txt", "w") as f: + for idz in range(0, len(split_text)): + f.write("file " + f"'{idx}-{idz}.part.mp3'" + "\n") + split_files.append(str(f"{self.path}/{idx}-{idy}.part.mp3")) + f.write("file " + f"'silence.mp3'" + "\n") + + os.system( + "ffmpeg -f concat -y -hide_banner -loglevel panic -safe 0 " + + "-i " + + f"{self.path}/list.txt " + + "-c copy " + + f"{self.path}/{idx}.mp3" ) - current_time += audio_clips_durations[i] - else: - for i in range(0, number_of_clips + 1): - image_clips.append( - ffmpeg.input(f"assets/temp/{reddit_id}/png/comment_{i}.png")[ - "v" - ].filter("scale", screenshot_width, -1) - ) - image_overlay = image_clips[i].filter("colorchannelmixer", aa=opacity) - background_clip = background_clip.overlay( - image_overlay, - enable=f"between(t,{current_time},{current_time + audio_clips_durations[i]})", - x="(main_w-overlay_w)/2", - y="(main_h-overlay_h)/2", - ) - current_time += audio_clips_durations[i] - - title = re.sub(r"[^\w\s-]", "", reddit_obj["thread_title"]) - idx = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"]) - title_thumb = reddit_obj["thread_title"] - - filename = f"{name_normalize(title)[:251]}" - subreddit = settings.config["reddit"]["thread"]["subreddit"] - - if not exists(f"./results/{subreddit}"): - print_substep( - "The 'results' 
folder could not be found so it was automatically created." + try: + for i in range(0, len(split_files)): + os.unlink(split_files[i]) + except FileNotFoundError as e: + print("File not found: " + e.filename) + except OSError: + print("OSError") + + def call_tts(self, filename: str, text: str): + self.tts_module.run( + text, + filepath=f"{self.path}/{filename}.mp3", + random_voice=settings.config["settings"]["tts"]["random_voice"], ) - os.makedirs(f"./results/{subreddit}") - - if not exists(f"./results/{subreddit}/OnlyTTS") and allowOnlyTTSFolder: - print_substep( - "The 'OnlyTTS' folder could not be found so it was automatically created." + # try: + # self.length += MP3(f"{self.path}/{filename}.mp3").info.length + # except (MutagenError, HeaderNotFoundError): + # self.length += sox.file_info.duration(f"{self.path}/{filename}.mp3") + try: + clip = AudioFileClip(f"{self.path}/{filename}.mp3") + self.last_clip_length = clip.duration + self.length += clip.duration + clip.close() + except: + self.length = 0 + + def create_silence_mp3(self): + silence_duration = settings.config["settings"]["tts"]["silence_duration"] + silence = AudioClip( + make_frame=lambda t: np.sin(440 * 2 * np.pi * t), + duration=silence_duration, + fps=44100, ) - os.makedirs(f"./results/{subreddit}/OnlyTTS") - - # create a thumbnail for the video - settingsbackground = settings.config["settings"]["background"] - - if settingsbackground["background_thumbnail"]: - if not exists(f"./results/{subreddit}/thumbnails"): - print_substep( - "The 'results/thumbnails' folder could not be found so it was automatically created." 
- ) - os.makedirs(f"./results/{subreddit}/thumbnails") - # get the first file with the .png extension from assets/backgrounds and use it as a background for the thumbnail - first_image = next( - ( - file - for file in os.listdir("assets/backgrounds") - if file.endswith(".png") - ), - None, + silence = volumex(silence, 0) + silence.write_audiofile( + f"{self.path}/silence.mp3", fps=44100, verbose=False, logger=None ) - if first_image is None: - print_substep("No png files found in assets/backgrounds", "red") - - else: - font_family = settingsbackground["background_thumbnail_font_family"] - font_size = settingsbackground["background_thumbnail_font_size"] - font_color = settingsbackground["background_thumbnail_font_color"] - thumbnail = Image.open(f"assets/backgrounds/{first_image}") - width, height = thumbnail.size - thumbnailSave = create_thumbnail( - thumbnail, - font_family, - font_size, - font_color, - width, - height, - title_thumb, - ) - thumbnailSave.save(f"./assets/temp/{reddit_id}/thumbnail.png") - print_substep( - f"Thumbnail - Building Thumbnail in assets/temp/{reddit_id}/thumbnail.png" - ) - - text = f"Background by {background_config['video'][2]}" - background_clip = ffmpeg.drawtext( - background_clip, - text=text, - x=f"(w-text_w)", - y=f"(h-text_h)", - fontsize=5, - fontcolor="White", - fontfile=os.path.join("fonts", "Roboto-Regular.ttf"), - ) - background_clip = background_clip.filter("scale", W, H) - print_step("Rendering the video 🎥") - from tqdm import tqdm - - pbar = tqdm(total=100, desc="Progress: ", bar_format="{l_bar}{bar}", unit=" %") - - def on_update_example(progress) -> None: - status = round(progress * 100, 2) - old_percentage = pbar.n - pbar.update(status - old_percentage) - defaultPath = f"results/{subreddit}" - with ProgressFfmpeg(length, on_update_example) as progress: - path = defaultPath + f"/{filename}" - path = ( - path[:251] + ".mp4" - ) # Prevent a error by limiting the path length, do not change this. 
- ffmpeg.output( - background_clip, - final_audio, - path, - f="mp4", - **{ - "c:v": "h264", - "b:v": "20M", - "b:a": "192k", - "threads": multiprocessing.cpu_count(), - }, - ).overwrite_output().global_args("-progress", progress.output_file.name).run( - quiet=True, - overwrite_output=True, - capture_stdout=False, - capture_stderr=False, - ) - old_percentage = pbar.n - pbar.update(100 - old_percentage) - if allowOnlyTTSFolder: - path = defaultPath + f"/OnlyTTS/{filename}" - path = ( - path[:251] + ".mp4" - ) # Prevent a error by limiting the path length, do not change this. - print_step("Rendering the Only TTS Video 🎥") - with ProgressFfmpeg(length, on_update_example) as progress: - try: - ffmpeg.output( - background_clip, - audio, - path, - f="mp4", - **{ - "c:v": "h264", - "b:v": "20M", - "b:a": "192k", - "threads": multiprocessing.cpu_count(), - }, - ).overwrite_output().global_args("-progress", progress.output_file.name).run( - quiet=True, - overwrite_output=True, - capture_stdout=False, - capture_stderr=False, - ) - except ffmpeg.Error as e: - print(e.stderr.decode("utf8")) - exit(1) - old_percentage = pbar.n - pbar.update(100 - old_percentage) - pbar.close() - save_data(subreddit, filename + ".mp4", title, idx, background_config["video"][2]) - print_step("Removing temporary files 🗑") - cleanups = cleanup(reddit_id) - print_substep(f"Removed {cleanups} temporary files 🗑") - print_step("Done! 
🎉 The video is in the results folder 📁") +def process_text(text: str, clean: bool = True): + lang = settings.config["reddit"]["thread"]["post_lang"] + new_text = sanitize_text(text) if clean else text + if lang: + print_substep("Translating Text...") + #translated_text = translators.google(text, ) + translated_text = translators.translate_text(text, translator='google', to_language=lang) + new_text = sanitize_text(translated_text) + return new_text diff --git a/video_creation/screenshot_downloader.py b/video_creation/screenshot_downloader.py index f7510f6..57a1231 100644 --- a/video_creation/screenshot_downloader.py +++ b/video_creation/screenshot_downloader.py @@ -168,10 +168,11 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int): if lang: print_substep("Translating post...") - texts_in_tl = translators.google( - reddit_object["thread_title"], - to_language=lang, - ) + #texts_in_tl = translators.google( + # reddit_object["thread_title"], + # , + #) + texts_in_tl = translators.translate_text(reddit_object["thread_title"], translator='google', to_language=lang) page.evaluate( "tl_content => document.querySelector('[data-adclicklocation=\"title\"] > div > div > h1').textContent = tl_content", @@ -240,10 +241,11 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int): # translate code if settings.config["reddit"]["thread"]["post_lang"]: - comment_tl = translators.google( - comment["comment_body"], - to_language=settings.config["reddit"]["thread"]["post_lang"], - ) + #comment_tl = translators.google( + # comment["comment_body"], + # to_language=settings.config["reddit"]["thread"]["post_lang"], + #) + comment_tl = translators.translate_text(comment["comment_body"], translator='google', to_language=settings.config["reddit"]["thread"]["post_lang"]) page.evaluate( '([tl_content, tl_id]) => document.querySelector(`#t1_${tl_id} > div:nth-child(2) > div > div[data-testid="comment"] > div`).textContent = tl_content', [comment_tl, comment["comment_id"]],