chore: reformat

Signed-off-by: Jason Cameron <git@jasoncameron.dev>
pull/2245/head
Jason Cameron 7 months ago
parent 943814150c
commit ed20928974
No known key found for this signature in database

@ -82,7 +82,9 @@ def settings():
# Change settings # Change settings
config = gui.modify_settings(data, config_load, checks) config = gui.modify_settings(data, config_load, checks)
return render_template("settings.html", file="config.toml", data=config, checks=checks) return render_template(
"settings.html", file="config.toml", data=config, checks=checks
)
# Make videos.json accessible # Make videos.json accessible

@ -86,7 +86,9 @@ class TikTok:
"Cookie": f"sessionid={settings.config['settings']['tts']['tiktok_sessionid']}", "Cookie": f"sessionid={settings.config['settings']['tts']['tiktok_sessionid']}",
} }
self.URI_BASE = "https://api16-normal-c-useast1a.tiktokv.com/media/api/text/speech/invoke/" self.URI_BASE = (
"https://api16-normal-c-useast1a.tiktokv.com/media/api/text/speech/invoke/"
)
self.max_chars = 200 self.max_chars = 200
self._session = requests.Session() self._session = requests.Session()

@ -41,7 +41,9 @@ class AWSPolly:
raise ValueError( raise ValueError(
f"Please set the TOML variable AWS_VOICE to a valid voice. options are: {voices}" f"Please set the TOML variable AWS_VOICE to a valid voice. options are: {voices}"
) )
voice = str(settings.config["settings"]["tts"]["aws_polly_voice"]).capitalize() voice = str(
settings.config["settings"]["tts"]["aws_polly_voice"]
).capitalize()
try: try:
# Request speech synthesis # Request speech synthesis
response = polly.synthesize_speech( response = polly.synthesize_speech(

@ -17,9 +17,13 @@ class elevenlabs:
if random_voice: if random_voice:
voice = self.randomvoice() voice = self.randomvoice()
else: else:
voice = str(settings.config["settings"]["tts"]["elevenlabs_voice_name"]).capitalize() voice = str(
settings.config["settings"]["tts"]["elevenlabs_voice_name"]
).capitalize()
audio = self.client.generate(text=text, voice=voice, model="eleven_multilingual_v1") audio = self.client.generate(
text=text, voice=voice, model="eleven_multilingual_v1"
)
save(audio=audio, filename=filepath) save(audio=audio, filename=filepath)
def initialize(self): def initialize(self):

@ -13,6 +13,7 @@ from utils import settings
from utils.console import print_step, print_substep from utils.console import print_step, print_substep
from utils.voice import sanitize_text from utils.voice import sanitize_text
from moviepy.audio.fx import MultiplyVolume from moviepy.audio.fx import MultiplyVolume
DEFAULT_MAX_LENGTH: int = ( DEFAULT_MAX_LENGTH: int = (
50 # Video length variable, edit this on your own risk. It should work, but it's not supported 50 # Video length variable, edit this on your own risk. It should work, but it's not supported
) )
@ -57,7 +58,9 @@ class TTSEngine:
comment["comment_body"] = re.sub(regex_urls, " ", comment["comment_body"]) comment["comment_body"] = re.sub(regex_urls, " ", comment["comment_body"])
comment["comment_body"] = comment["comment_body"].replace("\n", ". ") comment["comment_body"] = comment["comment_body"].replace("\n", ". ")
comment["comment_body"] = re.sub(r"\bAI\b", "A.I", comment["comment_body"]) comment["comment_body"] = re.sub(r"\bAI\b", "A.I", comment["comment_body"])
comment["comment_body"] = re.sub(r"\bAGI\b", "A.G.I", comment["comment_body"]) comment["comment_body"] = re.sub(
r"\bAGI\b", "A.G.I", comment["comment_body"]
)
if comment["comment_body"][-1] != ".": if comment["comment_body"][-1] != ".":
comment["comment_body"] += "." comment["comment_body"] += "."
comment["comment_body"] = comment["comment_body"].replace(". . .", ".") comment["comment_body"] = comment["comment_body"].replace(". . .", ".")
@ -79,13 +82,17 @@ class TTSEngine:
if len(self.reddit_object["thread_post"]) > self.tts_module.max_chars: if len(self.reddit_object["thread_post"]) > self.tts_module.max_chars:
self.split_post(self.reddit_object["thread_post"], "postaudio") self.split_post(self.reddit_object["thread_post"], "postaudio")
else: else:
self.call_tts("postaudio", process_text(self.reddit_object["thread_post"])) self.call_tts(
"postaudio", process_text(self.reddit_object["thread_post"])
)
elif settings.config["settings"]["storymodemethod"] == 1: elif settings.config["settings"]["storymodemethod"] == 1:
for idx, text in track(enumerate(self.reddit_object["thread_post"])): for idx, text in track(enumerate(self.reddit_object["thread_post"])):
self.call_tts(f"postaudio-{idx}", process_text(text)) self.call_tts(f"postaudio-{idx}", process_text(text))
else: else:
for idx, comment in track(enumerate(self.reddit_object["comments"]), "Saving..."): for idx, comment in track(
enumerate(self.reddit_object["comments"]), "Saving..."
):
# ! Stop creating mp3 files if the length is greater than max length. # ! Stop creating mp3 files if the length is greater than max length.
if self.length > self.max_length and idx > 1: if self.length > self.max_length and idx > 1:
self.length -= self.last_clip_length self.length -= self.last_clip_length
@ -182,6 +189,8 @@ def process_text(text: str, clean: bool = True):
new_text = sanitize_text(text) if clean else text new_text = sanitize_text(text) if clean else text
if lang: if lang:
print_substep("Translating Text...") print_substep("Translating Text...")
translated_text = translators.translate_text(text, translator="google", to_language=lang) translated_text = translators.translate_text(
text, translator="google", to_language=lang
)
new_text = sanitize_text(translated_text) new_text = sanitize_text(translated_text)
return new_text return new_text

@ -21,7 +21,9 @@ class pyttsx:
if voice_id == "" or voice_num == "": if voice_id == "" or voice_num == "":
voice_id = 2 voice_id = 2
voice_num = 3 voice_num = 3
raise ValueError("set pyttsx values to a valid value, switching to defaults") raise ValueError(
"set pyttsx values to a valid value, switching to defaults"
)
else: else:
voice_id = int(voice_id) voice_id = int(voice_id)
voice_num = int(voice_num) voice_num = int(voice_num)

@ -42,7 +42,9 @@ class StreamlabsPolly:
raise ValueError( raise ValueError(
f"Please set the config variable STREAMLABS_POLLY_VOICE to a valid voice. options are: {voices}" f"Please set the config variable STREAMLABS_POLLY_VOICE to a valid voice. options are: {voices}"
) )
voice = str(settings.config["settings"]["tts"]["streamlabs_polly_voice"]).capitalize() voice = str(
settings.config["settings"]["tts"]["streamlabs_polly_voice"]
).capitalize()
body = {"voice": voice, "text": text, "service": "polly"} body = {"voice": voice, "text": text, "service": "polly"}
headers = {"Referer": "https://streamlabs.com/"} headers = {"Referer": "https://streamlabs.com/"}

@ -106,7 +106,9 @@ if __name__ == "__main__":
sys.exit() sys.exit()
try: try:
if config["reddit"]["thread"]["post_id"]: if config["reddit"]["thread"]["post_id"]:
for index, post_id in enumerate(config["reddit"]["thread"]["post_id"].split("+")): for index, post_id in enumerate(
config["reddit"]["thread"]["post_id"].split("+")
):
index += 1 index += 1
print_step( print_step(
f'on the {index}{("st" if index % 10 == 1 else ("nd" if index % 10 == 2 else ("rd" if index % 10 == 3 else "th")))} post of {len(config["reddit"]["thread"]["post_id"].split("+"))}' f'on the {index}{("st" if index % 10 == 1 else ("nd" if index % 10 == 2 else ("rd" if index % 10 == 3 else "th")))} post of {len(config["reddit"]["thread"]["post_id"].split("+"))}'

@ -22,7 +22,9 @@ def get_subreddit_threads(POST_ID: str):
content = {} content = {}
if settings.config["reddit"]["creds"]["2fa"]: if settings.config["reddit"]["creds"]["2fa"]:
print("\nEnter your two-factor authentication code from your authenticator app.\n") print(
"\nEnter your two-factor authentication code from your authenticator app.\n"
)
code = input("> ") code = input("> ")
print() print()
pw = settings.config["reddit"]["creds"]["password"] pw = settings.config["reddit"]["creds"]["password"]
@ -55,7 +57,9 @@ def get_subreddit_threads(POST_ID: str):
]: # note to user. you can have multiple subreddits via reddit.subreddit("redditdev+learnpython") ]: # note to user. you can have multiple subreddits via reddit.subreddit("redditdev+learnpython")
try: try:
subreddit = reddit.subreddit( subreddit = reddit.subreddit(
re.sub(r"r\/", "", input("What subreddit would you like to pull from? ")) re.sub(
r"r\/", "", input("What subreddit would you like to pull from? ")
)
# removes the r/ from the input # removes the r/ from the input
) )
except ValueError: except ValueError:
@ -65,7 +69,9 @@ def get_subreddit_threads(POST_ID: str):
sub = settings.config["reddit"]["thread"]["subreddit"] sub = settings.config["reddit"]["thread"]["subreddit"]
print_substep(f"Using subreddit: r/{sub} from TOML config") print_substep(f"Using subreddit: r/{sub} from TOML config")
subreddit_choice = sub subreddit_choice = sub
if str(subreddit_choice).casefold().startswith("r/"): # removes the r/ from the input if (
str(subreddit_choice).casefold().startswith("r/")
): # removes the r/ from the input
subreddit_choice = subreddit_choice[2:] subreddit_choice = subreddit_choice[2:]
subreddit = reddit.subreddit(subreddit_choice) subreddit = reddit.subreddit(subreddit_choice)
@ -76,8 +82,12 @@ def get_subreddit_threads(POST_ID: str):
settings.config["reddit"]["thread"]["post_id"] settings.config["reddit"]["thread"]["post_id"]
and len(str(settings.config["reddit"]["thread"]["post_id"]).split("+")) == 1 and len(str(settings.config["reddit"]["thread"]["post_id"]).split("+")) == 1
): ):
submission = reddit.submission(id=settings.config["reddit"]["thread"]["post_id"]) submission = reddit.submission(
elif settings.config["ai"]["ai_similarity_enabled"]: # ai sorting based on comparison id=settings.config["reddit"]["thread"]["post_id"]
)
elif settings.config["ai"][
"ai_similarity_enabled"
]: # ai sorting based on comparison
threads = subreddit.hot(limit=50) threads = subreddit.hot(limit=50)
keywords = settings.config["ai"]["ai_similarity_keywords"].split(",") keywords = settings.config["ai"]["ai_similarity_keywords"].split(",")
keywords = [keyword.strip() for keyword in keywords] keywords = [keyword.strip() for keyword in keywords]
@ -95,7 +105,10 @@ def get_subreddit_threads(POST_ID: str):
if submission is None: if submission is None:
return get_subreddit_threads(POST_ID) # submission already done. rerun return get_subreddit_threads(POST_ID) # submission already done. rerun
elif not submission.num_comments and settings.config["settings"]["storymode"] == "false": elif (
not submission.num_comments
and settings.config["settings"]["storymode"] == "false"
):
print_substep("No comments found. Skipping.") print_substep("No comments found. Skipping.")
exit() exit()

@ -5,8 +5,12 @@ from transformers import AutoModel, AutoTokenizer
# Mean Pooling - Take attention mask into account for correct averaging # Mean Pooling - Take attention mask into account for correct averaging
def mean_pooling(model_output, attention_mask): def mean_pooling(model_output, attention_mask):
token_embeddings = model_output[0] # First element of model_output contains all token embeddings token_embeddings = model_output[
input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float() 0
] # First element of model_output contains all token embeddings
input_mask_expanded = (
attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
)
return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp( return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(
input_mask_expanded.sum(1), min=1e-9 input_mask_expanded.sum(1), min=1e-9
) )
@ -32,13 +36,19 @@ def sort_by_similarity(thread_objects, keywords):
) )
with torch.no_grad(): with torch.no_grad():
threads_embeddings = model(**encoded_threads) threads_embeddings = model(**encoded_threads)
threads_embeddings = mean_pooling(threads_embeddings, encoded_threads["attention_mask"]) threads_embeddings = mean_pooling(
threads_embeddings, encoded_threads["attention_mask"]
)
# Keyword inference # Keyword inference
encoded_keywords = tokenizer(keywords, padding=True, truncation=True, return_tensors="pt") encoded_keywords = tokenizer(
keywords, padding=True, truncation=True, return_tensors="pt"
)
with torch.no_grad(): with torch.no_grad():
keywords_embeddings = model(**encoded_keywords) keywords_embeddings = model(**encoded_keywords)
keywords_embeddings = mean_pooling(keywords_embeddings, encoded_keywords["attention_mask"]) keywords_embeddings = mean_pooling(
keywords_embeddings, encoded_keywords["attention_mask"]
)
# Compare every keyword w/ every thread embedding # Compare every keyword w/ every thread embedding
threads_embeddings_tensor = torch.tensor(threads_embeddings) threads_embeddings_tensor = torch.tensor(threads_embeddings)

@ -49,7 +49,10 @@ def handle_input(
optional=False, optional=False,
): ):
if optional: if optional:
console.print(message + "\n[green]This is an optional value. Do you want to skip it? (y/n)") console.print(
message
+ "\n[green]This is an optional value. Do you want to skip it? (y/n)"
)
if input().casefold().startswith("y"): if input().casefold().startswith("y"):
return default if default is not NotImplemented else "" return default if default is not NotImplemented else ""
if default is not NotImplemented: if default is not NotImplemented:
@ -83,7 +86,11 @@ def handle_input(
console.print("[red]" + err_message) console.print("[red]" + err_message)
continue continue
elif match != "" and re.match(match, user_input) is None: elif match != "" and re.match(match, user_input) is None:
console.print("[red]" + err_message + "\nAre you absolutely sure it's correct?(y/n)") console.print(
"[red]"
+ err_message
+ "\nAre you absolutely sure it's correct?(y/n)"
)
if input().casefold().startswith("y"): if input().casefold().startswith("y"):
break break
continue continue
@ -116,5 +123,9 @@ def handle_input(
if user_input in options: if user_input in options:
return user_input return user_input
console.print( console.print(
"[red bold]" + err_message + "\nValid options are: " + ", ".join(map(str, options)) + "." "[red bold]"
+ err_message
+ "\nValid options are: "
+ ", ".join(map(str, options))
+ "."
) )

@ -7,9 +7,7 @@ import requests
def ffmpeg_install_windows(): def ffmpeg_install_windows():
try: try:
ffmpeg_url = ( ffmpeg_url = "https://github.com/GyanD/codexffmpeg/releases/download/6.0/ffmpeg-6.0-full_build.zip"
"https://github.com/GyanD/codexffmpeg/releases/download/6.0/ffmpeg-6.0-full_build.zip"
)
ffmpeg_zip_filename = "ffmpeg.zip" ffmpeg_zip_filename = "ffmpeg.zip"
ffmpeg_extracted_folder = "ffmpeg" ffmpeg_extracted_folder = "ffmpeg"
@ -129,7 +127,9 @@ def ffmpeg_install():
elif os.name == "mac": elif os.name == "mac":
ffmpeg_install_mac() ffmpeg_install_mac()
else: else:
print("Your OS is not supported. Please install FFmpeg manually and try again.") print(
"Your OS is not supported. Please install FFmpeg manually and try again."
)
exit() exit()
else: else:
print("Please install FFmpeg manually and try again.") print("Please install FFmpeg manually and try again.")

@ -69,7 +69,11 @@ def check(value, checks):
and not hasattr(value, "__iter__") and not hasattr(value, "__iter__")
and ( and (
("nmin" in checks and checks["nmin"] is not None and value < checks["nmin"]) ("nmin" in checks and checks["nmin"] is not None and value < checks["nmin"])
or ("nmax" in checks and checks["nmax"] is not None and value > checks["nmax"]) or (
"nmax" in checks
and checks["nmax"] is not None
and value > checks["nmax"]
)
) )
): ):
incorrect = True incorrect = True
@ -78,8 +82,16 @@ def check(value, checks):
not incorrect not incorrect
and hasattr(value, "__iter__") and hasattr(value, "__iter__")
and ( and (
("nmin" in checks and checks["nmin"] is not None and len(value) < checks["nmin"]) (
or ("nmax" in checks and checks["nmax"] is not None and len(value) > checks["nmax"]) "nmin" in checks
and checks["nmin"] is not None
and len(value) < checks["nmin"]
)
or (
"nmax" in checks
and checks["nmax"] is not None
and len(value) > checks["nmax"]
)
) )
): ):
incorrect = True incorrect = True
@ -152,7 +164,9 @@ def delete_background(key):
# Add background video # Add background video
def add_background(youtube_uri, filename, citation, position): def add_background(youtube_uri, filename, citation, position):
# Validate YouTube URI # Validate YouTube URI
regex = re.compile(r"(?:\/|%3D|v=|vi=)([0-9A-z\-_]{11})(?:[%#?&]|$)").search(youtube_uri) regex = re.compile(r"(?:\/|%3D|v=|vi=)([0-9A-z\-_]{11})(?:[%#?&]|$)").search(
youtube_uri
)
if not regex: if not regex:
flash("YouTube URI is invalid!", "error") flash("YouTube URI is invalid!", "error")

@ -20,7 +20,9 @@ def draw_multiple_line_text(
font_height = getheight(font, text) font_height = getheight(font, text)
image_width, image_height = image.size image_width, image_height = image.size
lines = textwrap.wrap(text, width=wrap) lines = textwrap.wrap(text, width=wrap)
y = (image_height / 2) - (((font_height + (len(lines) * padding) / len(lines)) * len(lines)) / 2) y = (image_height / 2) - (
((font_height + (len(lines) * padding) / len(lines)) * len(lines)) / 2
)
for line in lines: for line in lines:
line_width, line_height = getsize(font, line) line_width, line_height = getsize(font, line)
if transparent: if transparent:
@ -70,5 +72,7 @@ def imagemaker(theme, reddit_obj: dict, txtclr, padding=5, transparent=False) ->
for idx, text in track(enumerate(texts), "Rendering Image"): for idx, text in track(enumerate(texts), "Rendering Image"):
image = Image.new("RGBA", size, theme) image = Image.new("RGBA", size, theme)
text = process_text(text, False) text = process_text(text, False)
draw_multiple_line_text(image, text, font, txtclr, padding, wrap=30, transparent=transparent) draw_multiple_line_text(
image, text, font, txtclr, padding, wrap=30, transparent=transparent
)
image.save(f"assets/temp/{reddit_id}/png/img{idx}.png") image.save(f"assets/temp/{reddit_id}/png/img{idx}.png")

@ -1,5 +1,7 @@
def clear_cookie_by_name(context, cookie_cleared_name): def clear_cookie_by_name(context, cookie_cleared_name):
cookies = context.cookies() cookies = context.cookies()
filtered_cookies = [cookie for cookie in cookies if cookie["name"] != cookie_cleared_name] filtered_cookies = [
cookie for cookie in cookies if cookie["name"] != cookie_cleared_name
]
context.clear_cookies() context.clear_cookies()
context.add_cookies(filtered_cookies) context.add_cookies(filtered_cookies)

@ -53,7 +53,11 @@ def check(value, checks, name):
and not hasattr(value, "__iter__") and not hasattr(value, "__iter__")
and ( and (
("nmin" in checks and checks["nmin"] is not None and value < checks["nmin"]) ("nmin" in checks and checks["nmin"] is not None and value < checks["nmin"])
or ("nmax" in checks and checks["nmax"] is not None and value > checks["nmax"]) or (
"nmax" in checks
and checks["nmax"] is not None
and value > checks["nmax"]
)
) )
): ):
incorrect = True incorrect = True
@ -61,8 +65,16 @@ def check(value, checks, name):
not incorrect not incorrect
and hasattr(value, "__iter__") and hasattr(value, "__iter__")
and ( and (
("nmin" in checks and checks["nmin"] is not None and len(value) < checks["nmin"]) (
or ("nmax" in checks and checks["nmax"] is not None and len(value) > checks["nmax"]) "nmin" in checks
and checks["nmin"] is not None
and len(value) < checks["nmin"]
)
or (
"nmax" in checks
and checks["nmax"] is not None
and len(value) > checks["nmax"]
)
) )
): ):
incorrect = True incorrect = True
@ -70,9 +82,15 @@ def check(value, checks, name):
if incorrect: if incorrect:
value = handle_input( value = handle_input(
message=( message=(
(("[blue]Example: " + str(checks["example"]) + "\n") if "example" in checks else "") (
("[blue]Example: " + str(checks["example"]) + "\n")
if "example" in checks
else ""
)
+ "[red]" + "[red]"
+ ("Non-optional ", "Optional ")["optional" in checks and checks["optional"] is True] + ("Non-optional ", "Optional ")[
"optional" in checks and checks["optional"] is True
]
) )
+ "[#C0CAF5 bold]" + "[#C0CAF5 bold]"
+ str(name) + str(name)
@ -113,7 +131,9 @@ def check_toml(template_file, config_file) -> Tuple[bool, Dict]:
try: try:
template = toml.load(template_file) template = toml.load(template_file)
except Exception as error: except Exception as error:
console.print(f"[red bold]Encountered error when trying to to load {template_file}: {error}") console.print(
f"[red bold]Encountered error when trying to to load {template_file}: {error}"
)
return False return False
try: try:
config = toml.load(config_file) config = toml.load(config_file)

@ -6,7 +6,9 @@ from utils.ai_methods import sort_by_similarity
from utils.console import print_substep from utils.console import print_substep
def get_subreddit_undone(submissions: list, subreddit, times_checked=0, similarity_scores=None): def get_subreddit_undone(
submissions: list, subreddit, times_checked=0, similarity_scores=None
):
"""_summary_ """_summary_
Args: Args:
@ -18,7 +20,9 @@ def get_subreddit_undone(submissions: list, subreddit, times_checked=0, similari
""" """
# Second try of getting a valid Submission # Second try of getting a valid Submission
if times_checked and settings.config["ai"]["ai_similarity_enabled"]: if times_checked and settings.config["ai"]["ai_similarity_enabled"]:
print("Sorting based on similarity for a different date filter and thread limit..") print(
"Sorting based on similarity for a different date filter and thread limit.."
)
submissions = sort_by_similarity( submissions = sort_by_similarity(
submissions, keywords=settings.config["ai"]["ai_similarity_enabled"] submissions, keywords=settings.config["ai"]["ai_similarity_enabled"]
) )
@ -27,7 +31,9 @@ def get_subreddit_undone(submissions: list, subreddit, times_checked=0, similari
if not exists("./video_creation/data/videos.json"): if not exists("./video_creation/data/videos.json"):
with open("./video_creation/data/videos.json", "w+") as f: with open("./video_creation/data/videos.json", "w+") as f:
json.dump([], f) json.dump([], f)
with open("./video_creation/data/videos.json", "r", encoding="utf-8") as done_vids_raw: with open(
"./video_creation/data/videos.json", "r", encoding="utf-8"
) as done_vids_raw:
done_videos = json.load(done_vids_raw) done_videos = json.load(done_vids_raw)
for i, submission in enumerate(submissions): for i, submission in enumerate(submissions):
if already_done(done_videos, submission): if already_done(done_videos, submission):
@ -43,7 +49,8 @@ def get_subreddit_undone(submissions: list, subreddit, times_checked=0, similari
print_substep("This post was pinned by moderators. Skipping...") print_substep("This post was pinned by moderators. Skipping...")
continue continue
if ( if (
submission.num_comments <= int(settings.config["reddit"]["thread"]["min_comments"]) submission.num_comments
<= int(settings.config["reddit"]["thread"]["min_comments"])
and not settings.config["settings"]["storymode"] and not settings.config["settings"]["storymode"]
): ):
print_substep( print_substep(
@ -52,7 +59,9 @@ def get_subreddit_undone(submissions: list, subreddit, times_checked=0, similari
continue continue
if settings.config["settings"]["storymode"]: if settings.config["settings"]["storymode"]:
if not submission.selftext: if not submission.selftext:
print_substep("You are trying to use story mode on post with no post text") print_substep(
"You are trying to use story mode on post with no post text"
)
continue continue
else: else:
# Check for the length of the post text # Check for the length of the post text

@ -1,11 +1,15 @@
from PIL import ImageDraw, ImageFont from PIL import ImageDraw, ImageFont
def create_thumbnail(thumbnail, font_family, font_size, font_color, width, height, title): def create_thumbnail(
thumbnail, font_family, font_size, font_color, width, height, title
):
font = ImageFont.truetype(font_family + ".ttf", font_size) font = ImageFont.truetype(font_family + ".ttf", font_size)
Xaxis = width - (width * 0.2) # 20% of the width Xaxis = width - (width * 0.2) # 20% of the width
sizeLetterXaxis = font_size * 0.5 # 50% of the font size sizeLetterXaxis = font_size * 0.5 # 50% of the font size
XaxisLetterQty = round(Xaxis / sizeLetterXaxis) # Quantity of letters that can fit in the X axis XaxisLetterQty = round(
Xaxis / sizeLetterXaxis
) # Quantity of letters that can fit in the X axis
MarginYaxis = height * 0.12 # 12% of the height MarginYaxis = height * 0.12 # 12% of the height
MarginXaxis = width * 0.05 # 5% of the width MarginXaxis = width * 0.05 # 5% of the width
# 1.1 rem # 1.1 rem
@ -30,6 +34,8 @@ def create_thumbnail(thumbnail, font_family, font_size, font_color, width, heigh
# loop for put the title in the thumbnail # loop for put the title in the thumbnail
for i in range(0, len(arrayTitle)): for i in range(0, len(arrayTitle)):
# 1.1 rem # 1.1 rem
draw.text((MarginXaxis, MarginYaxis + (LineHeight * i)), arrayTitle[i], rgb, font=font) draw.text(
(MarginXaxis, MarginYaxis + (LineHeight * i)), arrayTitle[i], rgb, font=font
)
return thumbnail return thumbnail

@ -19,7 +19,9 @@ def check_done(
Returns: Returns:
Submission|None: Reddit object in args Submission|None: Reddit object in args
""" """
with open("./video_creation/data/videos.json", "r", encoding="utf-8") as done_vids_raw: with open(
"./video_creation/data/videos.json", "r", encoding="utf-8"
) as done_vids_raw:
done_videos = json.load(done_vids_raw) done_videos = json.load(done_vids_raw)
for video in done_videos: for video in done_videos:
if video["id"] == str(redditobj): if video["id"] == str(redditobj):
@ -33,7 +35,9 @@ def check_done(
return redditobj return redditobj
def save_data(subreddit: str, filename: str, reddit_title: str, reddit_id: str, credit: str): def save_data(
subreddit: str, filename: str, reddit_title: str, reddit_id: str, credit: str
):
"""Saves the videos that have already been generated to a JSON file in video_creation/data/videos.json """Saves the videos that have already been generated to a JSON file in video_creation/data/videos.json
Args: Args:

@ -43,7 +43,9 @@ def sleep_until(time) -> None:
if sys.version_info[0] >= 3 and time.tzinfo: if sys.version_info[0] >= 3 and time.tzinfo:
end = time.astimezone(timezone.utc).timestamp() end = time.astimezone(timezone.utc).timestamp()
else: else:
zoneDiff = pytime.time() - (datetime.now() - datetime(1970, 1, 1)).total_seconds() zoneDiff = (
pytime.time() - (datetime.now() - datetime(1970, 1, 1)).total_seconds()
)
end = (time - datetime(1970, 1, 1)).total_seconds() + zoneDiff end = (time - datetime(1970, 1, 1)).total_seconds() + zoneDiff
# Type check # Type check

@ -60,7 +60,9 @@ def get_start_and_end_times(video_length: int, length_of_clip: int) -> Tuple[int
def get_background_config(mode: str): def get_background_config(mode: str):
"""Fetch the background/s configuration""" """Fetch the background/s configuration"""
try: try:
choice = str(settings.config["settings"]["background"][f"background_{mode}"]).casefold() choice = str(
settings.config["settings"]["background"][f"background_{mode}"]
).casefold()
except AttributeError: except AttributeError:
print_substep("No background selected. Picking random background'") print_substep("No background selected. Picking random background'")
choice = None choice = None
@ -120,7 +122,9 @@ def download_background_audio(background_config: Tuple[str, str, str]):
print_substep("Background audio downloaded successfully! 🎉", style="bold green") print_substep("Background audio downloaded successfully! 🎉", style="bold green")
def chop_background(background_config: Dict[str, Tuple], video_length: int, reddit_object: dict): def chop_background(
background_config: Dict[str, Tuple], video_length: int, reddit_object: dict
):
"""Generates the background audio and footage to be used in the video and writes it to assets/temp/background.mp3 and assets/temp/background.mp4 """Generates the background audio and footage to be used in the video and writes it to assets/temp/background.mp3 and assets/temp/background.mp4
Args: Args:
@ -134,7 +138,9 @@ def chop_background(background_config: Dict[str, Tuple], video_length: int, redd
print_step("Volume was set to 0. Skipping background audio creation . . .") print_step("Volume was set to 0. Skipping background audio creation . . .")
else: else:
print_step("Finding a spot in the backgrounds audio to chop...✂️") print_step("Finding a spot in the backgrounds audio to chop...✂️")
audio_choice = f"{background_config['audio'][2]}-{background_config['audio'][1]}" audio_choice = (
f"{background_config['audio'][2]}-{background_config['audio'][1]}"
)
background_audio = AudioFileClip(f"assets/backgrounds/audio/{audio_choice}") background_audio = AudioFileClip(f"assets/backgrounds/audio/{audio_choice}")
start_time_audio, end_time_audio = get_start_and_end_times( start_time_audio, end_time_audio = get_start_and_end_times(
video_length, background_audio.duration video_length, background_audio.duration
@ -153,7 +159,7 @@ def chop_background(background_config: Dict[str, Tuple], video_length: int, redd
with VideoFileClip(f"assets/backgrounds/video/{video_choice}") as video: with VideoFileClip(f"assets/backgrounds/video/{video_choice}") as video:
new = video.subclipped(start_time_video, end_time_video) new = video.subclipped(start_time_video, end_time_video)
new.write_videofile(f"assets/temp/{thread_id}/background.mp4") new.write_videofile(f"assets/temp/{thread_id}/background.mp4")
except (OSError, IOError): # ffmpeg issue see #348 except (OSError, IOError): # ffmpeg issue see #348
print_substep("FFMPEG issue. Trying again...") print_substep("FFMPEG issue. Trying again...")
ffmpeg_extract_subclip( ffmpeg_extract_subclip(

@ -78,7 +78,9 @@ def name_normalize(name: str) -> str:
lang = settings.config["reddit"]["thread"]["post_lang"] lang = settings.config["reddit"]["thread"]["post_lang"]
if lang: if lang:
print_substep("Translating filename...") print_substep("Translating filename...")
translated_name = translators.translate_text(name, translator="google", to_language=lang) translated_name = translators.translate_text(
name, translator="google", to_language=lang
)
return translated_name return translated_name
else: else:
return name return name
@ -141,8 +143,12 @@ def create_fancy_thumbnail(image, text, text_color, padding, wrap=35):
bottom_part_height = image_height - top_part_height - middle_part_height bottom_part_height = image_height - top_part_height - middle_part_height
top_part = image.crop((0, 0, image_width, top_part_height)) top_part = image.crop((0, 0, image_width, top_part_height))
middle_part = image.crop((0, top_part_height, image_width, top_part_height + middle_part_height)) middle_part = image.crop(
bottom_part = image.crop((0, top_part_height + middle_part_height, image_width, image_height)) (0, top_part_height, image_width, top_part_height + middle_part_height)
)
bottom_part = image.crop(
(0, top_part_height + middle_part_height, image_width, image_height)
)
# Stretch the middle part # Stretch the middle part
new_middle_height = new_image_height - top_part_height - bottom_part_height new_middle_height = new_image_height - top_part_height - bottom_part_height
@ -182,7 +188,9 @@ def merge_background_audio(audio: ffmpeg, reddit_id: str):
audio (ffmpeg): The TTS final audio but without background. audio (ffmpeg): The TTS final audio but without background.
reddit_id (str): The ID of subreddit reddit_id (str): The ID of subreddit
""" """
background_audio_volume = settings.config["settings"]["background"]["background_audio_volume"] background_audio_volume = settings.config["settings"]["background"][
"background_audio_volume"
]
if background_audio_volume == 0: if background_audio_volume == 0:
return audio # Return the original audio return audio # Return the original audio
else: else:
@ -236,27 +244,42 @@ def make_final_video(
if settings.config["settings"]["storymode"]: if settings.config["settings"]["storymode"]:
if settings.config["settings"]["storymodemethod"] == 0: if settings.config["settings"]["storymodemethod"] == 0:
audio_clips = [ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3")] audio_clips = [ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3")]
audio_clips.insert(1, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/postaudio.mp3")) audio_clips.insert(
1, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/postaudio.mp3")
)
elif settings.config["settings"]["storymodemethod"] == 1: elif settings.config["settings"]["storymodemethod"] == 1:
audio_clips = [ audio_clips = [
ffmpeg.input(f"assets/temp/{reddit_id}/mp3/postaudio-{i}.mp3") ffmpeg.input(f"assets/temp/{reddit_id}/mp3/postaudio-{i}.mp3")
for i in track(range(number_of_clips + 1), "Collecting the audio files...") for i in track(
range(number_of_clips + 1), "Collecting the audio files..."
)
] ]
audio_clips.insert(0, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3")) audio_clips.insert(
0, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3")
)
else: else:
audio_clips = [ audio_clips = [
ffmpeg.input(f"assets/temp/{reddit_id}/mp3/{i}.mp3") for i in range(number_of_clips) ffmpeg.input(f"assets/temp/{reddit_id}/mp3/{i}.mp3")
for i in range(number_of_clips)
] ]
audio_clips.insert(0, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3")) audio_clips.insert(0, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3"))
audio_clips_durations = [ audio_clips_durations = [
float(ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/{i}.mp3")["format"]["duration"]) float(
ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/{i}.mp3")["format"][
"duration"
]
)
for i in range(number_of_clips) for i in range(number_of_clips)
] ]
audio_clips_durations.insert( audio_clips_durations.insert(
0, 0,
float(ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/title.mp3")["format"]["duration"]), float(
ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/title.mp3")["format"][
"duration"
]
),
) )
audio_concat = ffmpeg.concat(*audio_clips, a=1, v=0) audio_concat = ffmpeg.concat(*audio_clips, a=1, v=0)
ffmpeg.output( ffmpeg.output(
@ -299,13 +322,19 @@ def make_final_video(
if settings.config["settings"]["storymode"]: if settings.config["settings"]["storymode"]:
audio_clips_durations = [ audio_clips_durations = [
float( float(
ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/postaudio-{i}.mp3")["format"]["duration"] ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/postaudio-{i}.mp3")[
"format"
]["duration"]
) )
for i in range(number_of_clips) for i in range(number_of_clips)
] ]
audio_clips_durations.insert( audio_clips_durations.insert(
0, 0,
float(ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/title.mp3")["format"]["duration"]), float(
ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/title.mp3")["format"][
"duration"
]
),
) )
if settings.config["settings"]["storymodemethod"] == 0: if settings.config["settings"]["storymodemethod"] == 0:
image_clips.insert( image_clips.insert(
@ -322,7 +351,9 @@ def make_final_video(
) )
current_time += audio_clips_durations[0] current_time += audio_clips_durations[0]
elif settings.config["settings"]["storymodemethod"] == 1: elif settings.config["settings"]["storymodemethod"] == 1:
for i in track(range(0, number_of_clips + 1), "Collecting the image files..."): for i in track(
range(0, number_of_clips + 1), "Collecting the image files..."
):
image_clips.append( image_clips.append(
ffmpeg.input(f"assets/temp/{reddit_id}/png/img{i}.png")["v"].filter( ffmpeg.input(f"assets/temp/{reddit_id}/png/img{i}.png")["v"].filter(
"scale", screenshot_width, -1 "scale", screenshot_width, -1
@ -338,9 +369,9 @@ def make_final_video(
else: else:
for i in range(0, number_of_clips + 1): for i in range(0, number_of_clips + 1):
image_clips.append( image_clips.append(
ffmpeg.input(f"assets/temp/{reddit_id}/png/comment_{i}.png")["v"].filter( ffmpeg.input(f"assets/temp/{reddit_id}/png/comment_{i}.png")[
"scale", screenshot_width, -1 "v"
) ].filter("scale", screenshot_width, -1)
) )
image_overlay = image_clips[i].filter("colorchannelmixer", aa=opacity) image_overlay = image_clips[i].filter("colorchannelmixer", aa=opacity)
assert ( assert (
@ -362,11 +393,15 @@ def make_final_video(
subreddit = settings.config["reddit"]["thread"]["subreddit"] subreddit = settings.config["reddit"]["thread"]["subreddit"]
if not exists(f"./results/{subreddit}"): if not exists(f"./results/{subreddit}"):
print_substep("The 'results' folder could not be found so it was automatically created.") print_substep(
"The 'results' folder could not be found so it was automatically created."
)
os.makedirs(f"./results/{subreddit}") os.makedirs(f"./results/{subreddit}")
if not exists(f"./results/{subreddit}/OnlyTTS") and allowOnlyTTSFolder: if not exists(f"./results/{subreddit}/OnlyTTS") and allowOnlyTTSFolder:
print_substep("The 'OnlyTTS' folder could not be found so it was automatically created.") print_substep(
"The 'OnlyTTS' folder could not be found so it was automatically created."
)
os.makedirs(f"./results/{subreddit}/OnlyTTS") os.makedirs(f"./results/{subreddit}/OnlyTTS")
# create a thumbnail for the video # create a thumbnail for the video
@ -380,7 +415,11 @@ def make_final_video(
os.makedirs(f"./results/{subreddit}/thumbnails") os.makedirs(f"./results/{subreddit}/thumbnails")
# get the first file with the .png extension from assets/backgrounds and use it as a background for the thumbnail # get the first file with the .png extension from assets/backgrounds and use it as a background for the thumbnail
first_image = next( first_image = next(
(file for file in os.listdir("assets/backgrounds") if file.endswith(".png")), (
file
for file in os.listdir("assets/backgrounds")
if file.endswith(".png")
),
None, None,
) )
if first_image is None: if first_image is None:
@ -402,7 +441,9 @@ def make_final_video(
title_thumb, title_thumb,
) )
thumbnailSave.save(f"./assets/temp/{reddit_id}/thumbnail.png") thumbnailSave.save(f"./assets/temp/{reddit_id}/thumbnail.png")
print_substep(f"Thumbnail - Building Thumbnail in assets/temp/{reddit_id}/thumbnail.png") print_substep(
f"Thumbnail - Building Thumbnail in assets/temp/{reddit_id}/thumbnail.png"
)
text = f"Background by {background_config['video'][2]}" text = f"Background by {background_config['video'][2]}"
background_clip = ffmpeg.drawtext( background_clip = ffmpeg.drawtext(
@ -443,7 +484,9 @@ def make_final_video(
"b:a": "192k", "b:a": "192k",
"threads": multiprocessing.cpu_count(), "threads": multiprocessing.cpu_count(),
}, },
).overwrite_output().global_args("-progress", progress.output_file.name).run( ).overwrite_output().global_args(
"-progress", progress.output_file.name
).run(
quiet=True, quiet=True,
overwrite_output=True, overwrite_output=True,
capture_stdout=False, capture_stdout=False,
@ -473,7 +516,9 @@ def make_final_video(
"b:a": "192k", "b:a": "192k",
"threads": multiprocessing.cpu_count(), "threads": multiprocessing.cpu_count(),
}, },
).overwrite_output().global_args("-progress", progress.output_file.name).run( ).overwrite_output().global_args(
"-progress", progress.output_file.name
).run(
quiet=True, quiet=True,
overwrite_output=True, overwrite_output=True,
capture_stdout=False, capture_stdout=False,

@ -36,7 +36,9 @@ def save_text_to_mp3(reddit_obj) -> Tuple[int, int]:
voice = settings.config["settings"]["tts"]["voice_choice"] voice = settings.config["settings"]["tts"]["voice_choice"]
if str(voice).casefold() in map(lambda _: _.casefold(), TTSProviders): if str(voice).casefold() in map(lambda _: _.casefold(), TTSProviders):
text_to_mp3 = TTSEngine(get_case_insensitive_key_value(TTSProviders, voice), reddit_obj) text_to_mp3 = TTSEngine(
get_case_insensitive_key_value(TTSProviders, voice), reddit_obj
)
else: else:
while True: while True:
print_step("Please choose one of the following TTS providers: ") print_step("Please choose one of the following TTS providers: ")
@ -45,12 +47,18 @@ def save_text_to_mp3(reddit_obj) -> Tuple[int, int]:
if choice.casefold() in map(lambda _: _.casefold(), TTSProviders): if choice.casefold() in map(lambda _: _.casefold(), TTSProviders):
break break
print("Unknown Choice") print("Unknown Choice")
text_to_mp3 = TTSEngine(get_case_insensitive_key_value(TTSProviders, choice), reddit_obj) text_to_mp3 = TTSEngine(
get_case_insensitive_key_value(TTSProviders, choice), reddit_obj
)
return text_to_mp3.run() return text_to_mp3.run()
def get_case_insensitive_key_value(input_dict, key): def get_case_insensitive_key_value(input_dict, key):
return next( return next(
(value for dict_key, value in input_dict.items() if dict_key.lower() == key.lower()), (
value
for dict_key, value in input_dict.items()
if dict_key.lower() == key.lower()
),
None, None,
) )

Loading…
Cancel
Save