refactor: refactored using black formatter.

cmd: black . --line-length 101
pull/790/head
Jason 3 years ago
parent d03f05a1e4
commit 804967e5e4

@ -62,7 +62,9 @@ noneng = [
class TikTok: # TikTok Text-to-Speech Wrapper
def __init__(self):
self.URI_BASE = "https://api16-normal-useast5.us.tiktokv.com/media/api/text/speech/invoke/?text_speaker="
self.URI_BASE = (
"https://api16-normal-useast5.us.tiktokv.com/media/api/text/speech/invoke/?text_speaker="
)
self.max_chars = 300
self.voices = {"human": human, "nonhuman": nonhuman, "noneng": noneng}
@ -76,9 +78,7 @@ class TikTok: # TikTok Text-to-Speech Wrapper
else (os.getenv("TIKTOK_VOICE") or random.choice(self.voices["human"]))
)
try:
r = requests.post(
f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0"
)
r = requests.post(f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0")
except requests.exceptions.SSLError:
# https://stackoverflow.com/a/47475019/18516611
session = requests.Session()
@ -86,9 +86,7 @@ class TikTok: # TikTok Text-to-Speech Wrapper
adapter = HTTPAdapter(max_retries=retry)
session.mount("http://", adapter)
session.mount("https://", adapter)
r = session.post(
f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0"
)
r = session.post(f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0")
# print(r.text)
vstr = [r.json()["data"]["v_str"]][0]
b64d = base64.b64decode(vstr)

@ -9,7 +9,9 @@ from rich.progress import track
from moviepy.editor import AudioFileClip, CompositeAudioClip, concatenate_audioclips
from utils.console import print_step, print_substep
from utils.voice import sanitize_text
DEFUALT_MAX_LENGTH: int = 50 # video length variable
DEFUALT_MAX_LENGTH: int = 50 # video length variable
class TTSEngine:
@ -51,16 +53,11 @@ class TTSEngine:
print_step("Saving Text to MP3 files...")
self.call_tts("title", self.reddit_object["thread_title"])
if (
self.reddit_object["thread_post"] != ""
and getenv("STORYMODE", "").casefold() == "true"
):
if self.reddit_object["thread_post"] != "" and getenv("STORYMODE", "").casefold() == "true":
self.call_tts("posttext", self.reddit_object["thread_post"])
idx = None
for idx, comment in track(
enumerate(self.reddit_object["comments"]), "Saving..."
):
for idx, comment in track(enumerate(self.reddit_object["comments"]), "Saving..."):
# ! Stop creating mp3 files if the length is greater than max length.
if self.length > self.max_length:
break
@ -76,9 +73,7 @@ class TTSEngine:
split_files = []
split_text = [
x.group().strip()
for x in re.finditer(
rf" *((.{{0,{self.tts_module.max_chars}}})(\.|.$))", text
)
for x in re.finditer(rf" *((.{{0,{self.tts_module.max_chars}}})(\.|.$))", text)
]
idy = None
@ -95,9 +90,7 @@ class TTSEngine:
Path(f"{self.path}/{idx}-{i}.part.mp3").unlink()
def call_tts(self, filename: str, text: str):
self.tts_module.run(
text=process_text(text), filepath=f"{self.path}/{filename}.mp3"
)
self.tts_module.run(text=process_text(text), filepath=f"{self.path}/{filename}.mp3")
self.length += MP3(f"{self.path}/{filename}.mp3").info.length

@ -49,9 +49,12 @@ class StreamlabsPolly:
except (KeyError, JSONDecodeError):
try:
if response.json()["error"] == "No text specified!":
raise ValueError('Please specify a text to convert to speech.')
raise ValueError("Please specify a text to convert to speech.")
except (KeyError, JSONDecodeError):
print("Error occurred calling Streamlabs Polly")
def randomvoice(self):
return random.choice(self.voices)
#StreamlabsPolly().run(text=str('hi hi ' * 92)[1:], filepath='hello.mp3', random_voice=True)
# StreamlabsPolly().run(text=str('hi hi ' * 92)[1:], filepath='hello.mp3', random_voice=True)

@ -58,10 +58,12 @@ if __name__ == "__main__":
if getenv("TIMES_TO_RUN") and isinstance(int(getenv("TIMES_TO_RUN")), int):
run_many(int(getenv("TIMES_TO_RUN")))
elif len(getenv("POST_ID", '').split('+')) > 1:
for index, post_id in enumerate(getenv("POST_ID", '').split('+')):
elif len(getenv("POST_ID", "").split("+")) > 1:
for index, post_id in enumerate(getenv("POST_ID", "").split("+")):
index += 1
print_step(f'on the {index}{("st" if index == 1 else ("nd" if index == 2 else ("rd" if index == 3 else "th")))} post of {len(getenv("POST_ID", "").split("+"))}')
print_step(
f'on the {index}{("st" if index == 1 else ("nd" if index == 2 else ("rd" if index == 3 else "th")))} post of {len(getenv("POST_ID", "").split("+"))}'
)
main(post_id)
Popen("cls" if name == "nt" else "clear", shell=True).wait()
else:

@ -18,9 +18,7 @@ def get_subreddit_threads(POST_ID: str):
content = {}
if str(getenv("REDDIT_2FA")).casefold() == "yes":
print(
"\nEnter your two-factor authentication code from your authenticator app.\n"
)
print("\nEnter your two-factor authentication code from your authenticator app.\n")
code = input("> ")
print()
pw = getenv("REDDIT_PASSWORD")
@ -28,7 +26,7 @@ def get_subreddit_threads(POST_ID: str):
else:
passkey = getenv("REDDIT_PASSWORD")
username = getenv("REDDIT_USERNAME")
if username.casefold().startswith('u/'):
if username.casefold().startswith("u/"):
username = username[2:]
reddit = praw.Reddit(
client_id=getenv("REDDIT_CLIENT_ID"),
@ -42,24 +40,20 @@ def get_subreddit_threads(POST_ID: str):
# Ask user for subreddit input
print_step("Getting subreddit threads...")
if not getenv(
"SUBREDDIT"
"SUBREDDIT"
): # note to user. you can have multiple subreddits via reddit.subreddit("redditdev+learnpython")
try:
subreddit = reddit.subreddit(
re.sub(
r"r\/", "", input("What subreddit would you like to pull from? ")
)
re.sub(r"r\/", "", input("What subreddit would you like to pull from? "))
# removes the r/ from the input
)
except ValueError:
subreddit = reddit.subreddit("askreddit")
print_substep("Subreddit not defined. Using AskReddit.")
else:
print_substep(
f"Using subreddit: r/{getenv('SUBREDDIT')} from environment variable config"
)
print_substep(f"Using subreddit: r/{getenv('SUBREDDIT')} from environment variable config")
subreddit_choice = getenv("SUBREDDIT")
if subreddit_choice.casefold().startswith('r/'): # removes the r/ from the input
if subreddit_choice.casefold().startswith("r/"): # removes the r/ from the input
subreddit_choice = subreddit_choice[2:]
subreddit = reddit.subreddit(
subreddit_choice
@ -67,13 +61,13 @@ def get_subreddit_threads(POST_ID: str):
if POST_ID: # would only be called if there are multiple queued posts
submission = reddit.submission(id=POST_ID)
elif getenv("POST_ID") and len(getenv("POST_ID").split('+')) == 1:
elif getenv("POST_ID") and len(getenv("POST_ID").split("+")) == 1:
submission = reddit.submission(id=getenv("POST_ID"))
else:
threads = subreddit.hot(limit=25)
submission = get_subreddit_undone(threads, subreddit)
submission = check_done(submission) # double checking
submission = check_done(submission) # double-checking
if submission is None:
return get_subreddit_threads(POST_ID) # submission already done. rerun
upvotes = submission.score
@ -98,12 +92,14 @@ def get_subreddit_threads(POST_ID: str):
continue # # see https://github.com/JasonLovesDoggo/RedditVideoMakerBot/issues/78
if not top_level_comment.stickied:
if len(top_level_comment.body) <= int(getenv("MAX_COMMENT_LENGTH", "500")):
if top_level_comment.author is not None: # if errors occur with this change to if not.
if (
top_level_comment.author is not None
): # if errors occur with this change to if not.
content["comments"].append(
{
"comment_body": top_level_comment.body,
"comment_url": top_level_comment.permalink,
"comment_id": top_level_comment.id
"comment_id": top_level_comment.id,
}
)
print_substep("Received subreddit threads Successfully.", style="bold green")

@ -35,11 +35,7 @@ def check_env() -> bool:
req_envs = []
var_optional = False
for line in template.readlines():
if (
line.startswith("#") is not True
and "=" in line
and var_optional is not True
):
if line.startswith("#") is not True and "=" in line and var_optional is not True:
req_envs.append(line.split("=")[0])
if "#" in line:
examples[line.split("=")[0]] = "#".join(line.split("#")[1:]).strip()
@ -60,9 +56,7 @@ def check_env() -> bool:
)
var_optional = False
elif line.startswith("#MATCH_TYPE "):
types[req_envs[-1]] = eval(
line.removeprefix("#MATCH_TYPE ")[:-1].split()[0]
)
types[req_envs[-1]] = eval(line.removeprefix("#MATCH_TYPE ")[:-1].split()[0])
var_optional = False
elif line.startswith("#EXPLANATION "):
explanations[req_envs[-1]] = line.removeprefix("#EXPLANATION ")[:-1]
@ -88,9 +82,9 @@ def check_env() -> bool:
try:
temp = types[env](value)
if env in bounds.keys():
(bounds[env][0] <= temp or incorrect.add(env)) and len(
bounds[env]
) > 1 and (bounds[env][1] >= temp or incorrect.add(env))
(bounds[env][0] <= temp or incorrect.add(env)) and len(bounds[env]) > 1 and (
bounds[env][1] >= temp or incorrect.add(env)
)
except ValueError:
incorrect.add(env)
@ -113,17 +107,11 @@ def check_env() -> bool:
for env in missing:
table.add_row(
env,
explanations[env]
if env in explanations.keys()
else "No explanation given",
explanations[env] if env in explanations.keys() else "No explanation given",
examples[env] if env in examples.keys() else "",
str(bounds[env][0])
if env in bounds.keys() and bounds[env][1] is not None
else "",
str(bounds[env][0]) if env in bounds.keys() and bounds[env][1] is not None else "",
str(bounds[env][1])
if env in bounds.keys()
and len(bounds[env]) > 1
and bounds[env][1] is not None
if env in bounds.keys() and len(bounds[env]) > 1 and bounds[env][1] is not None
else "",
)
console.print(table)
@ -140,9 +128,7 @@ def check_env() -> bool:
title_style="#C0CAF5 bold",
)
table.add_column("Variable", justify="left", style="#7AA2F7 bold", no_wrap=True)
table.add_column(
"Current value", justify="left", style="#F7768E", no_wrap=False
)
table.add_column("Current value", justify="left", style="#F7768E", no_wrap=False)
table.add_column("Explanation", justify="left", style="#BB9AF7", no_wrap=False)
table.add_column("Example", justify="center", style="#F7768E", no_wrap=True)
table.add_column("Min", justify="right", style="#F7768E", no_wrap=True)
@ -151,14 +137,10 @@ def check_env() -> bool:
table.add_row(
env,
os.getenv(env),
explanations[env]
if env in explanations.keys()
else "No explanation given",
explanations[env] if env in explanations.keys() else "No explanation given",
str(types[env].__name__) if env in types.keys() else "str",
str(bounds[env][0]) if env in bounds.keys() else "None",
str(bounds[env][1])
if env in bounds.keys() and len(bounds[env]) > 1
else "None",
str(bounds[env][1]) if env in bounds.keys() and len(bounds[env]) > 1 else "None",
)
missing.add(env)
console.print(table)
@ -195,18 +177,10 @@ def check_env() -> bool:
if env in explanations.keys()
else "Incorrect input. Try again.",
bounds[env][0] if env in bounds.keys() else None,
bounds[env][1]
if env in bounds.keys() and len(bounds[env]) > 1
else None,
oob_errors[env]
if env in oob_errors.keys()
else "Input too long/short.",
bounds[env][1] if env in bounds.keys() and len(bounds[env]) > 1 else None,
oob_errors[env] if env in oob_errors.keys() else "Input too long/short.",
extra_info="[#C0CAF5 bold]⮶ "
+ (
explanations[env]
if env in explanations.keys()
else "No info available"
),
+ (explanations[env] if env in explanations.keys() else "No info available"),
)
)
+ ('"' if env not in types.keys() else "")

@ -10,9 +10,7 @@ def cleanup() -> int:
"""
if exists("./assets/temp"):
count = 0
files = [
f for f in os.listdir(".") if f.endswith(".mp4") and "temp" in f.lower()
]
files = [f for f in os.listdir(".") if f.endswith(".mp4") and "temp" in f.lower()]
count += len(files)
for f in files:
os.remove(f)

@ -1,5 +1,4 @@
# write a class that takes .env file and parses it into a dictionary
from dotenv import dotenv_values
DEFAULTS = {
@ -38,3 +37,10 @@ class Config:
config = Config()
print(config.SUBREDDIT)
# def temp():
# root = ''
# if isinstance(root, praw.models.Submission):
# root_type = 'submission'
# elif isinstance(root, praw.models.Comment):
# root_type = 'comment'
#

@ -53,7 +53,7 @@ def handle_input(
if re.match(match, user_input) is not None:
if check_type is not False:
try:
user_input = check_type(user_input)
user_input = check_type(user_input) # this line is fine
if nmin is not None and user_input < nmin:
console.print("[red]" + oob_error) # Input too low failstate
continue
@ -64,14 +64,10 @@ def handle_input(
except ValueError:
console.print("[red]" + err_message) # Type conversion failed
continue
if (
nmin is not None and len(user_input) < nmin
): # Check if string is long enough
if nmin is not None and len(user_input) < nmin: # Check if string is long enough
console.print("[red]" + oob_error)
continue
if (
nmax is not None and len(user_input) > nmax
): # Check if string is not too long
if nmax is not None and len(user_input) > nmax: # Check if string is not too long
console.print("[red]" + oob_error)
continue
break

@ -15,9 +15,7 @@ def get_subreddit_undone(submissions: list, subreddit):
"""
# recursively checks if the top submission in the list was already done.
with open(
"./video_creation/data/videos.json", "r", encoding="utf-8"
) as done_vids_raw:
with open("./video_creation/data/videos.json", "r", encoding="utf-8") as done_vids_raw:
done_videos = json.load(done_vids_raw)
for submission in submissions:
if already_done(done_videos, submission):

@ -19,9 +19,7 @@ def check_done(
dict[str]|None: Reddit object in args
"""
with open(
"./video_creation/data/videos.json", "r", encoding="utf-8"
) as done_vids_raw:
with open("./video_creation/data/videos.json", "r", encoding="utf-8") as done_vids_raw:
done_videos = json.load(done_vids_raw)
for video in done_videos:
if video["id"] == str(redditobj):

@ -51,9 +51,7 @@ def download_background():
"assets/backgrounds", filename=f"{credit}-{filename}"
)
print_substep(
"Background videos downloaded successfully! 🎉", style="bold green"
)
print_substep("Background videos downloaded successfully! 🎉", style="bold green")
def chop_background_video(video_length: int):

@ -1,9 +1,9 @@
#!/usr/bin/env python3
import json
import time
import multiprocessing
import re
import os
import re
import time
from os.path import exists
from moviepy.editor import (
@ -62,9 +62,7 @@ def make_final_video(number_of_clips: int, length: int, reddit_obj: dict[str]):
# add title to video
image_clips = []
# Gather all images
if (
opacity is None or float(opacity) >= 1
): # opacity not set or is set to one OR MORE
if opacity is None or float(opacity) >= 1: # opacity not set or is set to one OR MORE
image_clips.insert(
0,
ImageClip("assets/temp/png/title.png")
@ -83,9 +81,7 @@ def make_final_video(number_of_clips: int, length: int, reddit_obj: dict[str]):
)
for i in range(0, number_of_clips):
if (
opacity is None or float(opacity) >= 1
): # opacity not set or is set to one OR MORE
if opacity is None or float(opacity) >= 1: # opacity not set or is set to one OR MORE
image_clips.append(
ImageClip(f"assets/temp/png/comment_{i}.png")
.set_duration(audio_clips[i + 1].duration)
@ -111,15 +107,13 @@ def make_final_video(number_of_clips: int, length: int, reddit_obj: dict[str]):
# .set_opacity(float(opacity)),
# )
# else:
image_concat = concatenate_videoclips(image_clips).set_position(
("center", "center")
)
image_concat = concatenate_videoclips(image_clips).set_position(("center", "center"))
image_concat.audio = audio_composite
final = CompositeVideoClip([background_clip, image_concat])
title = re.sub(r"[^\w\s-]", "", reddit_obj["thread_title"])
idx = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"])
filename = f"{title}.mp4"
subreddit = os.getenv("SUBREDDIT");
subreddit = os.getenv("SUBREDDIT")
save_data(filename, title, idx)
@ -146,7 +140,7 @@ def make_final_video(number_of_clips: int, length: int, reddit_obj: dict[str]):
print_substep("See result in the results folder!")
print_step(
f'Reddit title: { reddit_obj["thread_title"] } \n Background Credit: {os.getenv("background_credit")}'
f'Reddit title: {reddit_obj["thread_title"]} \n Background Credit: {os.getenv("background_credit")}'
)

@ -4,7 +4,6 @@ from os import getenv
from pathlib import Path
from playwright.async_api import async_playwright # pylint: disable=unused-import
# do not remove the above line
from playwright.sync_api import sync_playwright, ViewportSize
@ -36,13 +35,9 @@ def download_screenshots_of_reddit_posts(reddit_object: dict[str], screenshot_nu
context = browser.new_context()
if getenv("THEME").upper() == "DARK":
cookie_file = open(
"./video_creation/data/cookie-dark-mode.json", encoding="utf-8"
)
cookie_file = open("./video_creation/data/cookie-dark-mode.json", encoding="utf-8")
else:
cookie_file = open(
"./video_creation/data/cookie-light-mode.json", encoding="utf-8"
)
cookie_file = open("./video_creation/data/cookie-light-mode.json", encoding="utf-8")
cookies = json.load(cookie_file)
context.add_cookies(cookies) # load preference cookies
# Get the thread screenshot
@ -62,9 +57,7 @@ def download_screenshots_of_reddit_posts(reddit_object: dict[str], screenshot_nu
if getenv("POSTLANG"):
print_substep("Translating post...")
texts_in_tl = ts.google(
reddit_object["thread_title"], to_language=os.getenv("POSTLANG")
)
texts_in_tl = ts.google(reddit_object["thread_title"], to_language=os.getenv("POSTLANG"))
page.evaluate(
"tl_content => document.querySelector('[data-test-id=\"post-content\"] > div:nth-child(3) > div > div').textContent = tl_content",
@ -73,9 +66,7 @@ def download_screenshots_of_reddit_posts(reddit_object: dict[str], screenshot_nu
else:
print_substep("Skipping translation...")
page.locator('[data-test-id="post-content"]').screenshot(
path="assets/temp/png/title.png"
)
page.locator('[data-test-id="post-content"]').screenshot(path="assets/temp/png/title.png")
if storymode:
page.locator('[data-click-id="text"]').screenshot(

@ -22,6 +22,7 @@ TTSProviders = {
"TikTok": TikTok,
}
def save_text_to_mp3(reddit_obj: dict[str]) -> tuple[int, int]:
"""Saves text to MP3 files.
@ -34,9 +35,7 @@ def save_text_to_mp3(reddit_obj: dict[str]) -> tuple[int, int]:
env = os.getenv("TTSCHOICE", "")
if env.casefold() in map(lambda _: _.casefold(), TTSProviders):
text_to_mp3 = TTSEngine(
get_case_insensitive_key_value(TTSProviders, env), reddit_obj
)
text_to_mp3 = TTSEngine(get_case_insensitive_key_value(TTSProviders, env), reddit_obj)
else:
while True:
print_step("Please choose one of the following TTS providers: ")
@ -45,19 +44,13 @@ def save_text_to_mp3(reddit_obj: dict[str]) -> tuple[int, int]:
if choice.casefold() in map(lambda _: _.casefold(), TTSProviders):
break
print("Unknown Choice")
text_to_mp3 = TTSEngine(
get_case_insensitive_key_value(TTSProviders, choice), reddit_obj
)
text_to_mp3 = TTSEngine(get_case_insensitive_key_value(TTSProviders, choice), reddit_obj)
return text_to_mp3.run()
def get_case_insensitive_key_value(input_dict, key):
return next(
(
value
for dict_key, value in input_dict.items()
if dict_key.lower() == key.lower()
),
(value for dict_key, value in input_dict.items() if dict_key.lower() == key.lower()),
None,
)

Loading…
Cancel
Save