fixup: Format Python code with Black

pull/1931/head
github-actions 12 months ago
parent 84c25b0323
commit 425286fe90

@ -82,9 +82,7 @@ def settings():
# Change settings # Change settings
config = gui.modify_settings(data, config_load, checks) config = gui.modify_settings(data, config_load, checks)
return render_template( return render_template("settings.html", file="config.toml", data=config, checks=checks)
"settings.html", file="config.toml", data=config, checks=checks
)
# Make videos.json accessible # Make videos.json accessible

@ -86,9 +86,7 @@ class TikTok:
"Cookie": f"sessionid={settings.config['settings']['tts']['tiktok_sessionid']}", "Cookie": f"sessionid={settings.config['settings']['tts']['tiktok_sessionid']}",
} }
self.URI_BASE = ( self.URI_BASE = "https://api16-normal-c-useast1a.tiktokv.com/media/api/text/speech/invoke/"
"https://api16-normal-c-useast1a.tiktokv.com/media/api/text/speech/invoke/"
)
self.max_chars = 200 self.max_chars = 200
self._session = requests.Session() self._session = requests.Session()

@ -41,9 +41,7 @@ class AWSPolly:
raise ValueError( raise ValueError(
f"Please set the TOML variable AWS_VOICE to a valid voice. options are: {voices}" f"Please set the TOML variable AWS_VOICE to a valid voice. options are: {voices}"
) )
voice = str( voice = str(settings.config["settings"]["tts"]["aws_polly_voice"]).capitalize()
settings.config["settings"]["tts"]["aws_polly_voice"]
).capitalize()
try: try:
# Request speech synthesis # Request speech synthesis
response = polly.synthesize_speech( response = polly.synthesize_speech(

@ -26,9 +26,7 @@ class elevenlabs:
if random_voice: if random_voice:
voice = self.randomvoice() voice = self.randomvoice()
else: else:
voice = str( voice = str(settings.config["settings"]["tts"]["elevenlabs_voice_name"]).capitalize()
settings.config["settings"]["tts"]["elevenlabs_voice_name"]
).capitalize()
if settings.config["settings"]["tts"]["elevenlabs_api_key"]: if settings.config["settings"]["tts"]["elevenlabs_api_key"]:
api_key = settings.config["settings"]["tts"]["elevenlabs_api_key"] api_key = settings.config["settings"]["tts"]["elevenlabs_api_key"]
@ -37,9 +35,7 @@ class elevenlabs:
"You didn't set an Elevenlabs API key! Please set the config variable ELEVENLABS_API_KEY to a valid API key." "You didn't set an Elevenlabs API key! Please set the config variable ELEVENLABS_API_KEY to a valid API key."
) )
audio = generate( audio = generate(api_key=api_key, text=text, voice=voice, model="eleven_multilingual_v1")
api_key=api_key, text=text, voice=voice, model="eleven_multilingual_v1"
)
save(audio=audio, filename=filepath) save(audio=audio, filename=filepath)
def randomvoice(self): def randomvoice(self):

@ -14,11 +14,12 @@ from utils import settings
from utils.console import print_step, print_substep from utils.console import print_step, print_substep
from utils.voice import sanitize_text from utils.voice import sanitize_text
DEFAULT_MAX_LENGTH: int = 50 # Video length variable, edit this on your own risk. It should work, but it's not supported DEFAULT_MAX_LENGTH: int = (
50 # Video length variable, edit this on your own risk. It should work, but it's not supported
)
class TTSEngine: class TTSEngine:
"""Calls the given TTS engine to reduce code duplication and allow multiple TTS engines. """Calls the given TTS engine to reduce code duplication and allow multiple TTS engines.
Args: Args:
@ -57,9 +58,7 @@ class TTSEngine:
comment["comment_body"] = re.sub(regex_urls, " ", comment["comment_body"]) comment["comment_body"] = re.sub(regex_urls, " ", comment["comment_body"])
comment["comment_body"] = comment["comment_body"].replace("\n", ". ") comment["comment_body"] = comment["comment_body"].replace("\n", ". ")
comment["comment_body"] = re.sub(r"\bAI\b", "A.I", comment["comment_body"]) comment["comment_body"] = re.sub(r"\bAI\b", "A.I", comment["comment_body"])
comment["comment_body"] = re.sub( comment["comment_body"] = re.sub(r"\bAGI\b", "A.G.I", comment["comment_body"])
r"\bAGI\b", "A.G.I", comment["comment_body"]
)
if comment["comment_body"][-1] != ".": if comment["comment_body"][-1] != ".":
comment["comment_body"] += "." comment["comment_body"] += "."
comment["comment_body"] = comment["comment_body"].replace(". . .", ".") comment["comment_body"] = comment["comment_body"].replace(". . .", ".")
@ -81,17 +80,13 @@ class TTSEngine:
if len(self.reddit_object["thread_post"]) > self.tts_module.max_chars: if len(self.reddit_object["thread_post"]) > self.tts_module.max_chars:
self.split_post(self.reddit_object["thread_post"], "postaudio") self.split_post(self.reddit_object["thread_post"], "postaudio")
else: else:
self.call_tts( self.call_tts("postaudio", process_text(self.reddit_object["thread_post"]))
"postaudio", process_text(self.reddit_object["thread_post"])
)
elif settings.config["settings"]["storymodemethod"] == 1: elif settings.config["settings"]["storymodemethod"] == 1:
for idx, text in track(enumerate(self.reddit_object["thread_post"])): for idx, text in track(enumerate(self.reddit_object["thread_post"])):
self.call_tts(f"postaudio-{idx}", process_text(text)) self.call_tts(f"postaudio-{idx}", process_text(text))
else: else:
for idx, comment in track( for idx, comment in track(enumerate(self.reddit_object["comments"]), "Saving..."):
enumerate(self.reddit_object["comments"]), "Saving..."
):
# ! Stop creating mp3 files if the length is greater than max length. # ! Stop creating mp3 files if the length is greater than max length.
if self.length > self.max_length and idx > 1: if self.length > self.max_length and idx > 1:
self.length -= self.last_clip_length self.length -= self.last_clip_length
@ -174,9 +169,7 @@ class TTSEngine:
fps=44100, fps=44100,
) )
silence = volumex(silence, 0) silence = volumex(silence, 0)
silence.write_audiofile( silence.write_audiofile(f"{self.path}/silence.mp3", fps=44100, verbose=False, logger=None)
f"{self.path}/silence.mp3", fps=44100, verbose=False, logger=None
)
def process_text(text: str, clean: bool = True): def process_text(text: str, clean: bool = True):
@ -184,8 +177,6 @@ def process_text(text: str, clean: bool = True):
new_text = sanitize_text(text) if clean else text new_text = sanitize_text(text) if clean else text
if lang: if lang:
print_substep("Translating Text...") print_substep("Translating Text...")
translated_text = translators.translate_text( translated_text = translators.translate_text(text, translator="google", to_language=lang)
text, translator="google", to_language=lang
)
new_text = sanitize_text(translated_text) new_text = sanitize_text(translated_text)
return new_text return new_text

@ -21,9 +21,7 @@ class pyttsx:
if voice_id == "" or voice_num == "": if voice_id == "" or voice_num == "":
voice_id = 2 voice_id = 2
voice_num = 3 voice_num = 3
raise ValueError( raise ValueError("set pyttsx values to a valid value, switching to defaults")
"set pyttsx values to a valid value, switching to defaults"
)
else: else:
voice_id = int(voice_id) voice_id = int(voice_id)
voice_num = int(voice_num) voice_num = int(voice_num)

@ -42,9 +42,7 @@ class StreamlabsPolly:
raise ValueError( raise ValueError(
f"Please set the config variable STREAMLABS_POLLY_VOICE to a valid voice. options are: {voices}" f"Please set the config variable STREAMLABS_POLLY_VOICE to a valid voice. options are: {voices}"
) )
voice = str( voice = str(settings.config["settings"]["tts"]["streamlabs_polly_voice"]).capitalize()
settings.config["settings"]["tts"]["streamlabs_polly_voice"]
).capitalize()
body = {"voice": voice, "text": text, "service": "polly"} body = {"voice": voice, "text": text, "service": "polly"}
response = requests.post(self.url, data=body) response = requests.post(self.url, data=body)
if not check_ratelimit(response): if not check_ratelimit(response):

@ -104,9 +104,7 @@ if __name__ == "__main__":
sys.exit() sys.exit()
try: try:
if config["reddit"]["thread"]["post_id"]: if config["reddit"]["thread"]["post_id"]:
for index, post_id in enumerate( for index, post_id in enumerate(config["reddit"]["thread"]["post_id"].split("+")):
config["reddit"]["thread"]["post_id"].split("+")
):
index += 1 index += 1
print_step( print_step(
f'on the {index}{("st" if index % 10 == 1 else ("nd" if index % 10 == 2 else ("rd" if index % 10 == 3 else "th")))} post of {len(config["reddit"]["thread"]["post_id"].split("+"))}' f'on the {index}{("st" if index % 10 == 1 else ("nd" if index % 10 == 2 else ("rd" if index % 10 == 3 else "th")))} post of {len(config["reddit"]["thread"]["post_id"].split("+"))}'

@ -22,9 +22,7 @@ def get_subreddit_threads(POST_ID: str):
content = {} content = {}
if settings.config["reddit"]["creds"]["2fa"]: if settings.config["reddit"]["creds"]["2fa"]:
print( print("\nEnter your two-factor authentication code from your authenticator app.\n")
"\nEnter your two-factor authentication code from your authenticator app.\n"
)
code = input("> ") code = input("> ")
print() print()
pw = settings.config["reddit"]["creds"]["password"] pw = settings.config["reddit"]["creds"]["password"]
@ -57,9 +55,7 @@ def get_subreddit_threads(POST_ID: str):
]: # note to user. you can have multiple subreddits via reddit.subreddit("redditdev+learnpython") ]: # note to user. you can have multiple subreddits via reddit.subreddit("redditdev+learnpython")
try: try:
subreddit = reddit.subreddit( subreddit = reddit.subreddit(
re.sub( re.sub(r"r\/", "", input("What subreddit would you like to pull from? "))
r"r\/", "", input("What subreddit would you like to pull from? ")
)
# removes the r/ from the input # removes the r/ from the input
) )
except ValueError: except ValueError:
@ -69,9 +65,7 @@ def get_subreddit_threads(POST_ID: str):
sub = settings.config["reddit"]["thread"]["subreddit"] sub = settings.config["reddit"]["thread"]["subreddit"]
print_substep(f"Using subreddit: r/{sub} from TOML config") print_substep(f"Using subreddit: r/{sub} from TOML config")
subreddit_choice = sub subreddit_choice = sub
if ( if str(subreddit_choice).casefold().startswith("r/"): # removes the r/ from the input
str(subreddit_choice).casefold().startswith("r/")
): # removes the r/ from the input
subreddit_choice = subreddit_choice[2:] subreddit_choice = subreddit_choice[2:]
subreddit = reddit.subreddit(subreddit_choice) subreddit = reddit.subreddit(subreddit_choice)
@ -82,12 +76,8 @@ def get_subreddit_threads(POST_ID: str):
settings.config["reddit"]["thread"]["post_id"] settings.config["reddit"]["thread"]["post_id"]
and len(str(settings.config["reddit"]["thread"]["post_id"]).split("+")) == 1 and len(str(settings.config["reddit"]["thread"]["post_id"]).split("+")) == 1
): ):
submission = reddit.submission( submission = reddit.submission(id=settings.config["reddit"]["thread"]["post_id"])
id=settings.config["reddit"]["thread"]["post_id"] elif settings.config["ai"]["ai_similarity_enabled"]: # ai sorting based on comparison
)
elif settings.config["ai"][
"ai_similarity_enabled"
]: # ai sorting based on comparison
threads = subreddit.hot(limit=50) threads = subreddit.hot(limit=50)
keywords = settings.config["ai"]["ai_similarity_keywords"].split(",") keywords = settings.config["ai"]["ai_similarity_keywords"].split(",")
keywords = [keyword.strip() for keyword in keywords] keywords = [keyword.strip() for keyword in keywords]
@ -105,10 +95,7 @@ def get_subreddit_threads(POST_ID: str):
if submission is None: if submission is None:
return get_subreddit_threads(POST_ID) # submission already done. rerun return get_subreddit_threads(POST_ID) # submission already done. rerun
elif ( elif not submission.num_comments and settings.config["settings"]["storymode"] == "false":
not submission.num_comments
and settings.config["settings"]["storymode"] == "false"
):
print_substep("No comments found. Skipping.") print_substep("No comments found. Skipping.")
exit() exit()

@ -5,12 +5,8 @@ from transformers import AutoTokenizer, AutoModel
# Mean Pooling - Take attention mask into account for correct averaging # Mean Pooling - Take attention mask into account for correct averaging
def mean_pooling(model_output, attention_mask): def mean_pooling(model_output, attention_mask):
token_embeddings = model_output[ token_embeddings = model_output[0] # First element of model_output contains all token embeddings
0 input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
] # First element of model_output contains all token embeddings
input_mask_expanded = (
attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
)
return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp( return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(
input_mask_expanded.sum(1), min=1e-9 input_mask_expanded.sum(1), min=1e-9
) )
@ -36,19 +32,13 @@ def sort_by_similarity(thread_objects, keywords):
) )
with torch.no_grad(): with torch.no_grad():
threads_embeddings = model(**encoded_threads) threads_embeddings = model(**encoded_threads)
threads_embeddings = mean_pooling( threads_embeddings = mean_pooling(threads_embeddings, encoded_threads["attention_mask"])
threads_embeddings, encoded_threads["attention_mask"]
)
# Keywords inference # Keywords inference
encoded_keywords = tokenizer( encoded_keywords = tokenizer(keywords, padding=True, truncation=True, return_tensors="pt")
keywords, padding=True, truncation=True, return_tensors="pt"
)
with torch.no_grad(): with torch.no_grad():
keywords_embeddings = model(**encoded_keywords) keywords_embeddings = model(**encoded_keywords)
keywords_embeddings = mean_pooling( keywords_embeddings = mean_pooling(keywords_embeddings, encoded_keywords["attention_mask"])
keywords_embeddings, encoded_keywords["attention_mask"]
)
# Compare every keyword w/ every thread embedding # Compare every keyword w/ every thread embedding
threads_embeddings_tensor = torch.tensor(threads_embeddings) threads_embeddings_tensor = torch.tensor(threads_embeddings)

@ -49,10 +49,7 @@ def handle_input(
optional=False, optional=False,
): ):
if optional: if optional:
console.print( console.print(message + "\n[green]This is an optional value. Do you want to skip it? (y/n)")
message
+ "\n[green]This is an optional value. Do you want to skip it? (y/n)"
)
if input().casefold().startswith("y"): if input().casefold().startswith("y"):
return default if default is not NotImplemented else "" return default if default is not NotImplemented else ""
if default is not NotImplemented: if default is not NotImplemented:
@ -86,11 +83,7 @@ def handle_input(
console.print("[red]" + err_message) console.print("[red]" + err_message)
continue continue
elif match != "" and re.match(match, user_input) is None: elif match != "" and re.match(match, user_input) is None:
console.print( console.print("[red]" + err_message + "\nAre you absolutely sure it's correct?(y/n)")
"[red]"
+ err_message
+ "\nAre you absolutely sure it's correct?(y/n)"
)
if input().casefold().startswith("y"): if input().casefold().startswith("y"):
break break
continue continue
@ -123,9 +116,5 @@ def handle_input(
if user_input in options: if user_input in options:
return user_input return user_input
console.print( console.print(
"[red bold]" "[red bold]" + err_message + "\nValid options are: " + ", ".join(map(str, options)) + "."
+ err_message
+ "\nValid options are: "
+ ", ".join(map(str, options))
+ "."
) )

@ -7,7 +7,9 @@ import requests
def ffmpeg_install_windows(): def ffmpeg_install_windows():
try: try:
ffmpeg_url = "https://github.com/GyanD/codexffmpeg/releases/download/6.0/ffmpeg-6.0-full_build.zip" ffmpeg_url = (
"https://github.com/GyanD/codexffmpeg/releases/download/6.0/ffmpeg-6.0-full_build.zip"
)
ffmpeg_zip_filename = "ffmpeg.zip" ffmpeg_zip_filename = "ffmpeg.zip"
ffmpeg_extracted_folder = "ffmpeg" ffmpeg_extracted_folder = "ffmpeg"
@ -127,9 +129,7 @@ def ffmpeg_install():
elif os.name == "mac": elif os.name == "mac":
ffmpeg_install_mac() ffmpeg_install_mac()
else: else:
print( print("Your OS is not supported. Please install FFmpeg manually and try again.")
"Your OS is not supported. Please install FFmpeg manually and try again."
)
exit() exit()
else: else:
print("Please install FFmpeg manually and try again.") print("Please install FFmpeg manually and try again.")

@ -67,11 +67,7 @@ def check(value, checks):
and not hasattr(value, "__iter__") and not hasattr(value, "__iter__")
and ( and (
("nmin" in checks and checks["nmin"] is not None and value < checks["nmin"]) ("nmin" in checks and checks["nmin"] is not None and value < checks["nmin"])
or ( or ("nmax" in checks and checks["nmax"] is not None and value > checks["nmax"])
"nmax" in checks
and checks["nmax"] is not None
and value > checks["nmax"]
)
) )
): ):
incorrect = True incorrect = True
@ -80,16 +76,8 @@ def check(value, checks):
not incorrect not incorrect
and hasattr(value, "__iter__") and hasattr(value, "__iter__")
and ( and (
( ("nmin" in checks and checks["nmin"] is not None and len(value) < checks["nmin"])
"nmin" in checks or ("nmax" in checks and checks["nmax"] is not None and len(value) > checks["nmax"])
and checks["nmin"] is not None
and len(value) < checks["nmin"]
)
or (
"nmax" in checks
and checks["nmax"] is not None
and len(value) > checks["nmax"]
)
) )
): ):
incorrect = True incorrect = True
@ -162,9 +150,7 @@ def delete_background(key):
# Add background video # Add background video
def add_background(youtube_uri, filename, citation, position): def add_background(youtube_uri, filename, citation, position):
# Validate YouTube URI # Validate YouTube URI
regex = re.compile(r"(?:\/|%3D|v=|vi=)([0-9A-z\-_]{11})(?:[%#?&]|$)").search( regex = re.compile(r"(?:\/|%3D|v=|vi=)([0-9A-z\-_]{11})(?:[%#?&]|$)").search(youtube_uri)
youtube_uri
)
if not regex: if not regex:
flash("YouTube URI is invalid!", "error") flash("YouTube URI is invalid!", "error")

@ -18,9 +18,7 @@ def draw_multiple_line_text(
Fontperm = font.getsize(text) Fontperm = font.getsize(text)
image_width, image_height = image.size image_width, image_height = image.size
lines = textwrap.wrap(text, width=wrap) lines = textwrap.wrap(text, width=wrap)
y = (image_height / 2) - ( y = (image_height / 2) - (((Fontperm[1] + (len(lines) * padding) / len(lines)) * len(lines)) / 2)
((Fontperm[1] + (len(lines) * padding) / len(lines)) * len(lines)) / 2
)
for line in lines: for line in lines:
line_width, line_height = font.getsize(line) line_width, line_height = font.getsize(line)
if transparent: if transparent:
@ -66,25 +64,19 @@ def imagemaker(theme, reddit_obj: dict, txtclr, padding=5, transparent=False) ->
font = ImageFont.truetype(os.path.join("fonts", "Roboto-Bold.ttf"), 100) font = ImageFont.truetype(os.path.join("fonts", "Roboto-Bold.ttf"), 100)
tfont = ImageFont.truetype(os.path.join("fonts", "Roboto-Bold.ttf"), 100) tfont = ImageFont.truetype(os.path.join("fonts", "Roboto-Bold.ttf"), 100)
else: else:
tfont = ImageFont.truetype( tfont = ImageFont.truetype(os.path.join("fonts", "Roboto-Bold.ttf"), 100) # for title
os.path.join("fonts", "Roboto-Bold.ttf"), 100
) # for title
font = ImageFont.truetype(os.path.join("fonts", "Roboto-Regular.ttf"), 100) font = ImageFont.truetype(os.path.join("fonts", "Roboto-Regular.ttf"), 100)
size = (1920, 1080) size = (1920, 1080)
image = Image.new("RGBA", size, theme) image = Image.new("RGBA", size, theme)
# for title # for title
draw_multiple_line_text( draw_multiple_line_text(image, title, tfont, txtclr, padding, wrap=30, transparent=transparent)
image, title, tfont, txtclr, padding, wrap=30, transparent=transparent
)
image.save(f"assets/temp/{id}/png/title.png") image.save(f"assets/temp/{id}/png/title.png")
for idx, text in track(enumerate(texts), "Rendering Image"): for idx, text in track(enumerate(texts), "Rendering Image"):
image = Image.new("RGBA", size, theme) image = Image.new("RGBA", size, theme)
text = process_text(text, False) text = process_text(text, False)
draw_multiple_line_text( draw_multiple_line_text(image, text, font, txtclr, padding, wrap=30, transparent=transparent)
image, text, font, txtclr, padding, wrap=30, transparent=transparent
)
image.save(f"assets/temp/{id}/png/img{idx}.png") image.save(f"assets/temp/{id}/png/img{idx}.png")

@ -1,7 +1,5 @@
def clear_cookie_by_name(context, cookie_cleared_name): def clear_cookie_by_name(context, cookie_cleared_name):
cookies = context.cookies() cookies = context.cookies()
filtered_cookies = [ filtered_cookies = [cookie for cookie in cookies if cookie["name"] != cookie_cleared_name]
cookie for cookie in cookies if cookie["name"] != cookie_cleared_name
]
context.clear_cookies() context.clear_cookies()
context.add_cookies(filtered_cookies) context.add_cookies(filtered_cookies)

@ -53,11 +53,7 @@ def check(value, checks, name):
and not hasattr(value, "__iter__") and not hasattr(value, "__iter__")
and ( and (
("nmin" in checks and checks["nmin"] is not None and value < checks["nmin"]) ("nmin" in checks and checks["nmin"] is not None and value < checks["nmin"])
or ( or ("nmax" in checks and checks["nmax"] is not None and value > checks["nmax"])
"nmax" in checks
and checks["nmax"] is not None
and value > checks["nmax"]
)
) )
): ):
incorrect = True incorrect = True
@ -65,16 +61,8 @@ def check(value, checks, name):
not incorrect not incorrect
and hasattr(value, "__iter__") and hasattr(value, "__iter__")
and ( and (
( ("nmin" in checks and checks["nmin"] is not None and len(value) < checks["nmin"])
"nmin" in checks or ("nmax" in checks and checks["nmax"] is not None and len(value) > checks["nmax"])
and checks["nmin"] is not None
and len(value) < checks["nmin"]
)
or (
"nmax" in checks
and checks["nmax"] is not None
and len(value) > checks["nmax"]
)
) )
): ):
incorrect = True incorrect = True
@ -82,15 +70,9 @@ def check(value, checks, name):
if incorrect: if incorrect:
value = handle_input( value = handle_input(
message=( message=(
( (("[blue]Example: " + str(checks["example"]) + "\n") if "example" in checks else "")
("[blue]Example: " + str(checks["example"]) + "\n")
if "example" in checks
else ""
)
+ "[red]" + "[red]"
+ ("Non-optional ", "Optional ")[ + ("Non-optional ", "Optional ")["optional" in checks and checks["optional"] is True]
"optional" in checks and checks["optional"] is True
]
) )
+ "[#C0CAF5 bold]" + "[#C0CAF5 bold]"
+ str(name) + str(name)
@ -131,9 +113,7 @@ def check_toml(template_file, config_file) -> Tuple[bool, Dict]:
try: try:
template = toml.load(template_file) template = toml.load(template_file)
except Exception as error: except Exception as error:
console.print( console.print(f"[red bold]Encountered error when trying to to load {template_file}: {error}")
f"[red bold]Encountered error when trying to to load {template_file}: {error}"
)
return False return False
try: try:
config = toml.load(config_file) config = toml.load(config_file)

@ -6,9 +6,7 @@ from utils.ai_methods import sort_by_similarity
from utils.console import print_substep from utils.console import print_substep
def get_subreddit_undone( def get_subreddit_undone(submissions: list, subreddit, times_checked=0, similarity_scores=None):
submissions: list, subreddit, times_checked=0, similarity_scores=None
):
"""_summary_ """_summary_
Args: Args:
@ -20,9 +18,7 @@ def get_subreddit_undone(
""" """
# Second try of getting a valid Submission # Second try of getting a valid Submission
if times_checked and settings.config["ai"]["ai_similarity_enabled"]: if times_checked and settings.config["ai"]["ai_similarity_enabled"]:
print( print("Sorting based on similarity for a different date filter and thread limit..")
"Sorting based on similarity for a different date filter and thread limit.."
)
submissions = sort_by_similarity( submissions = sort_by_similarity(
submissions, keywords=settings.config["ai"]["ai_similarity_enabled"] submissions, keywords=settings.config["ai"]["ai_similarity_enabled"]
) )
@ -31,9 +27,7 @@ def get_subreddit_undone(
if not exists("./video_creation/data/videos.json"): if not exists("./video_creation/data/videos.json"):
with open("./video_creation/data/videos.json", "w+") as f: with open("./video_creation/data/videos.json", "w+") as f:
json.dump([], f) json.dump([], f)
with open( with open("./video_creation/data/videos.json", "r", encoding="utf-8") as done_vids_raw:
"./video_creation/data/videos.json", "r", encoding="utf-8"
) as done_vids_raw:
done_videos = json.load(done_vids_raw) done_videos = json.load(done_vids_raw)
for i, submission in enumerate(submissions): for i, submission in enumerate(submissions):
if already_done(done_videos, submission): if already_done(done_videos, submission):
@ -49,8 +43,7 @@ def get_subreddit_undone(
print_substep("This post was pinned by moderators. Skipping...") print_substep("This post was pinned by moderators. Skipping...")
continue continue
if ( if (
submission.num_comments submission.num_comments <= int(settings.config["reddit"]["thread"]["min_comments"])
<= int(settings.config["reddit"]["thread"]["min_comments"])
and not settings.config["settings"]["storymode"] and not settings.config["settings"]["storymode"]
): ):
print_substep( print_substep(
@ -59,9 +52,7 @@ def get_subreddit_undone(
continue continue
if settings.config["settings"]["storymode"]: if settings.config["settings"]["storymode"]:
if not submission.selftext: if not submission.selftext:
print_substep( print_substep("You are trying to use story mode on post with no post text")
"You are trying to use story mode on post with no post text"
)
continue continue
else: else:
# Check for the length of the post text # Check for the length of the post text

@ -1,15 +1,11 @@
from PIL import ImageDraw, ImageFont from PIL import ImageDraw, ImageFont
def create_thumbnail( def create_thumbnail(thumbnail, font_family, font_size, font_color, width, height, title):
thumbnail, font_family, font_size, font_color, width, height, title
):
font = ImageFont.truetype(font_family + ".ttf", font_size) font = ImageFont.truetype(font_family + ".ttf", font_size)
Xaxis = width - (width * 0.2) # 20% of the width Xaxis = width - (width * 0.2) # 20% of the width
sizeLetterXaxis = font_size * 0.5 # 50% of the font size sizeLetterXaxis = font_size * 0.5 # 50% of the font size
XaxisLetterQty = round( XaxisLetterQty = round(Xaxis / sizeLetterXaxis) # Quantity of letters that can fit in the X axis
Xaxis / sizeLetterXaxis
) # Quantity of letters that can fit in the X axis
MarginYaxis = height * 0.12 # 12% of the height MarginYaxis = height * 0.12 # 12% of the height
MarginXaxis = width * 0.05 # 5% of the width MarginXaxis = width * 0.05 # 5% of the width
# 1.1 rem # 1.1 rem
@ -34,8 +30,6 @@ def create_thumbnail(
# loop for put the title in the thumbnail # loop for put the title in the thumbnail
for i in range(0, len(arrayTitle)): for i in range(0, len(arrayTitle)):
# 1.1 rem # 1.1 rem
draw.text( draw.text((MarginXaxis, MarginYaxis + (LineHeight * i)), arrayTitle[i], rgb, font=font)
(MarginXaxis, MarginYaxis + (LineHeight * i)), arrayTitle[i], rgb, font=font
)
return thumbnail return thumbnail

@ -19,9 +19,7 @@ def check_done(
Returns: Returns:
Submission|None: Reddit object in args Submission|None: Reddit object in args
""" """
with open( with open("./video_creation/data/videos.json", "r", encoding="utf-8") as done_vids_raw:
"./video_creation/data/videos.json", "r", encoding="utf-8"
) as done_vids_raw:
done_videos = json.load(done_vids_raw) done_videos = json.load(done_vids_raw)
for video in done_videos: for video in done_videos:
if video["id"] == str(redditobj): if video["id"] == str(redditobj):
@ -35,9 +33,7 @@ def check_done(
return redditobj return redditobj
def save_data( def save_data(subreddit: str, filename: str, reddit_title: str, reddit_id: str, credit: str):
subreddit: str, filename: str, reddit_title: str, reddit_id: str, credit: str
):
"""Saves the videos that have already been generated to a JSON file in video_creation/data/videos.json """Saves the videos that have already been generated to a JSON file in video_creation/data/videos.json
Args: Args:

@ -43,9 +43,7 @@ def sleep_until(time) -> None:
if sys.version_info[0] >= 3 and time.tzinfo: if sys.version_info[0] >= 3 and time.tzinfo:
end = time.astimezone(timezone.utc).timestamp() end = time.astimezone(timezone.utc).timestamp()
else: else:
zoneDiff = ( zoneDiff = pytime.time() - (datetime.now() - datetime(1970, 1, 1)).total_seconds()
pytime.time() - (datetime.now() - datetime(1970, 1, 1)).total_seconds()
)
end = (time - datetime(1970, 1, 1)).total_seconds() + zoneDiff end = (time - datetime(1970, 1, 1)).total_seconds() + zoneDiff
# Type check # Type check

@ -60,9 +60,7 @@ def get_start_and_end_times(video_length: int, length_of_clip: int) -> Tuple[int
def get_background_config(mode: str): def get_background_config(mode: str):
"""Fetch the background/s configuration""" """Fetch the background/s configuration"""
try: try:
choice = str( choice = str(settings.config["settings"]["background"][f"background_{mode}"]).casefold()
settings.config["settings"]["background"][f"background_{mode}"]
).casefold()
except AttributeError: except AttributeError:
print_substep("No background selected. Picking random background'") print_substep("No background selected. Picking random background'")
choice = None choice = None
@ -122,9 +120,7 @@ def download_background_audio(background_config: Tuple[str, str, str]):
print_substep("Background audio downloaded successfully! 🎉", style="bold green") print_substep("Background audio downloaded successfully! 🎉", style="bold green")
def chop_background( def chop_background(background_config: Dict[str, Tuple], video_length: int, reddit_object: dict):
background_config: Dict[str, Tuple], video_length: int, reddit_object: dict
):
"""Generates the background audio and footage to be used in the video and writes it to assets/temp/background.mp3 and assets/temp/background.mp4 """Generates the background audio and footage to be used in the video and writes it to assets/temp/background.mp3 and assets/temp/background.mp4
Args: Args:
@ -137,9 +133,7 @@ def chop_background(
print_step("Volume was set to 0. Skipping background audio creation . . .") print_step("Volume was set to 0. Skipping background audio creation . . .")
else: else:
print_step("Finding a spot in the backgrounds audio to chop...✂️") print_step("Finding a spot in the backgrounds audio to chop...✂️")
audio_choice = ( audio_choice = f"{background_config['audio'][2]}-{background_config['audio'][1]}"
f"{background_config['audio'][2]}-{background_config['audio'][1]}"
)
background_audio = AudioFileClip(f"assets/backgrounds/audio/{audio_choice}") background_audio = AudioFileClip(f"assets/backgrounds/audio/{audio_choice}")
start_time_audio, end_time_audio = get_start_and_end_times( start_time_audio, end_time_audio = get_start_and_end_times(
video_length, background_audio.duration video_length, background_audio.duration

@ -75,9 +75,7 @@ def name_normalize(name: str) -> str:
lang = settings.config["reddit"]["thread"]["post_lang"] lang = settings.config["reddit"]["thread"]["post_lang"]
if lang: if lang:
print_substep("Translating filename...") print_substep("Translating filename...")
translated_name = translators.translate_text( translated_name = translators.translate_text(name, translator="google", to_language=lang)
name, translator="google", to_language=lang
)
return translated_name return translated_name
else: else:
return name return name
@ -114,9 +112,7 @@ def merge_background_audio(audio: ffmpeg, reddit_id: str):
audio (ffmpeg): The TTS final audio but without background. audio (ffmpeg): The TTS final audio but without background.
reddit_id (str): The ID of subreddit reddit_id (str): The ID of subreddit
""" """
background_audio_volume = settings.config["settings"]["background"][ background_audio_volume = settings.config["settings"]["background"]["background_audio_volume"]
"background_audio_volume"
]
if background_audio_volume == 0: if background_audio_volume == 0:
return audio # Return the original audio return audio # Return the original audio
else: else:
@ -170,42 +166,27 @@ def make_final_video(
if settings.config["settings"]["storymode"]: if settings.config["settings"]["storymode"]:
if settings.config["settings"]["storymodemethod"] == 0: if settings.config["settings"]["storymodemethod"] == 0:
audio_clips = [ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3")] audio_clips = [ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3")]
audio_clips.insert( audio_clips.insert(1, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/postaudio.mp3"))
1, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/postaudio.mp3")
)
elif settings.config["settings"]["storymodemethod"] == 1: elif settings.config["settings"]["storymodemethod"] == 1:
audio_clips = [ audio_clips = [
ffmpeg.input(f"assets/temp/{reddit_id}/mp3/postaudio-{i}.mp3") ffmpeg.input(f"assets/temp/{reddit_id}/mp3/postaudio-{i}.mp3")
for i in track( for i in track(range(number_of_clips + 1), "Collecting the audio files...")
range(number_of_clips + 1), "Collecting the audio files..."
)
] ]
audio_clips.insert( audio_clips.insert(0, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3"))
0, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3")
)
else: else:
audio_clips = [ audio_clips = [
ffmpeg.input(f"assets/temp/{reddit_id}/mp3/{i}.mp3") ffmpeg.input(f"assets/temp/{reddit_id}/mp3/{i}.mp3") for i in range(number_of_clips)
for i in range(number_of_clips)
] ]
audio_clips.insert(0, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3")) audio_clips.insert(0, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3"))
audio_clips_durations = [ audio_clips_durations = [
float( float(ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/{i}.mp3")["format"]["duration"])
ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/{i}.mp3")["format"][
"duration"
]
)
for i in range(number_of_clips) for i in range(number_of_clips)
] ]
audio_clips_durations.insert( audio_clips_durations.insert(
0, 0,
float( float(ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/title.mp3")["format"]["duration"]),
ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/title.mp3")["format"][
"duration"
]
),
) )
audio_concat = ffmpeg.concat(*audio_clips, a=1, v=0) audio_concat = ffmpeg.concat(*audio_clips, a=1, v=0)
ffmpeg.output( ffmpeg.output(
@ -231,19 +212,13 @@ def make_final_video(
if settings.config["settings"]["storymode"]: if settings.config["settings"]["storymode"]:
audio_clips_durations = [ audio_clips_durations = [
float( float(
ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/postaudio-{i}.mp3")[ ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/postaudio-{i}.mp3")["format"]["duration"]
"format"
]["duration"]
) )
for i in range(number_of_clips) for i in range(number_of_clips)
] ]
audio_clips_durations.insert( audio_clips_durations.insert(
0, 0,
float( float(ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/title.mp3")["format"]["duration"]),
ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/title.mp3")["format"][
"duration"
]
),
) )
if settings.config["settings"]["storymodemethod"] == 0: if settings.config["settings"]["storymodemethod"] == 0:
image_clips.insert( image_clips.insert(
@ -260,9 +235,7 @@ def make_final_video(
) )
current_time += audio_clips_durations[0] current_time += audio_clips_durations[0]
elif settings.config["settings"]["storymodemethod"] == 1: elif settings.config["settings"]["storymodemethod"] == 1:
for i in track( for i in track(range(0, number_of_clips + 1), "Collecting the image files..."):
range(0, number_of_clips + 1), "Collecting the image files..."
):
image_clips.append( image_clips.append(
ffmpeg.input(f"assets/temp/{reddit_id}/png/img{i}.png")["v"].filter( ffmpeg.input(f"assets/temp/{reddit_id}/png/img{i}.png")["v"].filter(
"scale", screenshot_width, -1 "scale", screenshot_width, -1
@ -278,9 +251,9 @@ def make_final_video(
else: else:
for i in range(0, number_of_clips + 1): for i in range(0, number_of_clips + 1):
image_clips.append( image_clips.append(
ffmpeg.input(f"assets/temp/{reddit_id}/png/comment_{i}.png")[ ffmpeg.input(f"assets/temp/{reddit_id}/png/comment_{i}.png")["v"].filter(
"v" "scale", screenshot_width, -1
].filter("scale", screenshot_width, -1) )
) )
image_overlay = image_clips[i].filter("colorchannelmixer", aa=opacity) image_overlay = image_clips[i].filter("colorchannelmixer", aa=opacity)
background_clip = background_clip.overlay( background_clip = background_clip.overlay(
@ -299,15 +272,11 @@ def make_final_video(
subreddit = settings.config["reddit"]["thread"]["subreddit"] subreddit = settings.config["reddit"]["thread"]["subreddit"]
if not exists(f"./results/{subreddit}"): if not exists(f"./results/{subreddit}"):
print_substep( print_substep("The 'results' folder could not be found so it was automatically created.")
"The 'results' folder could not be found so it was automatically created."
)
os.makedirs(f"./results/{subreddit}") os.makedirs(f"./results/{subreddit}")
if not exists(f"./results/{subreddit}/OnlyTTS") and allowOnlyTTSFolder: if not exists(f"./results/{subreddit}/OnlyTTS") and allowOnlyTTSFolder:
print_substep( print_substep("The 'OnlyTTS' folder could not be found so it was automatically created.")
"The 'OnlyTTS' folder could not be found so it was automatically created."
)
os.makedirs(f"./results/{subreddit}/OnlyTTS") os.makedirs(f"./results/{subreddit}/OnlyTTS")
# create a thumbnail for the video # create a thumbnail for the video
@ -321,11 +290,7 @@ def make_final_video(
os.makedirs(f"./results/{subreddit}/thumbnails") os.makedirs(f"./results/{subreddit}/thumbnails")
# get the first file with the .png extension from assets/backgrounds and use it as a background for the thumbnail # get the first file with the .png extension from assets/backgrounds and use it as a background for the thumbnail
first_image = next( first_image = next(
( (file for file in os.listdir("assets/backgrounds") if file.endswith(".png")),
file
for file in os.listdir("assets/backgrounds")
if file.endswith(".png")
),
None, None,
) )
if first_image is None: if first_image is None:
@ -347,9 +312,7 @@ def make_final_video(
title_thumb, title_thumb,
) )
thumbnailSave.save(f"./assets/temp/{reddit_id}/thumbnail.png") thumbnailSave.save(f"./assets/temp/{reddit_id}/thumbnail.png")
print_substep( print_substep(f"Thumbnail - Building Thumbnail in assets/temp/{reddit_id}/thumbnail.png")
f"Thumbnail - Building Thumbnail in assets/temp/{reddit_id}/thumbnail.png"
)
text = f"Background by {background_config['video'][2]}" text = f"Background by {background_config['video'][2]}"
background_clip = ffmpeg.drawtext( background_clip = ffmpeg.drawtext(
@ -390,9 +353,7 @@ def make_final_video(
"b:a": "192k", "b:a": "192k",
"threads": multiprocessing.cpu_count(), "threads": multiprocessing.cpu_count(),
}, },
).overwrite_output().global_args( ).overwrite_output().global_args("-progress", progress.output_file.name).run(
"-progress", progress.output_file.name
).run(
quiet=True, quiet=True,
overwrite_output=True, overwrite_output=True,
capture_stdout=False, capture_stdout=False,
@ -422,9 +383,7 @@ def make_final_video(
"b:a": "192k", "b:a": "192k",
"threads": multiprocessing.cpu_count(), "threads": multiprocessing.cpu_count(),
}, },
).overwrite_output().global_args( ).overwrite_output().global_args("-progress", progress.output_file.name).run(
"-progress", progress.output_file.name
).run(
quiet=True, quiet=True,
overwrite_output=True, overwrite_output=True,
capture_stdout=False, capture_stdout=False,

@ -36,9 +36,7 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
# set the theme and disable non-essential cookies # set the theme and disable non-essential cookies
if settings.config["settings"]["theme"] == "dark": if settings.config["settings"]["theme"] == "dark":
cookie_file = open( cookie_file = open("./video_creation/data/cookie-dark-mode.json", encoding="utf-8")
"./video_creation/data/cookie-dark-mode.json", encoding="utf-8"
)
bgcolor = (33, 33, 36, 255) bgcolor = (33, 33, 36, 255)
txtcolor = (240, 240, 240) txtcolor = (240, 240, 240)
transparent = False transparent = False
@ -48,21 +46,15 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
bgcolor = (0, 0, 0, 0) bgcolor = (0, 0, 0, 0)
txtcolor = (255, 255, 255) txtcolor = (255, 255, 255)
transparent = True transparent = True
cookie_file = open( cookie_file = open("./video_creation/data/cookie-dark-mode.json", encoding="utf-8")
"./video_creation/data/cookie-dark-mode.json", encoding="utf-8"
)
else: else:
# Switch to dark theme # Switch to dark theme
cookie_file = open( cookie_file = open("./video_creation/data/cookie-dark-mode.json", encoding="utf-8")
"./video_creation/data/cookie-dark-mode.json", encoding="utf-8"
)
bgcolor = (33, 33, 36, 255) bgcolor = (33, 33, 36, 255)
txtcolor = (240, 240, 240) txtcolor = (240, 240, 240)
transparent = False transparent = False
else: else:
cookie_file = open( cookie_file = open("./video_creation/data/cookie-light-mode.json", encoding="utf-8")
"./video_creation/data/cookie-light-mode.json", encoding="utf-8"
)
bgcolor = (255, 255, 255, 255) bgcolor = (255, 255, 255, 255)
txtcolor = (0, 0, 0) txtcolor = (0, 0, 0)
transparent = False transparent = False
@ -106,12 +98,8 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
page.set_viewport_size(ViewportSize(width=1920, height=1080)) page.set_viewport_size(ViewportSize(width=1920, height=1080))
page.wait_for_load_state() page.wait_for_load_state()
page.locator('[name="username"]').fill( page.locator('[name="username"]').fill(settings.config["reddit"]["creds"]["username"])
settings.config["reddit"]["creds"]["username"] page.locator('[name="password"]').fill(settings.config["reddit"]["creds"]["password"])
)
page.locator('[name="password"]').fill(
settings.config["reddit"]["creds"]["password"]
)
page.locator("button[class$='m-full-width']").click() page.locator("button[class$='m-full-width']").click()
page.wait_for_timeout(5000) page.wait_for_timeout(5000)
@ -192,9 +180,7 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
location[i] = float("{:.2f}".format(location[i] * zoom)) location[i] = float("{:.2f}".format(location[i] * zoom))
page.screenshot(clip=location, path=postcontentpath) page.screenshot(clip=location, path=postcontentpath)
else: else:
page.locator('[data-test-id="post-content"]').screenshot( page.locator('[data-test-id="post-content"]').screenshot(path=postcontentpath)
path=postcontentpath
)
except Exception as e: except Exception as e:
print_substep("Something went wrong!", style="red") print_substep("Something went wrong!", style="red")
resp = input( resp = input(
@ -208,9 +194,7 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
"green", "green",
) )
resp = input( resp = input("Do you want the error traceback for debugging purposes? (y/n)")
"Do you want the error traceback for debugging purposes? (y/n)"
)
if not resp.casefold().startswith("y"): if not resp.casefold().startswith("y"):
exit() exit()
@ -255,13 +239,9 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
# zoom the body of the page # zoom the body of the page
page.evaluate("document.body.style.zoom=" + str(zoom)) page.evaluate("document.body.style.zoom=" + str(zoom))
# scroll comment into view # scroll comment into view
page.locator( page.locator(f"#t1_{comment['comment_id']}").scroll_into_view_if_needed()
f"#t1_{comment['comment_id']}"
).scroll_into_view_if_needed()
# as zooming the body doesn't change the properties of the divs, we need to adjust for the zoom # as zooming the body doesn't change the properties of the divs, we need to adjust for the zoom
location = page.locator( location = page.locator(f"#t1_{comment['comment_id']}").bounding_box()
f"#t1_{comment['comment_id']}"
).bounding_box()
for i in location: for i in location:
location[i] = float("{:.2f}".format(location[i] * zoom)) location[i] = float("{:.2f}".format(location[i] * zoom))
page.screenshot( page.screenshot(

@ -36,9 +36,7 @@ def save_text_to_mp3(reddit_obj) -> Tuple[int, int]:
voice = settings.config["settings"]["tts"]["voice_choice"] voice = settings.config["settings"]["tts"]["voice_choice"]
if str(voice).casefold() in map(lambda _: _.casefold(), TTSProviders): if str(voice).casefold() in map(lambda _: _.casefold(), TTSProviders):
text_to_mp3 = TTSEngine( text_to_mp3 = TTSEngine(get_case_insensitive_key_value(TTSProviders, voice), reddit_obj)
get_case_insensitive_key_value(TTSProviders, voice), reddit_obj
)
else: else:
while True: while True:
print_step("Please choose one of the following TTS providers: ") print_step("Please choose one of the following TTS providers: ")
@ -47,18 +45,12 @@ def save_text_to_mp3(reddit_obj) -> Tuple[int, int]:
if choice.casefold() in map(lambda _: _.casefold(), TTSProviders): if choice.casefold() in map(lambda _: _.casefold(), TTSProviders):
break break
print("Unknown Choice") print("Unknown Choice")
text_to_mp3 = TTSEngine( text_to_mp3 = TTSEngine(get_case_insensitive_key_value(TTSProviders, choice), reddit_obj)
get_case_insensitive_key_value(TTSProviders, choice), reddit_obj
)
return text_to_mp3.run() return text_to_mp3.run()
def get_case_insensitive_key_value(input_dict, key): def get_case_insensitive_key_value(input_dict, key):
return next( return next(
( (value for dict_key, value in input_dict.items() if dict_key.lower() == key.lower()),
value
for dict_key, value in input_dict.items()
if dict_key.lower() == key.lower()
),
None, None,
) )

Loading…
Cancel
Save