From 1d968ee7b7d62550bad6549b0674222fc0a1014d Mon Sep 17 00:00:00 2001 From: Drugsosos <44712637+Drugsosos@users.noreply.github.com> Date: Sun, 17 Jul 2022 01:33:35 +0300 Subject: [PATCH] more fixes after review --- TTS/TikTok.py | 98 ++++++++++++------------- TTS/aws_polly.py | 48 ++++++------ TTS/common.py | 22 +++--- TTS/engine_wrapper.py | 41 ++++++----- TTS/streamlabs_polly.py | 50 ++++++------- reddit/subreddit.py | 2 +- utils/subreddit.py | 2 +- video_creation/final_video.py | 90 ++++++++++++----------- video_creation/screenshot_downloader.py | 79 ++++++++++---------- video_creation/voices.py | 16 ++-- 10 files changed, 227 insertions(+), 221 deletions(-) diff --git a/TTS/TikTok.py b/TTS/TikTok.py index 5561ac4..c321b89 100644 --- a/TTS/TikTok.py +++ b/TTS/TikTok.py @@ -11,49 +11,49 @@ from TTS.common import BaseApiTTS, get_random_voice voices = dict() -voices['nonhuman'] = [ # DISNEY VOICES - 'en_us_ghostface', # Ghost Face - 'en_us_chewbacca', # Chewbacca - 'en_us_c3po', # C3PO - 'en_us_stitch', # Stitch - 'en_us_stormtrooper', # Stormtrooper - 'en_us_rocket', # Rocket +voices["nonhuman"] = [ # DISNEY VOICES + "en_us_ghostface", # Ghost Face + "en_us_chewbacca", # Chewbacca + "en_us_c3po", # C3PO + "en_us_stitch", # Stitch + "en_us_stormtrooper", # Stormtrooper + "en_us_rocket", # Rocket # ENGLISH VOICES ] -voices['human'] = [ - 'en_au_001', # English AU - Female - 'en_au_002', # English AU - Male - 'en_uk_001', # English UK - Male 1 - 'en_uk_003', # English UK - Male 2 - 'en_us_001', # English US - Female (Int. 1) - 'en_us_002', # English US - Female (Int. 2) - 'en_us_006', # English US - Male 1 - 'en_us_007', # English US - Male 2 - 'en_us_009', # English US - Male 3 - 'en_us_010', +voices["human"] = [ + "en_au_001", # English AU - Female + "en_au_002", # English AU - Male + "en_uk_001", # English UK - Male 1 + "en_uk_003", # English UK - Male 2 + "en_us_001", # English US - Female (Int. 1) + "en_us_002", # English US - Female (Int. 2) + "en_us_006", # English US - Male 1 + "en_us_007", # English US - Male 2 + "en_us_009", # English US - Male 3 + "en_us_010", ] -voices['non_eng'] = [ - 'fr_001', # French - Male 1 - 'fr_002', # French - Male 2 - 'de_001', # German - Female - 'de_002', # German - Male - 'es_002', # Spanish - Male +voices["non_eng"] = [ + "fr_001", # French - Male 1 + "fr_002", # French - Male 2 + "de_001", # German - Female + "de_002", # German - Male + "es_002", # Spanish - Male # AMERICA VOICES - 'es_mx_002', # Spanish MX - Male - 'br_001', # Portuguese BR - Female 1 - 'br_003', # Portuguese BR - Female 2 - 'br_004', # Portuguese BR - Female 3 - 'br_005', # Portuguese BR - Male + "es_mx_002", # Spanish MX - Male + "br_001", # Portuguese BR - Female 1 + "br_003", # Portuguese BR - Female 2 + "br_004", # Portuguese BR - Female 3 + "br_005", # Portuguese BR - Male # ASIA VOICES - 'id_001', # Indonesian - Female - 'jp_001', # Japanese - Female 1 - 'jp_003', # Japanese - Female 2 - 'jp_005', # Japanese - Female 3 - 'jp_006', # Japanese - Male - 'kr_002', # Korean - Male 1 - 'kr_003', # Korean - Female - 'kr_004', # Korean - Male 2 + "id_001", # Indonesian - Female + "jp_001", # Japanese - Female 1 + "jp_003", # Japanese - Female 2 + "jp_005", # Japanese - Female 3 + "jp_006", # Japanese - Male + "kr_002", # Korean - Male 1 + "kr_003", # Korean - Female + "kr_004", # Korean - Male 2 ] @@ -69,7 +69,7 @@ class TikTok(BaseApiTTS): # TikTok Text-to-Speech Wrapper validator=instance_of(bool), default=False ) - uri_base: str = 'https://api16-normal-useast5.us.tiktokv.com/media/api/text/speech/invoke/' + uri_base: str = "https://api16-normal-useast5.us.tiktokv.com/media/api/text/speech/invoke/" max_chars: int = 300 decode_base64: bool = True @@ -87,28 +87,28 @@ class TikTok(BaseApiTTS): # TikTok Text-to-Speech Wrapper Request's response """ voice = ( - get_random_voice(voices, 'human') + get_random_voice(voices, "human") if self.random_voice - else str(settings.config['settings']['tts']['tiktok_voice']).lower() - if str(settings.config['settings']['tts']['tiktok_voice']).lower() in [ + else str(settings.config["settings"]["tts"]["tiktok_voice"]).lower() + if str(settings.config["settings"]["tts"]["tiktok_voice"]).lower() in [ voice.lower() for dict_title in voices for voice in voices[dict_title]] - else get_random_voice(voices, 'human') + else get_random_voice(voices, "human") ) try: r = requests.post( self.uri_base, params={ - 'text_speaker': voice, - 'req_text': text, - 'speaker_map_type': 0, + "text_speaker": voice, + "req_text": text, + "speaker_map_type": 0, }) except requests.exceptions.SSLError: # https://stackoverflow.com/a/47475019/18516611 session = requests.Session() retry = Retry(connect=3, backoff_factor=0.5) adapter = HTTPAdapter(max_retries=retry) - session.mount('http://', adapter) - session.mount('https://', adapter) - r = session.post(f'{self.uri_base}{voice}&req_text={text}&speaker_map_type=0') + session.mount("http://", adapter) + session.mount("https://", adapter) + r = session.post(f"{self.uri_base}{voice}&req_text={text}&speaker_map_type=0") # print(r.text) - return r.json()['data']['v_str'] + return r.json()["data"]["v_str"] diff --git a/TTS/aws_polly.py b/TTS/aws_polly.py index f63ce61..1a9c87b 100644 --- a/TTS/aws_polly.py +++ b/TTS/aws_polly.py @@ -11,21 +11,21 @@ from TTS.common import get_random_voice voices = [ - 'Brian', - 'Emma', - 'Russell', - 'Joey', - 'Matthew', - 'Joanna', - 'Kimberly', - 'Amy', - 'Geraint', - 'Nicole', - 'Justin', - 'Ivy', - 'Kendra', - 'Salli', - 'Raveena', + "Brian", + "Emma", + "Russell", + "Joey", + "Matthew", + "Joanna", + "Kimberly", + "Amy", + "Geraint", + "Nicole", + "Justin", + "Ivy", + "Kendra", + "Salli", + "Raveena", ] @@ -50,20 +50,20 @@ class AWSPolly: filepath: name of the audio file """ try: - session = Session(profile_name='polly') - polly = session.client('polly') + session = Session(profile_name="polly") + polly = session.client("polly") voice = ( get_random_voice(voices) if self.random_voice - else str(settings.config['settings']['tts']['aws_polly_voice']).capitalize() - if str(settings.config['settings']['tts']['aws_polly_voice']).lower() in [voice.lower() for voice in + else str(settings.config["settings"]["tts"]["aws_polly_voice"]).capitalize() + if str(settings.config["settings"]["tts"]["aws_polly_voice"]).lower() in [voice.lower() for voice in voices] else get_random_voice(voices) ) try: # Request speech synthesis response = polly.synthesize_speech( - Text=text, OutputFormat='mp3', VoiceId=voice, Engine='neural' + Text=text, OutputFormat="mp3", VoiceId=voice, Engine="neural" ) except (BotoCoreError, ClientError) as error: # The service returned an error, exit gracefully @@ -71,15 +71,15 @@ class AWSPolly: sys.exit(-1) # Access the audio stream from the response - if 'AudioStream' in response: - file = open(filepath, 'wb') - file.write(response['AudioStream'].read()) + if "AudioStream" in response: + file = open(filepath, "wb") + file.write(response["AudioStream"].read()) file.close() # print_substep(f"Saved Text {idx} to MP3 files successfully.", style="bold green") else: # The response didn't contain audio data, exit gracefully - print('Could not stream audio') + print("Could not stream audio") sys.exit(-1) except ProfileNotFound: print("You need to install the AWS CLI and configure your profile") diff --git a/TTS/common.py b/TTS/common.py index 5e6f629..f355a89 100644 --- a/TTS/common.py +++ b/TTS/common.py @@ -23,18 +23,16 @@ class BaseApiTTS: Split text as a list """ # Split by comma or dot (else you can lose intonations), if there is non, split by groups of 299 chars - split_text = '' - split_text = list( - map(lambda x: x.strip() if x.strip()[-1] != '.' else x.strip()[:-1], - filter(lambda x: True if x else False, text.split('.'))) + map(lambda x: x.strip() if x.strip()[-1] != "." else x.strip()[:-1], + filter(lambda x: True if x else False, text.split("."))) ) if split_text and all([chunk.__len__() < max_length for chunk in split_text]): return split_text split_text = list( - map(lambda x: x.strip() if x.strip()[-1] != ',' else x.strip()[:-1], - filter(lambda x: True if x else False, text.split(',')) + map(lambda x: x.strip() if x.strip()[-1] != "," else x.strip()[:-1], + filter(lambda x: True if x else False, text.split(",")) ) ) if split_text and all([chunk.__len__() < max_length for chunk in split_text]): @@ -42,7 +40,7 @@ class BaseApiTTS: return list( map( - lambda x: x.strip() if x.strip()[-1] != '.' or x.strip()[-1] != ',' else x.strip()[:-1], + lambda x: x.strip() if x.strip()[-1] != "." or x.strip()[-1] != "," else x.strip()[:-1], filter( lambda x: True if x else False, [text[i:i + max_length] for i in range(0, len(text), max_length)] @@ -64,7 +62,7 @@ class BaseApiTTS: """ decoded_text = base64.b64decode(output_text) if self.decode_base64 else output_text - with open(filepath, 'wb') as out: + with open(filepath, "wb") as out: out.write(decoded_text) def run( @@ -82,7 +80,7 @@ class BaseApiTTS: Returns: """ - output_text = '' + output_text = "" if len(text) > self.max_chars: for part in self.text_len_sanitize(text, self.max_chars): if part: @@ -135,9 +133,9 @@ def audio_length( except Exception as e: import logging - logger = logging.getLogger('tts_logger') + logger = logging.getLogger("tts_logger") logger.setLevel(logging.ERROR) - handler = logging.FileHandler('.tts.log', mode='a+', encoding='utf-8') + handler = logging.FileHandler(".tts.log", mode="a+", encoding="utf-8") logger.addHandler(handler) - logger.error('Error occurred in audio_length:', e) + logger.error("Error occurred in audio_length:", e) return 0 diff --git a/TTS/engine_wrapper.py b/TTS/engine_wrapper.py index 2a0986b..af45d38 100644 --- a/TTS/engine_wrapper.py +++ b/TTS/engine_wrapper.py @@ -30,19 +30,19 @@ class TTSEngine: """ tts_module: Union[GTTS, StreamlabsPolly, TikTok, AWSPolly] = attrib() reddit_object: dict = attrib() - __path: str = 'assets/temp/mp3' + __path: str = "assets/temp/mp3" __total_length: int = 0 def __attrs_post_init__(self): # Calls an instance of the tts_module class self.tts_module = self.tts_module() # Loading settings from the config - self.max_length: int = settings.config['settings']['video_length'] - self.time_before_tts: float = settings.config['settings']['time_before_tts'] - self.time_between_pictures: float = settings.config['settings']['time_between_pictures'] + self.max_length: int = settings.config["settings"]["video_length"] + self.time_before_tts: float = settings.config["settings"]["time_before_tts"] + self.time_between_pictures: float = settings.config["settings"]["time_between_pictures"] self.__total_length = ( - settings.config['settings']['time_before_first_picture'] + - settings.config['settings']['delay_before_end'] + settings.config["settings"]["time_before_first_picture"] + + settings.config["settings"]["delay_before_end"] ) def run( @@ -59,30 +59,31 @@ class TTSEngine: # This file needs to be removed in case this post does not use post text # so that it won't appear in the final video try: - Path(f'{self.__path}/posttext.mp3').unlink() + Path(f"{self.__path}/posttext.mp3").unlink() except OSError: pass - print_step('Saving Text to MP3 files...') + print_step("Saving Text to MP3 files...") - self.call_tts('title', self.reddit_object['thread_title']) + self.call_tts("title", self.reddit_object["thread_title"]) - if self.reddit_object['thread_post'] and settings.config['settings']['storymode']: - self.call_tts('posttext', self.reddit_object['thread_post']) + if self.reddit_object["thread_post"] and settings.config["settings"]["storymode"]: + self.call_tts("posttext", self.reddit_object["thread_post"]) sync_tasks_primary = [ - self.call_tts(str(idx), comment['comment_body']) + self.call_tts(str(idx), comment["comment_body"]) for idx, comment in track( - enumerate(self.reddit_object['comments']), - description='Saving...') + enumerate(self.reddit_object["comments"]), + description="Saving...", + total=self.reddit_object["comments"].__len__()) # Crunch, there will be fix in async TTS api, maybe if self.__total_length + self.__total_length * 0.05 < self.max_length ] - print_substep('Saved Text to MP3 files successfully.', style='bold green') + print_substep("Saved Text to MP3 files successfully.", style="bold green") return [ comments for comments, condition in - zip(range(self.reddit_object['comments'].__len__()), sync_tasks_primary) + zip(range(self.reddit_object["comments"].__len__()), sync_tasks_primary) if condition ] @@ -106,10 +107,10 @@ class TTSEngine: self.tts_module.run( text=self.process_text(text), - filepath=f'{self.__path}/{filename}.mp3' + filepath=f"{self.__path}/{filename}.mp3" ) - clip_length = audio_length(f'{self.__path}/{filename}.mp3') + clip_length = audio_length(f"{self.__path}/{filename}.mp3") clip_offset = self.time_between_pictures + self.time_before_tts * 2 if clip_length and self.__total_length + clip_length + clip_offset <= self.max_length: @@ -130,10 +131,10 @@ class TTSEngine: Returns: Processed text as a str """ - lang = settings.config['reddit']['thread']['post_lang'] + lang = settings.config["reddit"]["thread"]["post_lang"] new_text = sanitize_text(text) if lang: - print_substep('Translating Text...') + print_substep("Translating Text...") translated_text = ts.google(text, to_language=lang) new_text = sanitize_text(translated_text) return new_text diff --git a/TTS/streamlabs_polly.py b/TTS/streamlabs_polly.py index 7d2ca80..a0b7e19 100644 --- a/TTS/streamlabs_polly.py +++ b/TTS/streamlabs_polly.py @@ -8,21 +8,21 @@ from TTS.common import BaseApiTTS, get_random_voice from utils.voice import check_ratelimit voices = [ - 'Brian', - 'Emma', - 'Russell', - 'Joey', - 'Matthew', - 'Joanna', - 'Kimberly', - 'Amy', - 'Geraint', - 'Nicole', - 'Justin', - 'Ivy', - 'Kendra', - 'Salli', - 'Raveena', + "Brian", + "Emma", + "Russell", + "Joey", + "Matthew", + "Joanna", + "Kimberly", + "Amy", + "Geraint", + "Nicole", + "Justin", + "Ivy", + "Kendra", + "Salli", + "Raveena", ] @@ -35,7 +35,7 @@ class StreamlabsPolly(BaseApiTTS): validator=instance_of(bool), default=False ) - url: str = 'https://streamlabs.com/polly/speak', + url: str = "https://streamlabs.com/polly/speak" max_chars: int = 550 def make_request( @@ -54,27 +54,27 @@ class StreamlabsPolly(BaseApiTTS): voice = ( get_random_voice(voices) if self.random_voice - else str(settings.config['settings']['tts']['streamlabs_polly_voice']).capitalize() - if str(settings.config['settings']['tts']['streamlabs_polly_voice']).lower() in [ + else str(settings.config["settings"]["tts"]["streamlabs_polly_voice"]).capitalize() + if str(settings.config["settings"]["tts"]["streamlabs_polly_voice"]).lower() in [ voice.lower() for voice in voices] else get_random_voice(voices) ) response = requests.post( self.url, data={ - 'voice': voice, - 'text': text, - 'service': 'polly', + "voice": voice, + "text": text, + "service": "polly", }) if not check_ratelimit(response): return self.make_request(text) else: try: - results = requests.get(response.json()['speak_url']) + results = requests.get(response.json()["speak_url"]) return results except (KeyError, JSONDecodeError): try: - if response.json()['error'] == 'No text specified!': - raise ValueError('Please specify a text to convert to speech.') + if response.json()["error"] == "No text specified!": + raise ValueError("Please specify a text to convert to speech.") except (KeyError, JSONDecodeError): - print('Error occurred calling Streamlabs Polly') + print("Error occurred calling Streamlabs Polly") diff --git a/reddit/subreddit.py b/reddit/subreddit.py index 651e9a1..50c1fb9 100644 --- a/reddit/subreddit.py +++ b/reddit/subreddit.py @@ -89,7 +89,7 @@ def get_subreddit_threads( content["thread_title"] = submission.title content["thread_post"] = submission.selftext content["thread_id"] = submission.id - content["is_nsfw"] = 'nsfw' in submission.whitelist_status + content["is_nsfw"] = "nsfw" in submission.whitelist_status content["comments"] = [] for top_level_comment in submission.comments: diff --git a/utils/subreddit.py b/utils/subreddit.py index 24f3956..0a6b1e6 100644 --- a/utils/subreddit.py +++ b/utils/subreddit.py @@ -37,7 +37,7 @@ def get_subreddit_undone(submissions: list, subreddit, times_checked=0): continue if submission.num_comments < int(settings.config["reddit"]["thread"]["min_comments"]): print_substep( - 'This post has under the specified minimum of comments' + "This post has under the specified minimum of comments" f'({settings.config["reddit"]["thread"]["min_comments"]}). Skipping...' ) continue diff --git a/video_creation/final_video.py b/video_creation/final_video.py index f6e21c3..80f4282 100755 --- a/video_creation/final_video.py +++ b/video_creation/final_video.py @@ -30,18 +30,18 @@ def name_normalize( name: str ) -> str: name = re.sub(r'[?\\"%*:|<>]', "", name) - name = re.sub(r'( [w,W]\s?\/\s?[o,O,0])', r' without', name) - name = re.sub(r'( [w,W]\s?\/)', r' with', name) - name = re.sub(r'(\d+)\s?\/\s?(\d+)', r'\1 of \2', name) - name = re.sub(r'(\w+)\s?\/\s?(\w+)', r'\1 or \2', name) - name = re.sub(r'\/', '', name) + name = re.sub(r"( [w,W]\s?\/\s?[o,O,0])", r" without", name) + name = re.sub(r"( [w,W]\s?\/)", r" with", name) + name = re.sub(r"(\d+)\s?\/\s?(\d+)", r"\1 of \2", name) + name = re.sub(r"(\w+)\s?\/\s?(\w+)", r"\1 or \2", name) + name = re.sub(r"\/", "", name) # name[:30] # the hell this little guy does? commented until explained - lang = settings.config['reddit']['thread']['post_lang'] + lang = settings.config["reddit"]["thread"]["post_lang"] if lang: import translators as ts - print_substep('Translating filename...') + print_substep("Translating filename...") translated_name = ts.google(name, to_language=lang) return translated_name return name @@ -60,29 +60,29 @@ def make_final_video( reddit_obj (dict): The reddit object that contains the posts to read. background_config (Tuple[str, str, str, Any]): The background config to use. """ - W: int = int(settings.config['settings']['video_width']) - H: int = int(settings.config['settings']['video_height']) + W: int = int(settings.config["settings"]["video_width"]) + H: int = int(settings.config["settings"]["video_height"]) if not W or not H: W, H = 1080, 1920 - max_length: int = int(settings.config['settings']['video_length']) - time_before_first_picture: float = settings.config['settings']['time_before_first_picture'] - time_before_tts: float = settings.config['settings']['time_before_tts'] - time_between_pictures: float = settings.config['settings']['time_between_pictures'] - delay_before_end: float = settings.config['settings']['delay_before_end'] + max_length: int = int(settings.config["settings"]["video_length"]) + time_before_first_picture: float = settings.config["settings"]["time_before_first_picture"] + time_before_tts: float = settings.config["settings"]["time_before_tts"] + time_between_pictures: float = settings.config["settings"]["time_between_pictures"] + delay_before_end: float = settings.config["settings"]["delay_before_end"] - print_step('Creating the final video 🎥') + print_step("Creating the final video 🎥") VideoFileClip.reW = lambda clip: clip.resize(width=W) VideoFileClip.reH = lambda clip: clip.resize(width=H) - opacity = settings.config['settings']['opacity'] / 100 + opacity = settings.config["settings"]["opacity"] / 100 def create_audio_clip( clip_title: Union[str, int], clip_start: float, ) -> 'AudioFileClip': return ( - AudioFileClip(f'assets/temp/mp3/{clip_title}.mp3') + AudioFileClip(f"assets/temp/mp3/{clip_title}.mp3") .set_start(clip_start) ) @@ -93,7 +93,7 @@ def make_final_video( correct_audio_offset = time_before_tts * 2 + time_between_pictures audio_title = create_audio_clip( - 'title', + "title", time_before_first_picture + time_before_tts, ) video_duration += audio_title.duration + time_before_first_picture + time_before_tts @@ -102,7 +102,8 @@ def make_final_video( for audio_title in track( indexes_of_clips, - description='Gathering audio clips...', + description="Gathering audio clips...", + total=indexes_of_clips.__len__() ): temp_audio_clip = create_audio_clip( audio_title, @@ -118,7 +119,7 @@ def make_final_video( # Can't use concatenate_audioclips here, it resets clips' start point audio_composite = CompositeAudioClip(audio_clips) - console.log('[bold green] Video Will Be: %.2f Seconds Long' % video_duration) + console.log("[bold green] Video Will Be: %.2f Seconds Long" % video_duration) # Gather all images new_opacity = 1 if opacity is None or opacity >= 1 else opacity @@ -129,7 +130,7 @@ def make_final_video( audio_duration: float, ) -> 'ImageClip': return ( - ImageClip(f'assets/temp/png/{image_title}.png') + ImageClip(f"assets/temp/png/{image_title}.png") .set_start(audio_start - time_before_tts) .set_duration(time_before_tts * 2 + audio_duration) .set_opacity(new_opacity) @@ -144,19 +145,23 @@ def make_final_video( image_clips.append( create_image_clip( - 'title', + "title", audio_clips[0].start, audio_clips[0].duration ) ) - for idx, photo_idx in enumerate( - indexes_for_videos, - start=index_offset, + for idx, photo_idx in track( + enumerate( + indexes_for_videos, + start=index_offset, + ), + description="Gathering audio clips...", + total=indexes_for_videos[index_offset:].__len__() ): image_clips.append( create_image_clip( - f'comment_{photo_idx}', + f"comment_{photo_idx}", audio_clips[idx].start, audio_clips[idx].duration ) @@ -174,12 +179,13 @@ def make_final_video( # else: story mode stuff # Can't use concatenate_videoclips here, it resets clips' start point - image_concat = CompositeVideoClip(image_clips).set_position(background_config[3]) + image_concat = CompositeVideoClip(image_clips) + image_concat.set_position(background_config[3]) download_background(background_config) chop_background_video(background_config, video_duration) background_clip = ( - VideoFileClip('assets/temp/background.mp4') + VideoFileClip("assets/temp/background.mp4") .set_start(0) .set_end(video_duration) .without_audio() @@ -212,15 +218,15 @@ def make_final_video( final = CompositeVideoClip([background_clip, image_concat]) final.audio = audio_composite - title = re.sub(r'[^\w\s-]', '', reddit_obj['thread_title']) - idx = re.sub(r'[^\w\s-]', '', reddit_obj['thread_id']) + title = re.sub(r"[^\w\s-]", "", reddit_obj["thread_title"]) + idx = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"]) - filename = f'{name_normalize(title)}.mp4' - subreddit = str(settings.config['reddit']['thread']['subreddit']) + filename = f"{name_normalize(title)}.mp4" + subreddit = str(settings.config["reddit"]["thread"]["subreddit"]) - if not exists(f'./results/{subreddit}'): - print_substep('The results folder didn\'t exist so I made it') - os.makedirs(f'./results/{subreddit}') + if not exists(f"./results/{subreddit}"): + print_substep("The results folder didn't exist so I made it") + os.makedirs(f"./results/{subreddit}") # if settings.config["settings"]['background']["background_audio"] and exists(f"assets/backgrounds/background.mp3"): # audioclip = mpe.AudioFileClip(f"assets/backgrounds/background.mp3").set_duration(final.duration) @@ -235,10 +241,10 @@ def make_final_video( ) final.write_videofile( - 'assets/temp/temp.mp4', + "assets/temp/temp.mp4", fps=30, - audio_codec='aac', - audio_bitrate='192k', + audio_codec="aac", + audio_bitrate="192k", verbose=False, threads=multiprocessing.cpu_count(), ) @@ -246,13 +252,13 @@ def make_final_video( "assets/temp/temp.mp4", 0, video_duration, - targetname=f'results/{subreddit}/{filename}', + targetname=f"results/{subreddit}/{filename}", ) save_data(subreddit, filename, title, idx, background_config[2]) - print_step('Removing temporary files 🗑') + print_step("Removing temporary files 🗑") cleanups = cleanup() - print_substep(f'Removed {cleanups} temporary files 🗑') - print_substep('See result in the results folder!') + print_substep(f"Removed {cleanups} temporary files 🗑") + print_substep("See result in the results folder!") print_step( f'Reddit title: {reddit_obj["thread_title"]} \n Background Credit: {background_config[2]}' diff --git a/video_creation/screenshot_downloader.py b/video_creation/screenshot_downloader.py index de7a43d..0ec7e4e 100644 --- a/video_creation/screenshot_downloader.py +++ b/video_creation/screenshot_downloader.py @@ -18,8 +18,8 @@ from attr import attrs, attrib from attr.validators import instance_of, optional from typing import TypeVar, Optional, Callable, Union -_function = TypeVar('_function', bound=Callable[..., object]) -_exceptions = TypeVar('_exceptions', bound=Optional[Union[type, tuple, list]]) +_function = TypeVar("_function", bound=Callable[..., object]) +_exceptions = TypeVar("_exceptions", bound=Optional[Union[type, tuple, list]]) @attrs @@ -45,17 +45,17 @@ class ExceptionDecorator: except Exception as caughtException: import logging - logger = logging.getLogger('webdriver_log') + logger = logging.getLogger("webdriver_log") logger.setLevel(logging.ERROR) - handler = logging.FileHandler('.webdriver.log', mode='a+', encoding='utf-8') + handler = logging.FileHandler(".webdriver.log", mode="a+", encoding="utf-8") logger.addHandler(handler) if isinstance(self.exception, type): if not type(caughtException) == self.exception: - logger.error(f'unexpected error - {caughtException}') + logger.error(f"unexpected error - {caughtException}") else: if not type(caughtException) in self.exception: - logger.error(f'unexpected error - {caughtException}') + logger.error(f"unexpected error - {caughtException}") return wrapper @@ -89,9 +89,9 @@ class Browser: default_Viewport: dict = attrib( validator=instance_of(dict), default={ - 'defaultViewport': { - 'width': 500, - 'height': 1200, + "defaultViewport": { + "width": 500, + "height": 1200, }, }, kw_only=True, @@ -230,28 +230,28 @@ class RedditScreenshot(Browser, Wait): await self.click( page_instance, - '//*[contains(@class, \'header-user-dropdown\')]', - {'timeout': 5000}, + "//*[contains(@class, 'header-user-dropdown')]", + {"timeout": 5000}, ) # It's normal not to find it, sometimes there is none :shrug: await self.click( page_instance, - '//*[contains(text(), \'Settings\')]/ancestor::button[1]', - {'timeout': 5000}, + "//*[contains(text(), 'Settings')]/ancestor::button[1]", + {"timeout": 5000}, ) await self.click( page_instance, - '//*[contains(text(), \'Dark Mode\')]/ancestor::button[1]', - {'timeout': 5000}, + "//*[contains(text(), 'Dark Mode')]/ancestor::button[1]", + {"timeout": 5000}, ) # Closes settings await self.click( page_instance, - '//*[contains(@class, \'header-user-dropdown\')]', - {'timeout': 5000}, + "//*[contains(@class, 'header-user-dropdown')]", + {"timeout": 5000}, ) async def __close_nsfw( @@ -260,7 +260,7 @@ class RedditScreenshot(Browser, Wait): ) -> None: from asyncio import ensure_future - print_substep('Post is NSFW. You are spicy...') + print_substep("Post is NSFW. You are spicy...") # To await indirectly reload navigation = ensure_future(page_instance.waitForNavigation()) @@ -268,7 +268,7 @@ class RedditScreenshot(Browser, Wait): await self.click( page_instance, '//button[text()="Yes"]', - {'timeout': 5000}, + {"timeout": 5000}, ) # Await reload @@ -277,7 +277,7 @@ class RedditScreenshot(Browser, Wait): await (await self.find_xpath( page_instance, '//button[text()="Click to see nsfw"]', - {'timeout': 5000}, + {"timeout": 5000}, )).click() async def __collect_comment( @@ -296,10 +296,10 @@ class RedditScreenshot(Browser, Wait): await comment_page.goto(f'https://reddit.com{comment_obj["comment_url"]}') # Translates submission' comment - if settings.config['reddit']['thread']['post_lang']: + if settings.config["reddit"]["thread"]["post_lang"]: comment_tl = ts.google( - comment_obj['comment_body'], - to_language=settings.config['reddit']['thread']['post_lang'], + comment_obj["comment_body"], + to_language=settings.config["reddit"]["thread"]["post_lang"], ) await comment_page.evaluate( f'([tl_content, tl_id]) => document.querySelector(`#t1_{comment_obj["comment_id"]} > div:nth-child(2) ' @@ -309,7 +309,7 @@ class RedditScreenshot(Browser, Wait): await self.screenshot( comment_page, f'//*[contains(@id, \'t1_{comment_obj["comment_id"]}\')]', - {'path': f'assets/temp/png/comment_{filename_idx}.png'}, + {"path": f"assets/temp/png/comment_{filename_idx}.png"}, ) async def download( @@ -318,31 +318,31 @@ class RedditScreenshot(Browser, Wait): """ Downloads screenshots of reddit posts as seen on the web. Downloads to assets/temp/png """ - print_step('Downloading screenshots of reddit posts...') + print_step("Downloading screenshots of reddit posts...") - print_substep('Launching Headless Browser...') + print_substep("Launching Headless Browser...") await self.get_browser() # ! Make sure the reddit screenshots folder exists - Path('assets/temp/png').mkdir(parents=True, exist_ok=True) + Path("assets/temp/png").mkdir(parents=True, exist_ok=True) # Get the thread screenshot reddit_main = await self.browser.newPage() - await reddit_main.goto(self.reddit_object['thread_url']) + await reddit_main.goto(self.reddit_object["thread_url"]) - if settings.config['settings']['theme'] == 'dark': + if settings.config["settings"]["theme"] == "dark": await self.__dark_theme(reddit_main) - if self.reddit_object['is_nsfw']: + if self.reddit_object["is_nsfw"]: # This means the post is NSFW and requires to click the proceed button. await self.__close_nsfw(reddit_main) # Translates submission title - if settings.config['reddit']['thread']['post_lang']: - print_substep('Translating post...') + if settings.config["reddit"]["thread"]["post_lang"]: + print_substep("Translating post...") texts_in_tl = ts.google( - self.reddit_object['thread_title'], - to_language=settings.config['reddit']['thread']['post_lang'], + self.reddit_object["thread_title"], + to_language=settings.config["reddit"]["thread"]["post_lang"], ) await reddit_main.evaluate( @@ -351,10 +351,10 @@ class RedditScreenshot(Browser, Wait): texts_in_tl, ) else: - print_substep('Skipping translation...') + print_substep("Skipping translation...") async_tasks_primary = [ - self.__collect_comment(self.reddit_object['comments'][idx], idx) for idx in + self.__collect_comment(self.reddit_object["comments"][idx], idx) for idx in self.screenshot_idx ] @@ -362,7 +362,7 @@ class RedditScreenshot(Browser, Wait): self.screenshot( reddit_main, f'//*[contains(@id, \'t3_{self.reddit_object["thread_id"]}\')]', - {'path': f'assets/temp/png/title.png'}, + {"path": "assets/temp/png/title.png"}, ) ) @@ -380,9 +380,10 @@ class RedditScreenshot(Browser, Wait): chunk_list = async_tasks_primary.__len__() // 10 + (1 if async_tasks_primary.__len__() % 10 != 0 else 0) for task in track( as_completed(chunked_tasks), - description=f'Downloading comments: Chunk {idx}/{chunk_list}', + description=f"Downloading comments: Chunk {idx}/{chunk_list}", + total=chunked_tasks.__len__(), ): await task - print_substep('Comments downloaded Successfully.', style='bold green') + print_substep("Comments downloaded Successfully.", style="bold green") await self.close_browser() diff --git a/video_creation/voices.py b/video_creation/voices.py index 7d78e5f..b372042 100644 --- a/video_creation/voices.py +++ b/video_creation/voices.py @@ -9,10 +9,10 @@ from utils.console import print_table, print_step TTSProviders = { - 'GoogleTranslate': GTTS, - 'AWSPolly': AWSPolly, - 'StreamlabsPolly': StreamlabsPolly, - 'TikTok': TikTok, + "GoogleTranslate": GTTS, + "AWSPolly": AWSPolly, + "StreamlabsPolly": StreamlabsPolly, + "TikTok": TikTok, } @@ -28,15 +28,15 @@ def save_text_to_mp3( The number of comments audio was generated for """ - voice = settings.config['settings']['tts']['choice'] + voice = settings.config["settings"]["tts"]["choice"] if voice.casefold() not in map(lambda _: _.casefold(), TTSProviders): while True: - print_step('Please choose one of the following TTS providers: ') + print_step("Please choose one of the following TTS providers: ") print_table(TTSProviders) - voice = input('\n') + voice = input("\n") if voice.casefold() in map(lambda _: _.casefold(), TTSProviders): break - print('Unknown Choice') + print("Unknown Choice") engine_instance = TTSEngine(get_case_insensitive_key_value(TTSProviders, voice), reddit_obj) return engine_instance.run()