diff --git a/TTS/TikTok.py b/TTS/TikTok.py index 91bf43d..91ba526 100644 --- a/TTS/TikTok.py +++ b/TTS/TikTok.py @@ -62,7 +62,9 @@ noneng = [ class TikTok: # TikTok Text-to-Speech Wrapper def __init__(self): - self.URI_BASE = "https://api16-normal-useast5.us.tiktokv.com/media/api/text/speech/invoke/?text_speaker=" + self.URI_BASE = ( + "https://api16-normal-useast5.us.tiktokv.com/media/api/text/speech/invoke/?text_speaker=" + ) self.max_chars = 300 self.voices = {"human": human, "nonhuman": nonhuman, "noneng": noneng} @@ -76,9 +78,7 @@ class TikTok: # TikTok Text-to-Speech Wrapper else (os.getenv("TIKTOK_VOICE") or random.choice(self.voices["human"])) ) try: - r = requests.post( - f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0" - ) + r = requests.post(f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0") except requests.exceptions.SSLError: # https://stackoverflow.com/a/47475019/18516611 session = requests.Session() @@ -86,9 +86,7 @@ class TikTok: # TikTok Text-to-Speech Wrapper adapter = HTTPAdapter(max_retries=retry) session.mount("http://", adapter) session.mount("https://", adapter) - r = session.post( - f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0" - ) + r = session.post(f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0") # print(r.text) vstr = [r.json()["data"]["v_str"]][0] b64d = base64.b64decode(vstr) diff --git a/TTS/engine_wrapper.py b/TTS/engine_wrapper.py index 01d09e2..bbe4e9a 100644 --- a/TTS/engine_wrapper.py +++ b/TTS/engine_wrapper.py @@ -9,7 +9,9 @@ from rich.progress import track from moviepy.editor import AudioFileClip, CompositeAudioClip, concatenate_audioclips from utils.console import print_step, print_substep from utils.voice import sanitize_text -DEFUALT_MAX_LENGTH: int = 50 # video length variable + +DEFUALT_MAX_LENGTH: int = 50 # video length variable + class TTSEngine: @@ -51,16 +53,11 @@ class TTSEngine: print_step("Saving Text to MP3 files...") self.call_tts("title", self.reddit_object["thread_title"]) - if ( - self.reddit_object["thread_post"] != "" - and getenv("STORYMODE", "").casefold() == "true" - ): + if self.reddit_object["thread_post"] != "" and getenv("STORYMODE", "").casefold() == "true": self.call_tts("posttext", self.reddit_object["thread_post"]) idx = None - for idx, comment in track( - enumerate(self.reddit_object["comments"]), "Saving..." - ): + for idx, comment in track(enumerate(self.reddit_object["comments"]), "Saving..."): # ! Stop creating mp3 files if the length is greater than max length. if self.length > self.max_length: break @@ -76,9 +73,7 @@ class TTSEngine: split_files = [] split_text = [ x.group().strip() - for x in re.finditer( - rf" *((.{{0,{self.tts_module.max_chars}}})(\.|.$))", text - ) + for x in re.finditer(rf" *((.{{0,{self.tts_module.max_chars}}})(\.|.$))", text) ] idy = None @@ -95,9 +90,7 @@ class TTSEngine: Path(f"{self.path}/{idx}-{i}.part.mp3").unlink() def call_tts(self, filename: str, text: str): - self.tts_module.run( - text=process_text(text), filepath=f"{self.path}/{filename}.mp3" - ) + self.tts_module.run(text=process_text(text), filepath=f"{self.path}/{filename}.mp3") self.length += MP3(f"{self.path}/{filename}.mp3").info.length diff --git a/TTS/streamlabs_polly.py b/TTS/streamlabs_polly.py index 68ec379..41fe269 100644 --- a/TTS/streamlabs_polly.py +++ b/TTS/streamlabs_polly.py @@ -49,9 +49,12 @@ class StreamlabsPolly: except (KeyError, JSONDecodeError): try: if response.json()["error"] == "No text specified!": - raise ValueError('Please specify a text to convert to speech.') + raise ValueError("Please specify a text to convert to speech.") except (KeyError, JSONDecodeError): print("Error occurred calling Streamlabs Polly") + def randomvoice(self): return random.choice(self.voices) -#StreamlabsPolly().run(text=str('hi hi ' * 92)[1:], filepath='hello.mp3', random_voice=True) + + +# StreamlabsPolly().run(text=str('hi hi ' * 92)[1:], filepath='hello.mp3', random_voice=True) diff --git a/main.py b/main.py index 99d0995..96cc764 100755 --- a/main.py +++ b/main.py @@ -58,10 +58,12 @@ if __name__ == "__main__": if getenv("TIMES_TO_RUN") and isinstance(int(getenv("TIMES_TO_RUN")), int): run_many(int(getenv("TIMES_TO_RUN"))) - elif len(getenv("POST_ID", '').split('+')) > 1: - for index, post_id in enumerate(getenv("POST_ID", '').split('+')): + elif len(getenv("POST_ID", "").split("+")) > 1: + for index, post_id in enumerate(getenv("POST_ID", "").split("+")): index += 1 - print_step(f'on the {index}{("st" if index == 1 else ("nd" if index == 2 else ("rd" if index == 3 else "th")))} post of {len(getenv("POST_ID", "").split("+"))}') + print_step( + f'on the {index}{("st" if index == 1 else ("nd" if index == 2 else ("rd" if index == 3 else "th")))} post of {len(getenv("POST_ID", "").split("+"))}' + ) main(post_id) Popen("cls" if name == "nt" else "clear", shell=True).wait() else: diff --git a/reddit/subreddit.py b/reddit/subreddit.py index ddbdfe1..e1f8940 100644 --- a/reddit/subreddit.py +++ b/reddit/subreddit.py @@ -18,9 +18,7 @@ def get_subreddit_threads(POST_ID: str): content = {} if str(getenv("REDDIT_2FA")).casefold() == "yes": - print( - "\nEnter your two-factor authentication code from your authenticator app.\n" - ) + print("\nEnter your two-factor authentication code from your authenticator app.\n") code = input("> ") print() pw = getenv("REDDIT_PASSWORD") @@ -28,7 +26,7 @@ def get_subreddit_threads(POST_ID: str): else: passkey = getenv("REDDIT_PASSWORD") username = getenv("REDDIT_USERNAME") - if username.casefold().startswith('u/'): + if username.casefold().startswith("u/"): username = username[2:] reddit = praw.Reddit( client_id=getenv("REDDIT_CLIENT_ID"), @@ -42,24 +40,20 @@ def get_subreddit_threads(POST_ID: str): # Ask user for subreddit input print_step("Getting subreddit threads...") if not getenv( - "SUBREDDIT" + "SUBREDDIT" ): # note to user. you can have multiple subreddits via reddit.subreddit("redditdev+learnpython") try: subreddit = reddit.subreddit( - re.sub( - r"r\/", "", input("What subreddit would you like to pull from? ") - ) + re.sub(r"r\/", "", input("What subreddit would you like to pull from? ")) # removes the r/ from the input ) except ValueError: subreddit = reddit.subreddit("askreddit") print_substep("Subreddit not defined. Using AskReddit.") else: - print_substep( - f"Using subreddit: r/{getenv('SUBREDDIT')} from environment variable config" - ) + print_substep(f"Using subreddit: r/{getenv('SUBREDDIT')} from environment variable config") subreddit_choice = getenv("SUBREDDIT") - if subreddit_choice.casefold().startswith('r/'): # removes the r/ from the input + if subreddit_choice.casefold().startswith("r/"): # removes the r/ from the input subreddit_choice = subreddit_choice[2:] subreddit = reddit.subreddit( subreddit_choice @@ -67,13 +61,13 @@ def get_subreddit_threads(POST_ID: str): if POST_ID: # would only be called if there are multiple queued posts submission = reddit.submission(id=POST_ID) - elif getenv("POST_ID") and len(getenv("POST_ID").split('+')) == 1: + elif getenv("POST_ID") and len(getenv("POST_ID").split("+")) == 1: submission = reddit.submission(id=getenv("POST_ID")) else: threads = subreddit.hot(limit=25) submission = get_subreddit_undone(threads, subreddit) - submission = check_done(submission) # double checking + submission = check_done(submission) # double-checking if submission is None: return get_subreddit_threads(POST_ID) # submission already done. rerun upvotes = submission.score @@ -98,12 +92,14 @@ def get_subreddit_threads(POST_ID: str): continue # # see https://github.com/JasonLovesDoggo/RedditVideoMakerBot/issues/78 if not top_level_comment.stickied: if len(top_level_comment.body) <= int(getenv("MAX_COMMENT_LENGTH", "500")): - if top_level_comment.author is not None: # if errors occur with this change to if not. + if ( + top_level_comment.author is not None + ): # if errors occur with this change to if not. content["comments"].append( { "comment_body": top_level_comment.body, "comment_url": top_level_comment.permalink, - "comment_id": top_level_comment.id + "comment_id": top_level_comment.id, } ) print_substep("Received subreddit threads Successfully.", style="bold green") diff --git a/utils/checker.py b/utils/checker.py index c56e063..791a376 100755 --- a/utils/checker.py +++ b/utils/checker.py @@ -35,11 +35,7 @@ def check_env() -> bool: req_envs = [] var_optional = False for line in template.readlines(): - if ( - line.startswith("#") is not True - and "=" in line - and var_optional is not True - ): + if line.startswith("#") is not True and "=" in line and var_optional is not True: req_envs.append(line.split("=")[0]) if "#" in line: examples[line.split("=")[0]] = "#".join(line.split("#")[1:]).strip() @@ -60,9 +56,7 @@ def check_env() -> bool: ) var_optional = False elif line.startswith("#MATCH_TYPE "): - types[req_envs[-1]] = eval( - line.removeprefix("#MATCH_TYPE ")[:-1].split()[0] - ) + types[req_envs[-1]] = eval(line.removeprefix("#MATCH_TYPE ")[:-1].split()[0]) var_optional = False elif line.startswith("#EXPLANATION "): explanations[req_envs[-1]] = line.removeprefix("#EXPLANATION ")[:-1] @@ -88,9 +82,9 @@ def check_env() -> bool: try: temp = types[env](value) if env in bounds.keys(): - (bounds[env][0] <= temp or incorrect.add(env)) and len( - bounds[env] - ) > 1 and (bounds[env][1] >= temp or incorrect.add(env)) + (bounds[env][0] <= temp or incorrect.add(env)) and len(bounds[env]) > 1 and ( + bounds[env][1] >= temp or incorrect.add(env) + ) except ValueError: incorrect.add(env) @@ -113,17 +107,11 @@ def check_env() -> bool: for env in missing: table.add_row( env, - explanations[env] - if env in explanations.keys() - else "No explanation given", + explanations[env] if env in explanations.keys() else "No explanation given", examples[env] if env in examples.keys() else "", - str(bounds[env][0]) - if env in bounds.keys() and bounds[env][1] is not None - else "", + str(bounds[env][0]) if env in bounds.keys() and bounds[env][1] is not None else "", str(bounds[env][1]) - if env in bounds.keys() - and len(bounds[env]) > 1 - and bounds[env][1] is not None + if env in bounds.keys() and len(bounds[env]) > 1 and bounds[env][1] is not None else "", ) console.print(table) @@ -140,9 +128,7 @@ def check_env() -> bool: title_style="#C0CAF5 bold", ) table.add_column("Variable", justify="left", style="#7AA2F7 bold", no_wrap=True) - table.add_column( - "Current value", justify="left", style="#F7768E", no_wrap=False - ) + table.add_column("Current value", justify="left", style="#F7768E", no_wrap=False) table.add_column("Explanation", justify="left", style="#BB9AF7", no_wrap=False) table.add_column("Example", justify="center", style="#F7768E", no_wrap=True) table.add_column("Min", justify="right", style="#F7768E", no_wrap=True) @@ -151,14 +137,10 @@ def check_env() -> bool: table.add_row( env, os.getenv(env), - explanations[env] - if env in explanations.keys() - else "No explanation given", + explanations[env] if env in explanations.keys() else "No explanation given", str(types[env].__name__) if env in types.keys() else "str", str(bounds[env][0]) if env in bounds.keys() else "None", - str(bounds[env][1]) - if env in bounds.keys() and len(bounds[env]) > 1 - else "None", + str(bounds[env][1]) if env in bounds.keys() and len(bounds[env]) > 1 else "None", ) missing.add(env) console.print(table) @@ -195,18 +177,10 @@ def check_env() -> bool: if env in explanations.keys() else "Incorrect input. Try again.", bounds[env][0] if env in bounds.keys() else None, - bounds[env][1] - if env in bounds.keys() and len(bounds[env]) > 1 - else None, - oob_errors[env] - if env in oob_errors.keys() - else "Input too long/short.", + bounds[env][1] if env in bounds.keys() and len(bounds[env]) > 1 else None, + oob_errors[env] if env in oob_errors.keys() else "Input too long/short.", extra_info="[#C0CAF5 bold]⮶ " - + ( - explanations[env] - if env in explanations.keys() - else "No info available" - ), + + (explanations[env] if env in explanations.keys() else "No info available"), ) ) + ('"' if env not in types.keys() else "") diff --git a/utils/cleanup.py b/utils/cleanup.py index 44629a9..ef4fc44 100644 --- a/utils/cleanup.py +++ b/utils/cleanup.py @@ -10,9 +10,7 @@ def cleanup() -> int: """ if exists("./assets/temp"): count = 0 - files = [ - f for f in os.listdir(".") if f.endswith(".mp4") and "temp" in f.lower() - ] + files = [f for f in os.listdir(".") if f.endswith(".mp4") and "temp" in f.lower()] count += len(files) for f in files: os.remove(f) diff --git a/utils/config.py b/utils/config.py index 000b615..29cbb79 100644 --- a/utils/config.py +++ b/utils/config.py @@ -1,5 +1,4 @@ # write a class that takes .env file and parses it into a dictionary - from dotenv import dotenv_values DEFAULTS = { @@ -38,3 +37,10 @@ class Config: config = Config() print(config.SUBREDDIT) +# def temp(): +# root = '' +# if isinstance(root, praw.models.Submission): +# root_type = 'submission' +# elif isinstance(root, praw.models.Comment): +# root_type = 'comment' +# diff --git a/utils/console.py b/utils/console.py index 2a9296c..cd72fd0 100644 --- a/utils/console.py +++ b/utils/console.py @@ -53,7 +53,7 @@ def handle_input( if re.match(match, user_input) is not None: if check_type is not False: try: - user_input = check_type(user_input) + user_input = check_type(user_input) # this line is fine if nmin is not None and user_input < nmin: console.print("[red]" + oob_error) # Input too low failstate continue @@ -64,14 +64,10 @@ def handle_input( except ValueError: console.print("[red]" + err_message) # Type conversion failed continue - if ( - nmin is not None and len(user_input) < nmin - ): # Check if string is long enough + if nmin is not None and len(user_input) < nmin: # Check if string is long enough console.print("[red]" + oob_error) continue - if ( - nmax is not None and len(user_input) > nmax - ): # Check if string is not too long + if nmax is not None and len(user_input) > nmax: # Check if string is not too long console.print("[red]" + oob_error) continue break diff --git a/utils/subreddit.py b/utils/subreddit.py index 949b5bf..f6ca686 100644 --- a/utils/subreddit.py +++ b/utils/subreddit.py @@ -15,9 +15,7 @@ def get_subreddit_undone(submissions: list, subreddit): """ # recursively checks if the top submission in the list was already done. - with open( - "./video_creation/data/videos.json", "r", encoding="utf-8" - ) as done_vids_raw: + with open("./video_creation/data/videos.json", "r", encoding="utf-8") as done_vids_raw: done_videos = json.load(done_vids_raw) for submission in submissions: if already_done(done_videos, submission): diff --git a/utils/videos.py b/utils/videos.py index 7d8d6c9..ecbca7e 100755 --- a/utils/videos.py +++ b/utils/videos.py @@ -19,9 +19,7 @@ def check_done( dict[str]|None: Reddit object in args """ - with open( - "./video_creation/data/videos.json", "r", encoding="utf-8" - ) as done_vids_raw: + with open("./video_creation/data/videos.json", "r", encoding="utf-8") as done_vids_raw: done_videos = json.load(done_vids_raw) for video in done_videos: if video["id"] == str(redditobj): diff --git a/video_creation/background.py b/video_creation/background.py index c88204b..7bf6ae2 100644 --- a/video_creation/background.py +++ b/video_creation/background.py @@ -51,9 +51,7 @@ def download_background(): "assets/backgrounds", filename=f"{credit}-{filename}" ) - print_substep( - "Background videos downloaded successfully! 🎉", style="bold green" - ) + print_substep("Background videos downloaded successfully! 🎉", style="bold green") def chop_background_video(video_length: int): diff --git a/video_creation/final_video.py b/video_creation/final_video.py index 62a55e1..5c6fd15 100755 --- a/video_creation/final_video.py +++ b/video_creation/final_video.py @@ -1,9 +1,9 @@ #!/usr/bin/env python3 import json -import time import multiprocessing -import re import os +import re +import time from os.path import exists from moviepy.editor import ( @@ -62,9 +62,7 @@ def make_final_video(number_of_clips: int, length: int, reddit_obj: dict[str]): # add title to video image_clips = [] # Gather all images - if ( - opacity is None or float(opacity) >= 1 - ): # opacity not set or is set to one OR MORE + if opacity is None or float(opacity) >= 1: # opacity not set or is set to one OR MORE image_clips.insert( 0, ImageClip("assets/temp/png/title.png") @@ -83,9 +81,7 @@ def make_final_video(number_of_clips: int, length: int, reddit_obj: dict[str]): ) for i in range(0, number_of_clips): - if ( - opacity is None or float(opacity) >= 1 - ): # opacity not set or is set to one OR MORE + if opacity is None or float(opacity) >= 1: # opacity not set or is set to one OR MORE image_clips.append( ImageClip(f"assets/temp/png/comment_{i}.png") .set_duration(audio_clips[i + 1].duration) @@ -111,15 +107,13 @@ def make_final_video(number_of_clips: int, length: int, reddit_obj: dict[str]): # .set_opacity(float(opacity)), # ) # else: - image_concat = concatenate_videoclips(image_clips).set_position( - ("center", "center") - ) + image_concat = concatenate_videoclips(image_clips).set_position(("center", "center")) image_concat.audio = audio_composite final = CompositeVideoClip([background_clip, image_concat]) title = re.sub(r"[^\w\s-]", "", reddit_obj["thread_title"]) idx = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"]) filename = f"{title}.mp4" - subreddit = os.getenv("SUBREDDIT"); + subreddit = os.getenv("SUBREDDIT") save_data(filename, title, idx) @@ -146,7 +140,7 @@ def make_final_video(number_of_clips: int, length: int, reddit_obj: dict[str]): print_substep("See result in the results folder!") print_step( - f'Reddit title: { reddit_obj["thread_title"] } \n Background Credit: {os.getenv("background_credit")}' + f'Reddit title: {reddit_obj["thread_title"]} \n Background Credit: {os.getenv("background_credit")}' ) diff --git a/video_creation/screenshot_downloader.py b/video_creation/screenshot_downloader.py index a9daf40..c93427a 100644 --- a/video_creation/screenshot_downloader.py +++ b/video_creation/screenshot_downloader.py @@ -4,7 +4,6 @@ from os import getenv from pathlib import Path from playwright.async_api import async_playwright # pylint: disable=unused-import - # do not remove the above line from playwright.sync_api import sync_playwright, ViewportSize @@ -36,13 +35,9 @@ def download_screenshots_of_reddit_posts(reddit_object: dict[str], screenshot_nu context = browser.new_context() if getenv("THEME").upper() == "DARK": - cookie_file = open( - "./video_creation/data/cookie-dark-mode.json", encoding="utf-8" - ) + cookie_file = open("./video_creation/data/cookie-dark-mode.json", encoding="utf-8") else: - cookie_file = open( - "./video_creation/data/cookie-light-mode.json", encoding="utf-8" - ) + cookie_file = open("./video_creation/data/cookie-light-mode.json", encoding="utf-8") cookies = json.load(cookie_file) context.add_cookies(cookies) # load preference cookies # Get the thread screenshot @@ -62,9 +57,7 @@ def download_screenshots_of_reddit_posts(reddit_object: dict[str], screenshot_nu if getenv("POSTLANG"): print_substep("Translating post...") - texts_in_tl = ts.google( - reddit_object["thread_title"], to_language=os.getenv("POSTLANG") - ) + texts_in_tl = ts.google(reddit_object["thread_title"], to_language=os.getenv("POSTLANG")) page.evaluate( "tl_content => document.querySelector('[data-test-id=\"post-content\"] > div:nth-child(3) > div > div').textContent = tl_content", @@ -73,9 +66,7 @@ def download_screenshots_of_reddit_posts(reddit_object: dict[str], screenshot_nu else: print_substep("Skipping translation...") - page.locator('[data-test-id="post-content"]').screenshot( - path="assets/temp/png/title.png" - ) + page.locator('[data-test-id="post-content"]').screenshot(path="assets/temp/png/title.png") if storymode: page.locator('[data-click-id="text"]').screenshot( diff --git a/video_creation/voices.py b/video_creation/voices.py index d6f3d89..240c851 100644 --- a/video_creation/voices.py +++ b/video_creation/voices.py @@ -22,6 +22,7 @@ TTSProviders = { "TikTok": TikTok, } + def save_text_to_mp3(reddit_obj: dict[str]) -> tuple[int, int]: """Saves text to MP3 files. @@ -34,9 +35,7 @@ def save_text_to_mp3(reddit_obj: dict[str]) -> tuple[int, int]: env = os.getenv("TTSCHOICE", "") if env.casefold() in map(lambda _: _.casefold(), TTSProviders): - text_to_mp3 = TTSEngine( - get_case_insensitive_key_value(TTSProviders, env), reddit_obj - ) + text_to_mp3 = TTSEngine(get_case_insensitive_key_value(TTSProviders, env), reddit_obj) else: while True: print_step("Please choose one of the following TTS providers: ") @@ -45,19 +44,13 @@ def save_text_to_mp3(reddit_obj: dict[str]) -> tuple[int, int]: if choice.casefold() in map(lambda _: _.casefold(), TTSProviders): break print("Unknown Choice") - text_to_mp3 = TTSEngine( - get_case_insensitive_key_value(TTSProviders, choice), reddit_obj - ) + text_to_mp3 = TTSEngine(get_case_insensitive_key_value(TTSProviders, choice), reddit_obj) return text_to_mp3.run() def get_case_insensitive_key_value(input_dict, key): return next( - ( - value - for dict_key, value in input_dict.items() - if dict_key.lower() == key.lower() - ), + (value for dict_key, value in input_dict.items() if dict_key.lower() == key.lower()), None, )