diff --git a/TTS/common.py b/TTS/common.py new file mode 100644 index 0000000..a56444e --- /dev/null +++ b/TTS/common.py @@ -0,0 +1,10 @@ +def audio_length( + path: str, +) -> float | int: + from mutagen.mp3 import MP3 + + try: + audio = MP3(path) + return audio.info.length + except Exception as e: # TODO add logging + return 0 diff --git a/TTS/engine_wrapper.py b/TTS/engine_wrapper.py index a171db7..1bacd86 100644 --- a/TTS/engine_wrapper.py +++ b/TTS/engine_wrapper.py @@ -8,16 +8,16 @@ import re # from mutagen.mp3 import MP3, HeaderNotFoundError import translators as ts from rich.progress import track -from moviepy.editor import AudioFileClip, CompositeAudioClip, concatenate_audioclips +from attr import attrs, attrib + from utils.console import print_step, print_substep from utils.voice import sanitize_text from utils import settings - -DEFUALT_MAX_LENGTH: int = 50 # video length variable +from TTS.common import audio_length +@attrs(auto_attribs=True) class TTSEngine: - """Calls the given TTS engine to reduce code duplication and allow multiple TTS engines. Args: @@ -29,94 +29,72 @@ class TTSEngine: Notes: tts_module must take the arguments text and filepath. """ - - def __init__( - self, - tts_module, - reddit_object: dict, - path: str = "assets/temp/mp3", - max_length: int = DEFUALT_MAX_LENGTH, - ): - self.tts_module = tts_module() - self.reddit_object = reddit_object - self.path = path - self.max_length = max_length - self.length = 0 - - def run(self) -> Tuple[int, int]: + tts_module: object + reddit_object: dict + path: str = 'assets/temp/mp3' + max_length: int = 50 # TODO move to config + __total_length: int = attrib( + default=0, + kw_only=True + ) + + def run( + self + ) -> list: Path(self.path).mkdir(parents=True, exist_ok=True) - # This file needs to be removed in case this post does not use post text, so that it wont appear in the final video + # This file needs to be removed in case this post does not use post text + # so that it won't appear in the final video try: - Path(f"{self.path}/posttext.mp3").unlink() + Path(f'{self.path}/posttext.mp3').unlink() except OSError: pass - print_step("Saving Text to MP3 files...") - - self.call_tts("title", self.reddit_object["thread_title"]) - if ( - self.reddit_object["thread_post"] != "" - and settings.config["settings"]["storymode"] == True - ): - self.call_tts("posttext", self.reddit_object["thread_post"]) - - idx = None - for idx, comment in track(enumerate(self.reddit_object["comments"]), "Saving..."): - # ! Stop creating mp3 files if the length is greater than max length. - if self.length > self.max_length: - break - if not self.tts_module.max_chars: - self.call_tts(f"{idx}", comment["comment_body"]) - else: - self.split_post(comment["comment_body"], idx) - - print_substep("Saved Text to MP3 files successfully.", style="bold green") - return self.length, idx - - def split_post(self, text: str, idx: int): - split_files = [] - split_text = [ - x.group().strip() - for x in re.finditer(rf" *((.{{0,{self.tts_module.max_chars}}})(\.|.$))", text) + print_step('Saving Text to MP3 files...') + + self.call_tts('title', self.reddit_object['thread_title']) + + if self.reddit_object['thread_post'] and settings.config['settings']['storymode']: + self.call_tts('posttext', self.reddit_object['thread_post']) + + sync_tasks_primary = [ + self.call_tts(str(idx), comment['comment_body']) + for idx, comment in track(enumerate(self.reddit_object['comments']), description='Saving...') + ] + + print_substep('Saved Text to MP3 files successfully.', style='bold green') + return [ + comments for comments, condition in + zip(self.reddit_object['comments'], sync_tasks_primary) + if condition ] - idy = None - for idy, text_cut in enumerate(split_text): - # print(f"{idx}-{idy}: {text_cut}\n") - self.call_tts(f"{idx}-{idy}.part", text_cut) - split_files.append(AudioFileClip(f"{self.path}/{idx}-{idy}.part.mp3")) - CompositeAudioClip([concatenate_audioclips(split_files)]).write_audiofile( - f"{self.path}/{idx}.mp3", fps=44100, verbose=False, logger=None + def call_tts( + self, + filename: str, + text: str + ) -> bool: + self.tts_module.run( + text=self.process_text(text), + filepath=f'{self.path}/{filename}.mp3' ) - for i in split_files: - name = i.filename - i.close() - Path(name).unlink() - - # for i in range(0, idy + 1): - # print(f"Cleaning up {self.path}/{idx}-{i}.part.mp3") - - # Path(f"{self.path}/{idx}-{i}.part.mp3").unlink() - - def call_tts(self, filename: str, text: str): - self.tts_module.run(text=process_text(text), filepath=f"{self.path}/{filename}.mp3") - # try: - # self.length += MP3(f"{self.path}/{filename}.mp3").info.length - # except (MutagenError, HeaderNotFoundError): - # self.length += sox.file_info.duration(f"{self.path}/{filename}.mp3") - clip = AudioFileClip(f"{self.path}/{filename}.mp3") - self.length += clip.duration - clip.close() - - -def process_text(text: str): - lang = settings.config["reddit"]["thread"]["post_lang"] - new_text = sanitize_text(text) - if lang: - print_substep("Translating Text...") - translated_text = ts.google(text, to_language=lang) - new_text = sanitize_text(translated_text) - return new_text + clip_length = audio_length(f'assets/audio/{filename}.mp3') + + if self.__total_length + clip_length <= self.max_length: + self.max_length += clip_length + return True + return False + + @staticmethod + def process_text( + text: str, + ) -> str: + lang = settings.config['reddit']['thread']['post_lang'] + new_text = sanitize_text(text) + if lang: + print_substep('Translating Text...') + translated_text = ts.google(text, to_language=lang) + new_text = sanitize_text(translated_text) + return new_text diff --git a/main.py b/main.py index 411ed90..82d459b 100755 --- a/main.py +++ b/main.py @@ -1,6 +1,5 @@ #!/usr/bin/env python from asyncio import run -import math from subprocess import Popen from os import name from reddit.subreddit import get_subreddit_threads @@ -41,13 +40,10 @@ async def main( ): cleanup() reddit_object = get_subreddit_threads(POST_ID) - length, number_of_comments = save_text_to_mp3(reddit_object) - length = math.ceil(length) - await RedditScreenshot(reddit_object, number_of_comments).download() + comments_created = save_text_to_mp3(reddit_object) + await RedditScreenshot(reddit_object, comments_created).download() bg_config = get_background_config() - download_background(bg_config) - chop_background_video(bg_config, length) - make_final_video(number_of_comments, length, reddit_object, bg_config) + make_final_video(comments_created, reddit_object, bg_config) async def run_many(times): diff --git a/video_creation/final_video.py b/video_creation/final_video.py index f1e1f96..b7b40d2 100755 --- a/video_creation/final_video.py +++ b/video_creation/final_video.py @@ -3,7 +3,7 @@ import multiprocessing import os import re from os.path import exists -from typing import Dict, Tuple, Any +from typing import Tuple, Any import translators as ts @@ -13,7 +13,6 @@ from moviepy.editor import ( ImageClip, concatenate_videoclips, concatenate_audioclips, - CompositeAudioClip, CompositeVideoClip, ) from moviepy.video.io.ffmpeg_tools import ffmpeg_merge_video_audio, ffmpeg_extract_subclip @@ -23,24 +22,26 @@ from utils.cleanup import cleanup from utils.console import print_step, print_substep from utils.videos import save_data from utils import settings - +from video_creation.background import download_background, chop_background_video console = Console() -W, H = 1080, 1920 +W, H = 1080, 1920 # TODO move to config -def name_normalize(name: str) -> str: +def name_normalize( + name: str +) -> str: name = re.sub(r'[?\\"%*:|<>]', "", name) - name = re.sub(r"( [w,W]\s?\/\s?[o,O,0])", r" without", name) - name = re.sub(r"( [w,W]\s?\/)", r" with", name) - name = re.sub(r"(\d+)\s?\/\s?(\d+)", r"\1 of \2", name) - name = re.sub(r"(\w+)\s?\/\s?(\w+)", r"\1 or \2", name) - name = re.sub(r"\/", r"", name) + name = re.sub(r'( [w,W]\s?\/\s?[o,O,0])', r' without', name) + name = re.sub(r'( [w,W]\s?\/)', r' with', name) + name = re.sub(r'(\d+)\s?\/\s?(\d+)', r'\1 of \2', name) + name = re.sub(r'(\w+)\s?\/\s?(\w+)', r'\1 or \2', name) + name = re.sub(r'\/', '', name) - lang = settings.config["reddit"]["thread"]["post_lang"] + lang = settings.config['reddit']['thread']['post_lang'] if lang: - print_substep("Translating filename...") + print_substep('Translating filename...') translated_name = ts.google(name, to_language=lang) return translated_name @@ -49,48 +50,46 @@ def name_normalize(name: str) -> str: def make_final_video( - number_of_clips: int, length: int, reddit_obj: dict, background_config: Tuple[str, str, str, Any] -): - """Gathers audio clips, gathers all screenshots, stitches them together and saves the final video to assets/temp + indexes_of_clips: list, + reddit_obj: dict, + background_config: Tuple[str, str, str, Any], +) -> None: + """ + Gathers audio clips, gathers all screenshots, stitches them together and saves the final video to assets/temp + Args: - number_of_clips (int): Index to end at when going through the screenshots' - length (int): Length of the video + indexes_of_clips (list): Indexes with created comments' reddit_obj (dict): The reddit object that contains the posts to read. background_config (Tuple[str, str, str, Any]): The background config to use. """ - print_step("Creating the final video 🎥") + print_step('Creating the final video 🎥') VideoFileClip.reW = lambda clip: clip.resize(width=W) VideoFileClip.reH = lambda clip: clip.resize(width=H) - opacity = settings.config["settings"]["opacity"] - background_clip = ( - VideoFileClip("assets/temp/background.mp4") - .without_audio() - .resize(height=H) - .crop(x1=1166.6, y1=0, x2=2246.6, y2=1920) - ) + opacity = settings.config['settings']['opacity'] + + final_length = 0 # Gather all audio clips - audio_clips = [AudioFileClip(f"assets/temp/mp3/{i}.mp3") for i in range(number_of_clips)] - audio_clips.insert(0, AudioFileClip("assets/temp/mp3/title.mp3")) - audio_concat = concatenate_audioclips(audio_clips) - audio_composite = CompositeAudioClip([audio_concat]) + audio_clips = [AudioFileClip(f'assets/temp/mp3/{i}.mp3') for i in indexes_of_clips] + audio_clips.insert(0, AudioFileClip('assets/temp/mp3/title.mp3')) + audio_composite = concatenate_audioclips(audio_clips) - console.log(f"[bold green] Video Will Be: {length} Seconds Long") + console.log(f'[bold green] Video Will Be: {audio_composite.length} Seconds Long') # add title to video image_clips = [] # Gather all images new_opacity = 1 if opacity is None or float(opacity) >= 1 else float(opacity) image_clips.insert( 0, - ImageClip("assets/temp/png/title.png") + ImageClip('assets/temp/png/title.png') .set_duration(audio_clips[0].duration) .resize(width=W - 100) .set_opacity(new_opacity), ) - for i in range(0, number_of_clips): + for i in indexes_of_clips: image_clips.append( - ImageClip(f"assets/temp/png/comment_{i}.png") + ImageClip(f'assets/temp/png/comment_{i}.png') .set_duration(audio_clips[i + 1].duration) .resize(width=W - 100) .set_opacity(new_opacity) @@ -109,63 +108,73 @@ def make_final_video( img_clip_pos = background_config[3] image_concat = concatenate_videoclips(image_clips).set_position(img_clip_pos) image_concat.audio = audio_composite + + download_background(background_config) + chop_background_video(background_config, final_length) + background_clip = ( + VideoFileClip("assets/temp/background.mp4") + .without_audio() + .resize(height=H) + .crop(x1=1166.6, y1=0, x2=2246.6, y2=1920) + ) + final = CompositeVideoClip([background_clip, image_concat]) - title = re.sub(r"[^\w\s-]", "", reddit_obj["thread_title"]) - idx = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"]) + title = re.sub(r'[^\w\s-]', '', reddit_obj['thread_title']) + idx = re.sub(r'[^\w\s-]', '', reddit_obj['thread_id']) - filename = f"{name_normalize(title)}.mp4" - subreddit = settings.config["reddit"]["thread"]["subreddit"] + filename = f'{name_normalize(title)}.mp4' + subreddit = settings.config['reddit']['thread']['subreddit'] save_data(subreddit, filename, title, idx, background_config[2]) - if not exists(f"./results/{subreddit}"): - print_substep("The results folder didn't exist so I made it") - os.makedirs(f"./results/{subreddit}") + if not exists(f'./results/{subreddit}'): + print_substep('The results folder didn\'t exist so I made it') + os.makedirs(f'./results/{subreddit}') final.write_videofile( - "assets/temp/temp.mp4", + 'assets/temp/temp.mp4', fps=30, - audio_codec="aac", - audio_bitrate="192k", + audio_codec='aac', + audio_bitrate='192k', verbose=False, threads=multiprocessing.cpu_count(), ) - if settings.config["settings"]["background_audio"]: - print("[bold green] Merging background audio with video") - if not exists(f"assets/backgrounds/background.mp3"): + if settings.config['settings']['background_audio']: + print('[bold green] Merging background audio with video') + if not exists('assets/backgrounds/background.mp3'): print_substep( - "Cannot find assets/backgrounds/background.mp3 audio file didn't so skipping." + 'Cannot find assets/backgrounds/background.mp3 audio file didn\'t so skipping.' ) ffmpeg_extract_subclip( - "assets/temp/temp.mp4", + 'assets/temp/temp.mp4', 0, final.duration, - targetname=f"results/{subreddit}/{filename}", + targetname=f'results/{subreddit}/{filename}', ) else: ffmpeg_merge_video_audio( - "assets/temp/temp.mp4", - "assets/backgrounds/background.mp3", - "assets/temp/temp_audio.mp4", + 'assets/temp/temp.mp4', + 'assets/backgrounds/background.mp3', + 'assets/temp/temp_audio.mp4', ) ffmpeg_extract_subclip( # check if this gets run - "assets/temp/temp_audio.mp4", + 'assets/temp/temp_audio.mp4', 0, final.duration, targetname=f"results/{subreddit}/{filename}", ) else: - print("debug duck") + print('debug duck') ffmpeg_extract_subclip( - "assets/temp/temp.mp4", + 'assets/temp/temp.mp4', 0, final.duration, - targetname=f"results/{subreddit}/{filename}", + targetname=f'results/{subreddit}/{filename}', ) - print_step("Removing temporary files 🗑") + print_step('Removing temporary files 🗑') cleanups = cleanup() - print_substep(f"Removed {cleanups} temporary files 🗑") - print_substep("See result in the results folder!") + print_substep(f'Removed {cleanups} temporary files 🗑') + print_substep('See result in the results folder!') print_step( f'Reddit title: {reddit_obj["thread_title"]} \n Background Credit: {background_config[2]}' diff --git a/video_creation/screenshot_downloader.py b/video_creation/screenshot_downloader.py index 12eba72..f4d2bca 100644 --- a/video_creation/screenshot_downloader.py +++ b/video_creation/screenshot_downloader.py @@ -209,10 +209,10 @@ class RedditScreenshot(Browser, Wait): """ Args: reddit_object (Dict): Reddit object received from reddit/subreddit.py - screenshot_num (int): Number of screenshots to download + screenshot_idx (int): List with indexes of voiced comments """ reddit_object: dict - screenshot_num: int = attrib() + screenshot_idx: list = attrib() @screenshot_num.validator def validate_screenshot_num(self, attribute, value): @@ -348,9 +348,8 @@ class RedditScreenshot(Browser, Wait): ) async_tasks_primary = [ - self.__collect_comment(comment, idx) for idx, comment in - enumerate(self.reddit_object['comments']) - if idx < self.screenshot_num + self.__collect_comment(self.reddit_object['comments'][idx], idx) for idx in + self.screenshot_idx ] for task in track( diff --git a/video_creation/voices.py b/video_creation/voices.py index ffc0898..b4eaf1f 100644 --- a/video_creation/voices.py +++ b/video_creation/voices.py @@ -1,55 +1,50 @@ -#!/usr/bin/env python - -from typing import Dict, Tuple - -from rich.console import Console - from TTS.engine_wrapper import TTSEngine from TTS.GTTS import GTTS from TTS.streamlabs_polly import StreamlabsPolly from TTS.aws_polly import AWSPolly from TTS.TikTok import TikTok + from utils import settings from utils.console import print_table, print_step -console = Console() - TTSProviders = { - "GoogleTranslate": GTTS, - "AWSPolly": AWSPolly, - "StreamlabsPolly": StreamlabsPolly, - "TikTok": TikTok, + 'GoogleTranslate': GTTS, + 'AWSPolly': AWSPolly, + 'StreamlabsPolly': StreamlabsPolly, + 'TikTok': TikTok, } -def save_text_to_mp3(reddit_obj) -> Tuple[int, int]: +async def save_text_to_mp3( + reddit_obj: dict, +) -> list: """Saves text to MP3 files. Args: reddit_obj (): Reddit object received from reddit API in reddit/subreddit.py Returns: - tuple[int,int]: (total length of the audio, the number of comments audio was generated for) + The number of comments audio was generated for """ - voice = settings.config["settings"]["tts"]["choice"] - if voice.casefold() in map(lambda _: _.casefold(), TTSProviders): - text_to_mp3 = TTSEngine(get_case_insensitive_key_value(TTSProviders, voice), reddit_obj) - else: + voice = settings.config['settings']['tts']['choice'] + if voice.casefold() not in map(lambda _: _.casefold(), TTSProviders): while True: - print_step("Please choose one of the following TTS providers: ") + print_step('Please choose one of the following TTS providers: ') print_table(TTSProviders) - choice = input("\n") - if choice.casefold() in map(lambda _: _.casefold(), TTSProviders): + voice = input('\n') + if voice.casefold() in map(lambda _: _.casefold(), TTSProviders): break - print("Unknown Choice") - text_to_mp3 = TTSEngine(get_case_insensitive_key_value(TTSProviders, choice), reddit_obj) - - return text_to_mp3.run() + print('Unknown Choice') + engine_instance = TTSEngine(get_case_insensitive_key_value(TTSProviders, voice), reddit_obj) + return await engine_instance.run() -def get_case_insensitive_key_value(input_dict, key): +def get_case_insensitive_key_value( + input_dict, + key +) -> object: return next( (value for dict_key, value in input_dict.items() if dict_key.lower() == key.lower()), None,