diff --git a/TTS/GTTS.py b/TTS/GTTS.py index 31e29df..c8d6ae8 100644 --- a/TTS/GTTS.py +++ b/TTS/GTTS.py @@ -1,23 +1,19 @@ #!/usr/bin/env python3 -import random from utils import settings from gtts import gTTS -max_chars = 0 - class GTTS: - def __init__(self): - self.max_chars = 0 - self.voices = [] + max_chars = 0 - def run(self, text, filepath): + @staticmethod + async def run( + text, + filepath + ) -> None: tts = gTTS( text=text, lang=settings.config["reddit"]["thread"]["post_lang"] or "en", slow=False, ) tts.save(filepath) - - def randomvoice(self): - return random.choice(self.voices) diff --git a/TTS/TikTok.py b/TTS/TikTok.py index 743118c..198866b 100644 --- a/TTS/TikTok.py +++ b/TTS/TikTok.py @@ -1,101 +1,101 @@ -import base64 +from aiohttp import ClientSession + from utils import settings -import random -import requests -from requests.adapters import HTTPAdapter, Retry +from random import choice + +from attr import attrs, attrib +from attr.validators import instance_of + +from TTS.common import BaseApiTTS, get_random_voice + +# TTS examples: https://twitter.com/scanlime/status/1512598559769702406 -# from profanity_filter import ProfanityFilter -# pf = ProfanityFilter() -# Code by @JasonLovesDoggo -# https://twitter.com/scanlime/status/1512598559769702406 +voices = dict() -nonhuman = [ # DISNEY VOICES - "en_us_ghostface", # Ghost Face - "en_us_chewbacca", # Chewbacca - "en_us_c3po", # C3PO - "en_us_stitch", # Stitch - "en_us_stormtrooper", # Stormtrooper - "en_us_rocket", # Rocket +voices['nonhuman'] = [ # DISNEY VOICES + 'en_us_ghostface', # Ghost Face + 'en_us_chewbacca', # Chewbacca + 'en_us_c3po', # C3PO + 'en_us_stitch', # Stitch + 'en_us_stormtrooper', # Stormtrooper + 'en_us_rocket', # Rocket # ENGLISH VOICES ] -human = [ - "en_au_001", # English AU - Female - "en_au_002", # English AU - Male - "en_uk_001", # English UK - Male 1 - "en_uk_003", # English UK - Male 2 - "en_us_001", # English US - Female (Int. 1) - "en_us_002", # English US - Female (Int. 2) - "en_us_006", # English US - Male 1 - "en_us_007", # English US - Male 2 - "en_us_009", # English US - Male 3 - "en_us_010", +voices['human'] = [ + 'en_au_001', # English AU - Female + 'en_au_002', # English AU - Male + 'en_uk_001', # English UK - Male 1 + 'en_uk_003', # English UK - Male 2 + 'en_us_001', # English US - Female (Int. 1) + 'en_us_002', # English US - Female (Int. 
2) + 'en_us_006', # English US - Male 1 + 'en_us_007', # English US - Male 2 + 'en_us_009', # English US - Male 3 + 'en_us_010', ] -voices = nonhuman + human -noneng = [ - "fr_001", # French - Male 1 - "fr_002", # French - Male 2 - "de_001", # German - Female - "de_002", # German - Male - "es_002", # Spanish - Male +voices['non_eng'] = [ + 'fr_001', # French - Male 1 + 'fr_002', # French - Male 2 + 'de_001', # German - Female + 'de_002', # German - Male + 'es_002', # Spanish - Male # AMERICA VOICES - "es_mx_002", # Spanish MX - Male - "br_001", # Portuguese BR - Female 1 - "br_003", # Portuguese BR - Female 2 - "br_004", # Portuguese BR - Female 3 - "br_005", # Portuguese BR - Male + 'es_mx_002', # Spanish MX - Male + 'br_001', # Portuguese BR - Female 1 + 'br_003', # Portuguese BR - Female 2 + 'br_004', # Portuguese BR - Female 3 + 'br_005', # Portuguese BR - Male # ASIA VOICES - "id_001", # Indonesian - Female - "jp_001", # Japanese - Female 1 - "jp_003", # Japanese - Female 2 - "jp_005", # Japanese - Female 3 - "jp_006", # Japanese - Male - "kr_002", # Korean - Male 1 - "kr_003", # Korean - Female - "kr_004", # Korean - Male 2 + 'id_001', # Indonesian - Female + 'jp_001', # Japanese - Female 1 + 'jp_003', # Japanese - Female 2 + 'jp_005', # Japanese - Female 3 + 'jp_006', # Japanese - Male + 'kr_002', # Korean - Male 1 + 'kr_003', # Korean - Female + 'kr_004', # Korean - Male 2 ] -# good_voices = {'good': ['en_us_002', 'en_us_006'], -# 'ok': ['en_au_002', 'en_uk_001']} # less en_us_stormtrooper more less en_us_rocket en_us_ghostface +# good_voices: 'en_us_002', 'en_us_006' +# ok: 'en_au_002', 'en_uk_001' +# less: en_us_stormtrooper +# more or less: en_us_rocket, en_us_ghostface -class TikTok: # TikTok Text-to-Speech Wrapper - def __init__(self): - self.URI_BASE = ( - "https://api16-normal-useast5.us.tiktokv.com/media/api/text/speech/invoke/?text_speaker=" - ) - self.max_chars = 300 - self.voices = {"human": human, "nonhuman": nonhuman, "noneng": noneng} +@attrs(auto_attribs=True) +class TikTok(BaseApiTTS): # TikTok Text-to-Speech Wrapper + client: ClientSession = attrib( + validator=instance_of(ClientSession), + ) + random_voice: bool = False + uri_base: str = attrib( + default='https://api16-normal-useast5.us.tiktokv.com/media/api/text/speech/invoke', + kw_only=True, + ) + max_chars = 300 + decode_base64 = True - def run(self, text, filepath, random_voice: bool = False): - # if censor: - # req_text = pf.censor(req_text) - # pass - voice = ( - self.randomvoice() - if random_voice - else ( - settings.config["settings"]["tts"]["tiktok_voice"] - or random.choice(self.voices["human"]) - ) + def __attrs_post_init__(self): + self.voice = ( + get_random_voice(voices, 'human') + if self.random_voice + else str(settings.config['settings']['tts']['tiktok_voice']).lower() + if str(settings.config['settings']['tts']['tiktok_voice']).lower() in [ + voice.lower() for dict_title in voices for voice in voices[dict_title]] + else get_random_voice(voices, 'human') ) - try: - r = requests.post(f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0") - except requests.exceptions.SSLError: - # https://stackoverflow.com/a/47475019/18516611 - session = requests.Session() - retry = Retry(connect=3, backoff_factor=0.5) - adapter = HTTPAdapter(max_retries=retry) - session.mount("http://", adapter) - session.mount("https://", adapter) - r = session.post(f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0") - # print(r.text) - vstr = [r.json()["data"]["v_str"]][0] - b64d = base64.b64decode(vstr) - - 
with open(filepath, "wb") as out:
-            out.write(b64d)
-
-    def randomvoice(self):
-        return random.choice(self.voices["human"])
+    async def make_request(
+        self,
+        text: str,
+    ):
+        response = await self.client.post(
+            self.uri_base,
+            params={
+                'text_speaker': self.voice,
+                'req_text': text,
+                'speaker_map_type': 0,
+            }
+        )
+        # The endpoint returns base64-encoded audio in data.v_str;
+        # BaseApiTTS decodes it because decode_base64 is True
+        return (await response.json())['data']['v_str']
diff --git a/TTS/aws_polly.py b/TTS/aws_polly.py
index 13eaea1..db79adc 100644
--- a/TTS/aws_polly.py
+++ b/TTS/aws_polly.py
@@ -1,45 +1,52 @@
 #!/usr/bin/env python3
 from boto3 import Session
 from botocore.exceptions import BotoCoreError, ClientError
+
 import sys
 from utils import settings
-import random
+from attr import attrs
+
+from TTS.common import get_random_voice
+
 voices = [
-    "Brian",
-    "Emma",
-    "Russell",
-    "Joey",
-    "Matthew",
-    "Joanna",
-    "Kimberly",
-    "Amy",
-    "Geraint",
-    "Nicole",
-    "Justin",
-    "Ivy",
-    "Kendra",
-    "Salli",
-    "Raveena",
+    'Brian',
+    'Emma',
+    'Russell',
+    'Joey',
+    'Matthew',
+    'Joanna',
+    'Kimberly',
+    'Amy',
+    'Geraint',
+    'Nicole',
+    'Justin',
+    'Ivy',
+    'Kendra',
+    'Salli',
+    'Raveena',
 ]
 
+@attrs(auto_attribs=True)
 class AWSPolly:
-    def __init__(self):
-        self.max_chars = 0
-        self.voices = voices
+    random_voice: bool = False
+    max_chars: int = 0
 
-    def run(self, text, filepath, random_voice: bool = False):
-        session = Session(profile_name="polly")
+    async def run(
+        self,
+        text,
+        filepath,
+    ):
+        session = Session(profile_name='polly')
         polly = session.client("polly")
-        if random_voice:
-            voice = self.randomvoice()
-        else:
-            if not settings.config["settings"]["tts"]["aws_polly_voice"]:
-                return ValueError(
-                    f"Please set the TOML variable AWS_VOICE to a valid voice. options are: {voices}"
-                )
-            voice = str(settings.config["settings"]["tts"]["aws_polly_voice"]).capitalize()
+        voice = (
+            get_random_voice(voices)
+            if self.random_voice
+            else str(settings.config['settings']['tts']['aws_polly_voice']).capitalize()
+            if str(settings.config['settings']['tts']['aws_polly_voice']).lower() in [voice.lower() for voice in voices]
+            else get_random_voice(voices)
+        )
         try:
             # Request speech synthesis
             response = polly.synthesize_speech(
@@ -51,7 +58,7 @@ class AWSPolly:
             sys.exit(-1)
 
         # Access the audio stream from the response
-        if "AudioStream" in response:
+        if 'AudioStream' in response:
             file = open(filepath, "wb")
             file.write(response["AudioStream"].read())
             file.close()
@@ -59,8 +66,5 @@ class AWSPolly:
 
         else:
             # The response didn't contain audio data, exit gracefully
-            print("Could not stream audio")
+            print('Could not stream audio')
             sys.exit(-1)
-
-    def randomvoice(self):
-        return random.choice(self.voices)
diff --git a/TTS/common.py b/TTS/common.py
new file mode 100644
index 0000000..a4a532d
--- /dev/null
+++ b/TTS/common.py
@@ -0,0 +1,70 @@
+from aiofiles import open
+
+import base64
+from random import choice
+from typing import Union, Optional
+
+
+class BaseApiTTS:
+    max_chars: int
+    decode_base64: bool = False
+
+    @staticmethod
+    def text_len_sanitize(
+        text: str,
+        max_length: int,
+    ) -> list:
+        # Split by comma or dot (else you can lose intonations); if there is none, split into chunks of max_length chars
+        if '.' 
in text and all([split_text.__len__() < max_length for split_text in text.split('.')]): + return text.split('.') + + if ',' in text and all([split_text.__len__() < max_length for split_text in text.split(',')]): + return text.split(',') + + return [text[i:i + max_length] for i in range(0, len(text), max_length)] + + async def write_file( + self, + output_text: str, + filename: str, + ) -> None: + decoded_text = base64.b64decode(output_text) if self.decode_base64 else output_text + + async with open(filename, 'wb') as out: + await out.write(decoded_text) + + async def run( + self, + req_text: str, + filename: str, + ) -> None: + output_text = '' + if len(req_text) > self.max_chars: + for part in self.text_len_sanitize(req_text, self.max_chars): + if part: + output_text += await self.make_request(part) + else: + output_text = await self.make_request(req_text) + await self.write_file(output_text, filename) + + +def get_random_voice( + voices: Union[list, dict], + key: Optional[str] = None, +) -> str: + if isinstance(voices, list): + return choice(voices) + else: + return choice(voices[key]) + + +def audio_length( + path: str, +) -> float | int: + from mutagen.mp3 import MP3 + + try: + audio = MP3(path) + return audio.info.length + except Exception as e: # TODO add logging + return 0 diff --git a/TTS/engine_wrapper.py b/TTS/engine_wrapper.py index a171db7..0b97809 100644 --- a/TTS/engine_wrapper.py +++ b/TTS/engine_wrapper.py @@ -1,23 +1,19 @@ #!/usr/bin/env python3 -from pathlib import Path -from typing import Tuple -import re +from asyncio import as_completed -# import sox -# from mutagen import MutagenError -# from mutagen.mp3 import MP3, HeaderNotFoundError +from pathlib import Path import translators as ts from rich.progress import track -from moviepy.editor import AudioFileClip, CompositeAudioClip, concatenate_audioclips +from attr import attrs, attrib + from utils.console import print_step, print_substep from utils.voice import sanitize_text from utils import settings - -DEFUALT_MAX_LENGTH: int = 50 # video length variable +from TTS.common import audio_length +@attrs(auto_attribs=True) class TTSEngine: - """Calls the given TTS engine to reduce code duplication and allow multiple TTS engines. Args: @@ -29,94 +25,81 @@ class TTSEngine: Notes: tts_module must take the arguments text and filepath. 
""" - - def __init__( - self, - tts_module, - reddit_object: dict, - path: str = "assets/temp/mp3", - max_length: int = DEFUALT_MAX_LENGTH, - ): - self.tts_module = tts_module() - self.reddit_object = reddit_object - self.path = path - self.max_length = max_length - self.length = 0 - - def run(self) -> Tuple[int, int]: + tts_module: object + reddit_object: dict + path: str = 'assets/temp/mp3' + max_length: int = 50 # TODO move to config + __total_length: int = attrib( + default=0, + kw_only=True + ) + + async def run( + self + ) -> list: Path(self.path).mkdir(parents=True, exist_ok=True) - # This file needs to be removed in case this post does not use post text, so that it wont appear in the final video + # This file needs to be removed in case this post does not use post text + # so that it won't appear in the final video try: - Path(f"{self.path}/posttext.mp3").unlink() + Path(f'{self.path}/posttext.mp3').unlink() except OSError: pass - print_step("Saving Text to MP3 files...") + print_step('Saving Text to MP3 files...') - self.call_tts("title", self.reddit_object["thread_title"]) - if ( - self.reddit_object["thread_post"] != "" - and settings.config["settings"]["storymode"] == True + await self.call_tts('title', self.reddit_object['thread_title']) + async_tasks_offset = 1 + + if self.reddit_object['thread_post'] and settings.config['settings']['storymode']: + await self.call_tts('posttext', self.reddit_object['thread_post']) + async_tasks_offset += 1 + + async_tasks_primary = [ + self.call_tts(str(idx), comment['comment_body']) + for idx, comment in enumerate(self.reddit_object['comments']) + ] + + for task in track( + as_completed(async_tasks_primary), + description='Saving...', + total=async_tasks_primary.__len__() ): - self.call_tts("posttext", self.reddit_object["thread_post"]) - - idx = None - for idx, comment in track(enumerate(self.reddit_object["comments"]), "Saving..."): - # ! Stop creating mp3 files if the length is greater than max length. 
- if self.length > self.max_length: - break - if not self.tts_module.max_chars: - self.call_tts(f"{idx}", comment["comment_body"]) - else: - self.split_post(comment["comment_body"], idx) - - print_substep("Saved Text to MP3 files successfully.", style="bold green") - return self.length, idx - - def split_post(self, text: str, idx: int): - split_files = [] - split_text = [ - x.group().strip() - for x in re.finditer(rf" *((.{{0,{self.tts_module.max_chars}}})(\.|.$))", text) + await task + + print_substep('Saved Text to MP3 files successfully.', style='bold green') + return [ + comments for comments, condition in + zip(self.reddit_object['comments'], async_tasks_primary[async_tasks_offset:]) + if condition ] - idy = None - for idy, text_cut in enumerate(split_text): - # print(f"{idx}-{idy}: {text_cut}\n") - self.call_tts(f"{idx}-{idy}.part", text_cut) - split_files.append(AudioFileClip(f"{self.path}/{idx}-{idy}.part.mp3")) - CompositeAudioClip([concatenate_audioclips(split_files)]).write_audiofile( - f"{self.path}/{idx}.mp3", fps=44100, verbose=False, logger=None + async def call_tts( + self, + filename: str, + text: str + ) -> bool: + await self.tts_module.run( + text=self.process_text(text), + filepath=f'{self.path}/{filename}.mp3' ) - for i in split_files: - name = i.filename - i.close() - Path(name).unlink() - - # for i in range(0, idy + 1): - # print(f"Cleaning up {self.path}/{idx}-{i}.part.mp3") - - # Path(f"{self.path}/{idx}-{i}.part.mp3").unlink() - - def call_tts(self, filename: str, text: str): - self.tts_module.run(text=process_text(text), filepath=f"{self.path}/{filename}.mp3") - # try: - # self.length += MP3(f"{self.path}/{filename}.mp3").info.length - # except (MutagenError, HeaderNotFoundError): - # self.length += sox.file_info.duration(f"{self.path}/{filename}.mp3") - clip = AudioFileClip(f"{self.path}/{filename}.mp3") - self.length += clip.duration - clip.close() - - -def process_text(text: str): - lang = settings.config["reddit"]["thread"]["post_lang"] - new_text = sanitize_text(text) - if lang: - print_substep("Translating Text...") - translated_text = ts.google(text, to_language=lang) - new_text = sanitize_text(translated_text) - return new_text + clip_length = audio_length(f'assets/audio/{filename}.mp3') + + if self.__total_length + clip_length <= self.max_length: + self.max_length += clip_length + return True + return False + + @staticmethod + def process_text( + text: str, + ) -> str: + lang = settings.config['reddit']['thread']['post_lang'] + new_text = sanitize_text(text) + if lang: + print_substep('Translating Text...') + translated_text = ts.google(text, to_language=lang) + new_text = sanitize_text(translated_text) + return new_text diff --git a/TTS/streamlabs_polly.py b/TTS/streamlabs_polly.py index b7365ab..9ec01cb 100644 --- a/TTS/streamlabs_polly.py +++ b/TTS/streamlabs_polly.py @@ -1,62 +1,71 @@ -import random -import requests -from requests.exceptions import JSONDecodeError +from aiohttp import ClientSession + +from random import choice from utils import settings -from utils.voice import check_ratelimit +from attr import attrs, attrib +from attr.validators import instance_of + +from TTS.common import BaseApiTTS, get_random_voice + voices = [ - "Brian", - "Emma", - "Russell", - "Joey", - "Matthew", - "Joanna", - "Kimberly", - "Amy", - "Geraint", - "Nicole", - "Justin", - "Ivy", - "Kendra", - "Salli", - "Raveena", + 'Brian', + 'Emma', + 'Russell', + 'Joey', + 'Matthew', + 'Joanna', + 'Kimberly', + 'Amy', + 'Geraint', + 'Nicole', + 'Justin', + 'Ivy', + 'Kendra', 
+    'Salli',
+    'Raveena',
 ]
 # valid voices https://lazypy.ro/tts/
 
-class StreamlabsPolly:
-    def __init__(self):
-        self.url = "https://streamlabs.com/polly/speak"
-        self.max_chars = 550
-        self.voices = voices
-
-    def run(self, text, filepath, random_voice: bool = False):
-        if random_voice:
-            voice = self.randomvoice()
-        else:
-            if not settings.config["settings"]["tts"]["streamlabs_polly_voice"]:
-                return ValueError(
-                    f"Please set the config variable STREAMLABS_VOICE to a valid voice. options are: {voices}"
-                )
-            voice = str(settings.config["settings"]["tts"]["streamlabs_polly_voice"]).capitalize()
-        body = {"voice": voice, "text": text, "service": "polly"}
-        response = requests.post(self.url, data=body)
-        if not check_ratelimit(response):
-            self.run(text, filepath, random_voice)
-
-        else:
-            try:
-                voice_data = requests.get(response.json()["speak_url"])
-                with open(filepath, "wb") as f:
-                    f.write(voice_data.content)
-            except (KeyError, JSONDecodeError):
-                try:
-                    if response.json()["error"] == "No text specified!":
-                        raise ValueError("Please specify a text to convert to speech.")
-                except (KeyError, JSONDecodeError):
-                    print("Error occurred calling Streamlabs Polly")
-
-    def randomvoice(self):
-        return random.choice(self.voices)
+@attrs(auto_attribs=True)
+class StreamlabsPolly(BaseApiTTS):
+    client: ClientSession = attrib(
+        validator=instance_of(ClientSession),
+    )
+    random_voice: bool = False
+    url: str = attrib(
+        default='https://streamlabs.com/polly/speak',
+        kw_only=True,
+    )
+
+    max_chars = 550
+
+    async def make_request(
+        self,
+        text: str,
+    ):
+        voice = (
+            get_random_voice(voices)
+            if self.random_voice
+            else str(settings.config['settings']['tts']['streamlabs_polly_voice']).capitalize()
+            if str(settings.config['settings']['tts']['streamlabs_polly_voice']).lower() in [
+                voice.lower() for voice in voices]
+            else get_random_voice(voices)
+        )
+
+        response = await self.client.post(
+            self.url,
+            data={
+                'voice': voice,
+                'text': text,
+                'service': 'polly',
+            }
+        )
+        speak_url = (await response.json())['speak_url']
+
+        # Return the raw audio bytes so BaseApiTTS.write_file can save them
+        voice_data = await self.client.get(speak_url)
+        return await voice_data.read()
diff --git a/main.py b/main.py
index 8ce8725..b563636 100755
--- a/main.py
+++ b/main.py
@@ -1,5 +1,5 @@
 #!/usr/bin/env python
-import math
+from asyncio import run
 from subprocess import Popen
 from os import name
 from reddit.subreddit import get_subreddit_threads
@@ -9,8 +9,6 @@ from utils import settings
 # from utils.checker import envUpdate
 from video_creation.background import (
-    download_background,
-    chop_background_video,
     get_background_config,
 )
 from video_creation.final_video import make_final_video
@@ -35,24 +33,21 @@ print_markdown(
 print_step(f"You are using V{VERSION} of the bot")
 
-def main(POST_ID=None):
+async def main(POST_ID=None):
     cleanup()
     reddit_object = get_subreddit_threads(POST_ID)
-    length, number_of_comments = save_text_to_mp3(reddit_object)
-    length = math.ceil(length)
-    download_screenshots_of_reddit_posts(reddit_object, number_of_comments)
+    comments_created = await save_text_to_mp3(reddit_object)
+    download_screenshots_of_reddit_posts(reddit_object, comments_created)
     bg_config = get_background_config()
-    download_background(bg_config)
-    chop_background_video(bg_config, length)
-    make_final_video(number_of_comments, length, reddit_object, bg_config)
+    make_final_video(comments_created, reddit_object, bg_config)
 
-def run_many(times):
+async def run_many(times):
     for x in range(1, times + 1):
         print_step(
-            f'on the {x}{("th", "st", "nd", "rd", "th", "th", "th", "th","th", "th")[x%10]} 
iteration of {times}' + f'on the {x}{("th", "st", "nd", "rd", "th", "th", "th", "th", "th", "th")[x % 10]} iteration of {times}' ) # correct 1st 2nd 3rd 4th 5th.... - main() + await main() Popen("cls" if name == "nt" else "clear", shell=True).wait() @@ -61,15 +56,19 @@ if __name__ == "__main__": config is False and exit() try: if config["settings"]["times_to_run"]: - run_many(config["settings"]["times_to_run"]) + run( + run_many(config["settings"]["times_to_run"]) + ) elif len(config["reddit"]["thread"]["post_id"].split("+")) > 1: for index, post_id in enumerate(config["reddit"]["thread"]["post_id"].split("+")): index += 1 print_step( - f'on the {index}{("st" if index%10 == 1 else ("nd" if index%10 == 2 else ("rd" if index%10 == 3 else "th")))} post of {len(config["reddit"]["thread"]["post_id"].split("+"))}' + f'on the {index}{("st" if index % 10 == 1 else ("nd" if index % 10 == 2 else ("rd" if index % 10 == 3 else "th")))} post of {len(config["reddit"]["thread"]["post_id"].split("+"))}' + ) + run( + main(post_id) ) - main(post_id) Popen("cls" if name == "nt" else "clear", shell=True).wait() else: main() diff --git a/requirements.txt b/requirements.txt index 8b377c2..b6f79fc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,3 +9,7 @@ requests==2.28.1 rich==12.4.4 toml==0.10.2 translators==5.3.1 +attrs==21.4.0 +aiohttp==3.8.1 # There is security warning for <=3.8.1, no fixes for now +aiofiles==0.8.0 +mutagen==1.45.1 diff --git a/utils/voice.py b/utils/voice.py index 0272b09..ccdff33 100644 --- a/utils/voice.py +++ b/utils/voice.py @@ -10,7 +10,9 @@ if sys.version_info[0] >= 3: from datetime import timezone -def check_ratelimit(response: Response): +def check_ratelimit( + response: Response, +): """ Checks if the response is a ratelimit response. If it is, it sleeps for the time specified in the response. 
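A minimal sketch of how the rewritten async pipeline can be driven, assuming a provider instance built with an aiohttp ClientSession is handed to TTSEngine; the synthesize() helper below is illustrative only, not part of this patch:

    # Hypothetical driver for the new async TTS classes
    import asyncio
    from aiohttp import ClientSession

    from TTS.TikTok import TikTok
    from TTS.engine_wrapper import TTSEngine

    async def synthesize(reddit_object: dict) -> list:
        # The attrs-based providers expect the HTTP session to be injected
        async with ClientSession() as client:
            engine = TTSEngine(TikTok(client=client), reddit_object)
            # run() returns the indexes of the comments that fit the length budget
            return await engine.run()

    # voiced_indexes = asyncio.run(synthesize(reddit_object))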
diff --git a/video_creation/final_video.py b/video_creation/final_video.py index f1e1f96..7068b61 100755 --- a/video_creation/final_video.py +++ b/video_creation/final_video.py @@ -3,7 +3,7 @@ import multiprocessing import os import re from os.path import exists -from typing import Dict, Tuple, Any +from typing import Tuple, Any import translators as ts @@ -13,7 +13,6 @@ from moviepy.editor import ( ImageClip, concatenate_videoclips, concatenate_audioclips, - CompositeAudioClip, CompositeVideoClip, ) from moviepy.video.io.ffmpeg_tools import ffmpeg_merge_video_audio, ffmpeg_extract_subclip @@ -23,24 +22,26 @@ from utils.cleanup import cleanup from utils.console import print_step, print_substep from utils.videos import save_data from utils import settings - +from video_creation.background import download_background, chop_background_video console = Console() -W, H = 1080, 1920 +W, H = 1080, 1920 # TODO move to config -def name_normalize(name: str) -> str: +def name_normalize( + name: str +) -> str: name = re.sub(r'[?\\"%*:|<>]', "", name) - name = re.sub(r"( [w,W]\s?\/\s?[o,O,0])", r" without", name) - name = re.sub(r"( [w,W]\s?\/)", r" with", name) - name = re.sub(r"(\d+)\s?\/\s?(\d+)", r"\1 of \2", name) - name = re.sub(r"(\w+)\s?\/\s?(\w+)", r"\1 or \2", name) - name = re.sub(r"\/", r"", name) + name = re.sub(r'( [w,W]\s?\/\s?[o,O,0])', r' without', name) + name = re.sub(r'( [w,W]\s?\/)', r' with', name) + name = re.sub(r'(\d+)\s?\/\s?(\d+)', r'\1 of \2', name) + name = re.sub(r'(\w+)\s?\/\s?(\w+)', r'\1 or \2', name) + name = re.sub(r'\/', '', name) - lang = settings.config["reddit"]["thread"]["post_lang"] + lang = settings.config['reddit']['thread']['post_lang'] if lang: - print_substep("Translating filename...") + print_substep('Translating filename...') translated_name = ts.google(name, to_language=lang) return translated_name @@ -49,48 +50,46 @@ def name_normalize(name: str) -> str: def make_final_video( - number_of_clips: int, length: int, reddit_obj: dict, background_config: Tuple[str, str, str, Any] -): - """Gathers audio clips, gathers all screenshots, stitches them together and saves the final video to assets/temp + indexes_of_clips: list, + reddit_obj: dict, + background_config: Tuple[str, str, str, Any], +) -> None: + """ + Gathers audio clips, gathers all screenshots, stitches them together and saves the final video to assets/temp + Args: - number_of_clips (int): Index to end at when going through the screenshots' - length (int): Length of the video + indexes_of_clips (list): Indexes of voiced comments reddit_obj (dict): The reddit object that contains the posts to read. background_config (Tuple[str, str, str, Any]): The background config to use. 
""" - print_step("Creating the final video 🎥") + print_step('Creating the final video 🎥') VideoFileClip.reW = lambda clip: clip.resize(width=W) VideoFileClip.reH = lambda clip: clip.resize(width=H) - opacity = settings.config["settings"]["opacity"] - background_clip = ( - VideoFileClip("assets/temp/background.mp4") - .without_audio() - .resize(height=H) - .crop(x1=1166.6, y1=0, x2=2246.6, y2=1920) - ) + opacity = settings.config['settings']['opacity'] + + final_length = 0 # Gather all audio clips - audio_clips = [AudioFileClip(f"assets/temp/mp3/{i}.mp3") for i in range(number_of_clips)] - audio_clips.insert(0, AudioFileClip("assets/temp/mp3/title.mp3")) - audio_concat = concatenate_audioclips(audio_clips) - audio_composite = CompositeAudioClip([audio_concat]) + audio_clips = [AudioFileClip(f'assets/temp/mp3/{i}.mp3') for i in indexes_of_clips] + audio_clips.insert(0, AudioFileClip('assets/temp/mp3/title.mp3')) + audio_composite = concatenate_audioclips(audio_clips) - console.log(f"[bold green] Video Will Be: {length} Seconds Long") + console.log(f'[bold green] Video Will Be: {audio_composite.length} Seconds Long') # add title to video image_clips = [] # Gather all images new_opacity = 1 if opacity is None or float(opacity) >= 1 else float(opacity) image_clips.insert( 0, - ImageClip("assets/temp/png/title.png") + ImageClip('assets/temp/png/title.png') .set_duration(audio_clips[0].duration) .resize(width=W - 100) .set_opacity(new_opacity), ) - for i in range(0, number_of_clips): + for i in indexes_of_clips: image_clips.append( - ImageClip(f"assets/temp/png/comment_{i}.png") + ImageClip(f'assets/temp/png/comment_{i}.png') .set_duration(audio_clips[i + 1].duration) .resize(width=W - 100) .set_opacity(new_opacity) @@ -109,63 +108,73 @@ def make_final_video( img_clip_pos = background_config[3] image_concat = concatenate_videoclips(image_clips).set_position(img_clip_pos) image_concat.audio = audio_composite + + download_background(background_config) + chop_background_video(background_config, final_length) + background_clip = ( + VideoFileClip("assets/temp/background.mp4") + .without_audio() + .resize(height=H) + .crop(x1=1166.6, y1=0, x2=2246.6, y2=1920) + ) + final = CompositeVideoClip([background_clip, image_concat]) - title = re.sub(r"[^\w\s-]", "", reddit_obj["thread_title"]) - idx = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"]) + title = re.sub(r'[^\w\s-]', '', reddit_obj['thread_title']) + idx = re.sub(r'[^\w\s-]', '', reddit_obj['thread_id']) - filename = f"{name_normalize(title)}.mp4" - subreddit = settings.config["reddit"]["thread"]["subreddit"] + filename = f'{name_normalize(title)}.mp4' + subreddit = settings.config['reddit']['thread']['subreddit'] save_data(subreddit, filename, title, idx, background_config[2]) - if not exists(f"./results/{subreddit}"): - print_substep("The results folder didn't exist so I made it") - os.makedirs(f"./results/{subreddit}") + if not exists(f'./results/{subreddit}'): + print_substep('The results folder didn\'t exist so I made it') + os.makedirs(f'./results/{subreddit}') final.write_videofile( - "assets/temp/temp.mp4", + 'assets/temp/temp.mp4', fps=30, - audio_codec="aac", - audio_bitrate="192k", + audio_codec='aac', + audio_bitrate='192k', verbose=False, threads=multiprocessing.cpu_count(), ) - if settings.config["settings"]["background_audio"]: - print("[bold green] Merging background audio with video") - if not exists(f"assets/backgrounds/background.mp3"): + if settings.config['settings']['background_audio']: + print('[bold green] Merging background 
audio with video') + if not exists('assets/backgrounds/background.mp3'): print_substep( - "Cannot find assets/backgrounds/background.mp3 audio file didn't so skipping." + 'Cannot find assets/backgrounds/background.mp3 audio file didn\'t so skipping.' ) ffmpeg_extract_subclip( - "assets/temp/temp.mp4", + 'assets/temp/temp.mp4', 0, final.duration, - targetname=f"results/{subreddit}/{filename}", + targetname=f'results/{subreddit}/{filename}', ) else: ffmpeg_merge_video_audio( - "assets/temp/temp.mp4", - "assets/backgrounds/background.mp3", - "assets/temp/temp_audio.mp4", + 'assets/temp/temp.mp4', + 'assets/backgrounds/background.mp3', + 'assets/temp/temp_audio.mp4', ) ffmpeg_extract_subclip( # check if this gets run - "assets/temp/temp_audio.mp4", + 'assets/temp/temp_audio.mp4', 0, final.duration, targetname=f"results/{subreddit}/{filename}", ) else: - print("debug duck") + print('debug duck') ffmpeg_extract_subclip( - "assets/temp/temp.mp4", + 'assets/temp/temp.mp4', 0, final.duration, - targetname=f"results/{subreddit}/{filename}", + targetname=f'results/{subreddit}/{filename}', ) - print_step("Removing temporary files 🗑") + print_step('Removing temporary files 🗑') cleanups = cleanup() - print_substep(f"Removed {cleanups} temporary files 🗑") - print_substep("See result in the results folder!") + print_substep(f'Removed {cleanups} temporary files 🗑') + print_substep('See result in the results folder!') print_step( f'Reddit title: {reddit_obj["thread_title"]} \n Background Credit: {background_config[2]}' diff --git a/video_creation/screenshot_downloader.py b/video_creation/screenshot_downloader.py index 6fb9ef4..e1bb9e5 100644 --- a/video_creation/screenshot_downloader.py +++ b/video_creation/screenshot_downloader.py @@ -16,12 +16,12 @@ from utils.console import print_step, print_substep storymode = False -def download_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int): +def download_screenshots_of_reddit_posts(reddit_object: dict, voiced_idx: list): """Downloads screenshots of reddit posts as seen on the web. Downloads to assets/temp/png Args: reddit_object (Dict): Reddit object received from reddit/subreddit.py - screenshot_num (int): Number of screenshots to download + voiced_idx (int): Indexes of voiced comments """ print_step("Downloading screenshots of reddit posts...") @@ -76,12 +76,11 @@ def download_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: in path="assets/temp/png/story_content.png" ) else: - for idx, comment in enumerate( - track(reddit_object["comments"], "Downloading screenshots...") + for idx in track( + screenshot_num, + "Downloading screenshots..." 
): - # Stop if we have reached the screenshot_num - if idx >= screenshot_num: - break + comment = reddit_object["comments"][idx] if page.locator('[data-testid="content-gate"]').is_visible(): page.locator('[data-testid="content-gate"] button').click() diff --git a/video_creation/voices.py b/video_creation/voices.py index ffc0898..b4eaf1f 100644 --- a/video_creation/voices.py +++ b/video_creation/voices.py @@ -1,55 +1,50 @@ -#!/usr/bin/env python - -from typing import Dict, Tuple - -from rich.console import Console - from TTS.engine_wrapper import TTSEngine from TTS.GTTS import GTTS from TTS.streamlabs_polly import StreamlabsPolly from TTS.aws_polly import AWSPolly from TTS.TikTok import TikTok + from utils import settings from utils.console import print_table, print_step -console = Console() - TTSProviders = { - "GoogleTranslate": GTTS, - "AWSPolly": AWSPolly, - "StreamlabsPolly": StreamlabsPolly, - "TikTok": TikTok, + 'GoogleTranslate': GTTS, + 'AWSPolly': AWSPolly, + 'StreamlabsPolly': StreamlabsPolly, + 'TikTok': TikTok, } -def save_text_to_mp3(reddit_obj) -> Tuple[int, int]: +async def save_text_to_mp3( + reddit_obj: dict, +) -> list: """Saves text to MP3 files. Args: reddit_obj (): Reddit object received from reddit API in reddit/subreddit.py Returns: - tuple[int,int]: (total length of the audio, the number of comments audio was generated for) + The number of comments audio was generated for """ - voice = settings.config["settings"]["tts"]["choice"] - if voice.casefold() in map(lambda _: _.casefold(), TTSProviders): - text_to_mp3 = TTSEngine(get_case_insensitive_key_value(TTSProviders, voice), reddit_obj) - else: + voice = settings.config['settings']['tts']['choice'] + if voice.casefold() not in map(lambda _: _.casefold(), TTSProviders): while True: - print_step("Please choose one of the following TTS providers: ") + print_step('Please choose one of the following TTS providers: ') print_table(TTSProviders) - choice = input("\n") - if choice.casefold() in map(lambda _: _.casefold(), TTSProviders): + voice = input('\n') + if voice.casefold() in map(lambda _: _.casefold(), TTSProviders): break - print("Unknown Choice") - text_to_mp3 = TTSEngine(get_case_insensitive_key_value(TTSProviders, choice), reddit_obj) - - return text_to_mp3.run() + print('Unknown Choice') + engine_instance = TTSEngine(get_case_insensitive_key_value(TTSProviders, voice), reddit_obj) + return await engine_instance.run() -def get_case_insensitive_key_value(input_dict, key): +def get_case_insensitive_key_value( + input_dict, + key +) -> object: return next( (value for dict_key, value in input_dict.items() if dict_key.lower() == key.lower()), None,