pull/1550/head
Simon 2 years ago
parent 5b8964dbe0
commit 8b103437af

@ -78,14 +78,17 @@ vocals: Final[tuple] = (
class TikTok:
"""TikTok Text-to-Speech Wrapper"""
def __init__(self):
headers = {
"User-Agent": "com.zhiliaoapp.musically/2022600030 (Linux; U; Android 7.1.2; es_ES; SM-G988N; "
"Build/NRD90M;tt-ok/3.12.13.1)",
"Cookie": f"sessionid={settings.config['settings']['tts']['tiktok_sessionid']}",
}
self.URI_BASE = "https://api16-normal-c-useast1a.tiktokv.com/media/api/text/speech/invoke/"
self.URI_BASE = (
"https://api16-normal-c-useast1a.tiktokv.com/media/api/text/speech/invoke/"
)
self.max_chars = 300
self._session = requests.Session()
@ -111,7 +114,9 @@ class TikTok:
try:
raw_voices = data["data"]["v_str"]
except:
print("The TikTok TTS returned an invalid response. Please try again later, and report this bug.")
print(
"The TikTok TTS returned an invalid response. Please try again later, and report this bug."
)
raise TikTokTTSException(0, "Invalid response")
decoded_voices = base64.b64decode(raw_voices)
@ -159,4 +164,4 @@ class TikTokTTSException(Exception):
if self._code == 4:
return f"Code: {self._code}, reason: the speaker doesn't exist, message: {self._message}"
return f"Code: {self._message}, reason: unknown, message: {self._message}"
return f"Code: {self._message}, reason: unknown, message: {self._message}"

@ -17,7 +17,7 @@ from utils import settings
from utils.console import print_step, print_substep
from utils.voice import sanitize_text
DEFAULT_MAX_LENGTH: int = 50 # video length variable
DEFAULT_MAX_LENGTH: int = 50 # video length variable
class TTSEngine:
@ -51,17 +51,18 @@ class TTSEngine:
self.length = 0
self.last_clip_length = last_clip_length
def add_periods(self): # adds periods to the end of paragraphs (where people often forget to put them) so tts doesn't blend sentences
def add_periods(
self,
): # adds periods to the end of paragraphs (where people often forget to put them) so tts doesn't blend sentences
for comment in self.reddit_object["comments"]:
comment["comment_body"] = comment["comment_body"].replace('\n', '. ')
if comment["comment_body"][-1] != '.':
comment["comment_body"] += '.'
comment["comment_body"] = comment["comment_body"].replace("\n", ". ")
if comment["comment_body"][-1] != ".":
comment["comment_body"] += "."
def run(self) -> Tuple[int, int]:
Path(self.path).mkdir(parents=True, exist_ok=True)
print_step("Saving Text to MP3 files...")
self.add_periods()
self.call_tts("title", process_text(self.reddit_object["thread_title"]))
# processed_text = ##self.reddit_object["thread_post"] != ""
@ -76,12 +77,10 @@ class TTSEngine:
"postaudio", process_text(self.reddit_object["thread_post"])
)
elif settings.config["settings"]["storymodemethod"] == 1:
for idx, text in track(enumerate(self.reddit_object["thread_post"])):
self.call_tts(f"postaudio-{idx}", process_text(text))
else:
for idx, comment in track(
enumerate(self.reddit_object["comments"]), "Saving..."
):
@ -143,10 +142,10 @@ class TTSEngine:
def call_tts(self, filename: str, text: str):
self.tts_module.run(text, filepath=f"{self.path}/{filename}.mp3")
# try:
# self.length += MP3(f"{self.path}/{filename}.mp3").info.length
# except (MutagenError, HeaderNotFoundError):
# self.length += sox.file_info.duration(f"{self.path}/{filename}.mp3")
# try:
# self.length += MP3(f"{self.path}/{filename}.mp3").info.length
# except (MutagenError, HeaderNotFoundError):
# self.length += sox.file_info.duration(f"{self.path}/{filename}.mp3")
try:
clip = AudioFileClip(f"{self.path}/{filename}.mp3")
self.last_clip_length = clip.duration
@ -168,7 +167,7 @@ class TTSEngine:
)
def process_text(text: str , clean : bool = True):
def process_text(text: str, clean: bool = True):
lang = settings.config["reddit"]["thread"]["post_lang"]
new_text = sanitize_text(text) if clean else text
if lang:

@ -14,24 +14,31 @@ from utils.cleanup import cleanup
from utils.console import print_markdown, print_step
from utils.id import id
from utils.version import checkversion
from video_creation.background import (download_background, chop_background_video, get_background_config, )
from video_creation.background import (
download_background,
chop_background_video,
get_background_config,
)
from video_creation.final_video import make_final_video
from video_creation.screenshot_downloader import get_screenshots_of_reddit_posts
from video_creation.voices import save_text_to_mp3
__VERSION__ = "3.0.1"
print("""
print(
"""
""")
"""
)
# Modified by JasonLovesDoggo
print_markdown(
"### Thanks for using this tool! [Feel free to contribute to this project on GitHub!](https://lewismenelaws.com) If you have any questions, feel free to reach out to me on Twitter or submit a GitHub issue. You can find solutions to many common problems in the [Documentation](): https://reddit-video-maker-bot.netlify.app/")
"### Thanks for using this tool! [Feel free to contribute to this project on GitHub!](https://lewismenelaws.com) If you have any questions, feel free to reach out to me on Twitter or submit a GitHub issue. You can find solutions to many common problems in the [Documentation](): https://reddit-video-maker-bot.netlify.app/"
)
checkversion(__VERSION__)
@ -51,7 +58,8 @@ def main(POST_ID=None) -> None:
def run_many(times) -> None:
for x in range(1, times + 1):
print_step(
f'on the {x}{("th", "st", "nd", "rd", "th", "th", "th", "th", "th", "th")[x % 10]} iteration of {times}') # correct 1st 2nd 3rd 4th 5th....
f'on the {x}{("th", "st", "nd", "rd", "th", "th", "th", "th", "th", "th")[x % 10]} iteration of {times}'
) # correct 1st 2nd 3rd 4th 5th....
main()
Popen("cls" if name == "nt" else "clear", shell=True).wait()
@ -72,18 +80,28 @@ def shutdown():
if __name__ == "__main__":
assert sys.version_info >= (3, 9), "Python 3.10 or higher is required"
directory = Path().absolute()
config = settings.check_toml(f"{directory}/utils/.config.template.toml", "config.toml")
config = settings.check_toml(
f"{directory}/utils/.config.template.toml", "config.toml"
)
config is False and exit()
if (not settings.config['settings']['tts']['tiktok_sessionid'] or settings.config['settings']['tts'][
'tiktok_sessionid'] == "") and config["settings"]["tts"]["voice_choice"] == "tiktok":
print_substep("TikTok voice requires a sessionid! Check our documentation on how to obtain one.", "bold red")
if (
not settings.config["settings"]["tts"]["tiktok_sessionid"]
or settings.config["settings"]["tts"]["tiktok_sessionid"] == ""
) and config["settings"]["tts"]["voice_choice"] == "tiktok":
print_substep(
"TikTok voice requires a sessionid! Check our documentation on how to obtain one.",
"bold red",
)
exit()
try:
if config["reddit"]["thread"]["post_id"]:
for index, post_id in enumerate(config["reddit"]["thread"]["post_id"].split("+")):
for index, post_id in enumerate(
config["reddit"]["thread"]["post_id"].split("+")
):
index += 1
print_step(
f'on the {index}{("st" if index % 10 == 1 else ("nd" if index % 10 == 2 else ("rd" if index % 10 == 3 else "th")))} post of {len(config["reddit"]["thread"]["post_id"].split("+"))}')
f'on the {index}{("st" if index % 10 == 1 else ("nd" if index % 10 == 2 else ("rd" if index % 10 == 3 else "th")))} post of {len(config["reddit"]["thread"]["post_id"].split("+"))}'
)
main(post_id)
Popen("cls" if name == "nt" else "clear", shell=True).wait()
elif config["settings"]["times_to_run"]:
@ -101,8 +119,9 @@ if __name__ == "__main__":
except Exception as err:
config["settings"]["tts"]["tiktok_sessionid"] = "REDACTED"
print_step(
f'Sorry, something went wrong with this version! Try again, and feel free to report this issue at GitHub or the Discord community.\n'
f'Version: {__VERSION__} \n'
f'Error: {err} \n'
f'Config: {config["settings"]}')
f"Sorry, something went wrong with this version! Try again, and feel free to report this issue at GitHub or the Discord community.\n"
f"Version: {__VERSION__} \n"
f"Error: {err} \n"
f'Config: {config["settings"]}'
)
raise err

@ -84,22 +84,28 @@ def get_subreddit_threads(POST_ID: str):
settings.config["reddit"]["thread"]["post_id"]
and len(str(settings.config["reddit"]["thread"]["post_id"]).split("+")) == 1
):
submission = reddit.submission(id=settings.config["reddit"]["thread"]["post_id"])
elif settings.config["ai"]["ai_similarity_enabled"]: # ai sorting based on comparison
submission = reddit.submission(
id=settings.config["reddit"]["thread"]["post_id"]
)
elif settings.config["ai"][
"ai_similarity_enabled"
]: # ai sorting based on comparison
threads = subreddit.hot(limit=50)
keywords = settings.config["ai"]["ai_similarity_keywords"].split(',')
keywords = settings.config["ai"]["ai_similarity_keywords"].split(",")
keywords = [keyword.strip() for keyword in keywords]
# Reformat the keywords for printing
keywords_print = ", ".join(keywords)
print(f'Sorting threads by similarity to the given keywords: {keywords_print}')
print(f"Sorting threads by similarity to the given keywords: {keywords_print}")
threads, similarity_scores = sort_by_similarity(threads, keywords)
submission, similarity_score = get_subreddit_undone(threads, subreddit, similarity_scores=similarity_scores)
submission, similarity_score = get_subreddit_undone(
threads, subreddit, similarity_scores=similarity_scores
)
else:
threads = subreddit.hot(limit=25)
submission = get_subreddit_undone(threads, subreddit)
if submission is None:
return get_subreddit_threads(POST_ID) # submission already done. rerun
return get_subreddit_threads(POST_ID) # submission already done. rerun
if settings.config["settings"]["storymode"]:
if not submission.selftext:
@ -107,7 +113,9 @@ def get_subreddit_threads(POST_ID: str):
exit()
else:
# Check for the length of the post text
if len(submission.selftext) > (settings.config["settings"]["storymode_max_length"] or 2000):
if len(submission.selftext) > (
settings.config["settings"]["storymode_max_length"] or 2000
):
print_substep(
f"Post is too long ({len(submission.selftext)}), try with a different post. ({settings.config['settings']['storymode_max_length']} character limit)"
)
@ -129,7 +137,10 @@ def get_subreddit_threads(POST_ID: str):
print_substep(f"Thread has a upvote ratio of {ratio}%", style="bold blue")
print_substep(f"Thread has {num_comments} comments", style="bold blue")
if similarity_score:
print_substep(f"Thread has a similarity score up to {round(similarity_score * 100)}%", style="bold blue")
print_substep(
f"Thread has a similarity score up to {round(similarity_score * 100)}%",
style="bold blue",
)
content["thread_url"] = threadurl
content["thread_title"] = submission.title
@ -158,7 +169,6 @@ def get_subreddit_threads(POST_ID: str):
if len(top_level_comment.body) >= int(
settings.config["reddit"]["thread"]["min_comment_length"]
):
if (
top_level_comment.author is not None
and sanitize_text(top_level_comment.body) is not None

@ -5,16 +5,22 @@ import torch
# Mean Pooling - Take attention mask into account for correct averaging
def mean_pooling(model_output, attention_mask):
token_embeddings = model_output[0] # First element of model_output contains all token embeddings
input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(input_mask_expanded.sum(1), min=1e-9)
token_embeddings = model_output[
0
] # First element of model_output contains all token embeddings
input_mask_expanded = (
attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
)
return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(
input_mask_expanded.sum(1), min=1e-9
)
# This function sort the given threads based on their total similarity with the given keywords
def sort_by_similarity(thread_objects, keywords):
# Initialize tokenizer + model.
tokenizer = AutoTokenizer.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')
model = AutoModel.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')
tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
model = AutoModel.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
# Transform the generator to a list of Submission Objects, so we can sort later based on context similarity to
# keywords
@ -22,26 +28,36 @@ def sort_by_similarity(thread_objects, keywords):
threads_sentences = []
for i, thread in enumerate(thread_objects):
threads_sentences.append(' '.join([thread.title, thread.selftext]))
threads_sentences.append(" ".join([thread.title, thread.selftext]))
# Threads inference
encoded_threads = tokenizer(threads_sentences, padding=True, truncation=True, return_tensors='pt')
encoded_threads = tokenizer(
threads_sentences, padding=True, truncation=True, return_tensors="pt"
)
with torch.no_grad():
threads_embeddings = model(**encoded_threads)
threads_embeddings = mean_pooling(threads_embeddings, encoded_threads['attention_mask'])
threads_embeddings = mean_pooling(
threads_embeddings, encoded_threads["attention_mask"]
)
# Keywords inference
encoded_keywords = tokenizer(keywords, padding=True, truncation=True, return_tensors='pt')
encoded_keywords = tokenizer(
keywords, padding=True, truncation=True, return_tensors="pt"
)
with torch.no_grad():
keywords_embeddings = model(**encoded_keywords)
keywords_embeddings = mean_pooling(keywords_embeddings, encoded_keywords['attention_mask'])
keywords_embeddings = mean_pooling(
keywords_embeddings, encoded_keywords["attention_mask"]
)
# Compare every keyword w/ every thread embedding
threads_embeddings_tensor = torch.tensor(threads_embeddings)
total_scores = torch.zeros(threads_embeddings_tensor.shape[0])
cosine_similarity = torch.nn.CosineSimilarity()
for keyword_embedding in keywords_embeddings:
keyword_embedding = torch.tensor(keyword_embedding).repeat(threads_embeddings_tensor.shape[0], 1)
keyword_embedding = torch.tensor(keyword_embedding).repeat(
threads_embeddings_tensor.shape[0], 1
)
similarity = cosine_similarity(keyword_embedding, threads_embeddings_tensor)
total_scores += similarity
@ -51,8 +67,8 @@ def sort_by_similarity(thread_objects, keywords):
thread_objects = np.array(thread_objects)[indices.numpy()].tolist()
#print('Similarity Thread Ranking')
#for i, thread in enumerate(thread_objects):
# print('Similarity Thread Ranking')
# for i, thread in enumerate(thread_objects):
# print(f'{i}) {threads_sentences[i]} score {similarity_scores[i]}')
return thread_objects, similarity_scores

@ -7,7 +7,9 @@ from rich.progress import track
from TTS.engine_wrapper import process_text
def draw_multiple_line_text(image, text, font, text_color, padding, wrap=50, transparent=False) -> None:
def draw_multiple_line_text(
image, text, font, text_color, padding, wrap=50, transparent=False
) -> None:
"""
Draw multiline text over given image
"""
@ -15,16 +17,38 @@ def draw_multiple_line_text(image, text, font, text_color, padding, wrap=50, tra
Fontperm = font.getsize(text)
image_width, image_height = image.size
lines = textwrap.wrap(text, width=wrap)
y = (image_height / 2) - (((Fontperm[1] + (len(lines) * padding) / len(lines)) * len(lines)) / 2)
y = (image_height / 2) - (
((Fontperm[1] + (len(lines) * padding) / len(lines)) * len(lines)) / 2
)
for line in lines:
line_width, line_height = font.getsize(line)
if transparent:
shadowcolor = "black"
for i in range(1, 5):
draw.text(((image_width - line_width) / 2 - i, y - i), line, font=font, fill=shadowcolor)
draw.text(((image_width - line_width) / 2 + i, y - i), line, font=font, fill=shadowcolor)
draw.text(((image_width - line_width) / 2 - i, y + i), line, font=font, fill=shadowcolor)
draw.text(((image_width - line_width) / 2 + i, y + i), line, font=font, fill=shadowcolor)
draw.text(
((image_width - line_width) / 2 - i, y - i),
line,
font=font,
fill=shadowcolor,
)
draw.text(
((image_width - line_width) / 2 + i, y - i),
line,
font=font,
fill=shadowcolor,
)
draw.text(
((image_width - line_width) / 2 - i, y + i),
line,
font=font,
fill=shadowcolor,
)
draw.text(
((image_width - line_width) / 2 + i, y + i),
line,
font=font,
fill=shadowcolor,
)
draw.text(((image_width - line_width) / 2, y), line, font=font, fill=text_color)
y += line_height + padding
@ -33,7 +57,9 @@ def imagemaker(theme, reddit_obj: dict, txtclr, padding=5, transparent=False) ->
"""
Render Images for video
"""
title = process_text(reddit_obj["thread_title"], False) # TODO if second argument cause any error
title = process_text(
reddit_obj["thread_title"], False
)
texts = reddit_obj["thread_post"]
id = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"])
@ -41,19 +67,25 @@ def imagemaker(theme, reddit_obj: dict, txtclr, padding=5, transparent=False) ->
font = ImageFont.truetype(os.path.join("fonts", "Roboto-Bold.ttf"), 50)
tfont = ImageFont.truetype(os.path.join("fonts", "Roboto-Bold.ttf"), 50)
else:
tfont = ImageFont.truetype(os.path.join("fonts", "Roboto-Bold.ttf"), 35) # for title
tfont = ImageFont.truetype(
os.path.join("fonts", "Roboto-Bold.ttf"), 35
) # for title
font = ImageFont.truetype(os.path.join("fonts", "Roboto-Regular.ttf"), 30)
size = (1920, 1080)
image = Image.new("RGBA", size, theme)
# for title
draw_multiple_line_text(image, title, tfont, txtclr, padding, wrap=30, transparent=transparent)
draw_multiple_line_text(
image, title, tfont, txtclr, padding, wrap=30, transparent=transparent
)
image.save(f"assets/temp/{id}/png/title.png")
for idx, text in track(enumerate(texts), "Rendering Image"):
image = Image.new("RGBA", size, theme)
text = process_text(text, False)
draw_multiple_line_text(image, text, font, txtclr, padding, wrap=30, transparent=transparent)
draw_multiple_line_text(
image, text, font, txtclr, padding, wrap=30, transparent=transparent
)
image.save(f"assets/temp/{id}/png/img{idx}.png")

@ -11,9 +11,11 @@ def posttextparser(obj):
text = re.sub("\n", "", obj)
try:
nlp = spacy.load('en_core_web_sm')
nlp = spacy.load("en_core_web_sm")
except OSError:
print_step("The spacy model can't load. You need to install it with \npython -m spacy download en")
print_step(
"The spacy model can't load. You need to install it with \npython -m spacy download en"
)
exit()
doc = nlp(text)

@ -6,7 +6,9 @@ from utils.console import print_substep
from utils.ai_methods import sort_by_similarity
def get_subreddit_undone(submissions: list, subreddit, times_checked=0, similarity_scores=None):
def get_subreddit_undone(
submissions: list, subreddit, times_checked=0, similarity_scores=None
):
"""_summary_
Args:
@ -18,8 +20,12 @@ def get_subreddit_undone(submissions: list, subreddit, times_checked=0, similari
"""
# Second try of getting a valid Submission
if times_checked and settings.config["ai"]["ai_similarity_enabled"]:
print('Sorting based on similarity for a different date filter and thread limit..')
submissions = sort_by_similarity(submissions, keywords=settings.config["ai"]["ai_similarity_enabled"])
print(
"Sorting based on similarity for a different date filter and thread limit.."
)
submissions = sort_by_similarity(
submissions, keywords=settings.config["ai"]["ai_similarity_enabled"]
)
# recursively checks if the top submission in the list was already done.
if not exists("./video_creation/data/videos.json"):
@ -42,9 +48,11 @@ def get_subreddit_undone(submissions: list, subreddit, times_checked=0, similari
if submission.stickied:
print_substep("This post was pinned by moderators. Skipping...")
continue
if submission.num_comments <= int(
settings.config["reddit"]["thread"]["min_comments"]
) and not settings.config["settings"]["storymode"]:
if (
submission.num_comments
<= int(settings.config["reddit"]["thread"]["min_comments"])
and not settings.config["settings"]["storymode"]
):
print_substep(
f'This post has under the specified minimum of comments ({settings.config["reddit"]["thread"]["min_comments"]}). Skipping...'
)

@ -1,14 +1,17 @@
from PIL import ImageDraw, ImageFont
def create_thumbnail(thumbnail, font_family, font_size, font_color, width, height, title):
def create_thumbnail(
thumbnail, font_family, font_size, font_color, width, height, title
):
font = ImageFont.truetype(font_family + ".ttf", font_size)
Xaxis = width - (width * 0.2) # 20% of the width
sizeLetterXaxis = font_size * 0.5 # 50% of the font size
XaxisLetterQty = round(Xaxis / sizeLetterXaxis) # Quantity of letters that can fit in the X axis
MarginYaxis = (height * 0.12) # 12% of the height
MarginXaxis = (width * 0.05) # 5% of the width
Xaxis = width - (width * 0.2) # 20% of the width
sizeLetterXaxis = font_size * 0.5 # 50% of the font size
XaxisLetterQty = round(
Xaxis / sizeLetterXaxis
) # Quantity of letters that can fit in the X axis
MarginYaxis = height * 0.12 # 12% of the height
MarginXaxis = width * 0.05 # 5% of the width
# 1.1 rem
LineHeight = font_size * 1.1
# rgb = "255,255,255" transform to list
@ -31,7 +34,8 @@ def create_thumbnail(thumbnail, font_family, font_size, font_color, width, heigh
# loop for put the title in the thumbnail
for i in range(0, len(arrayTitle)):
# 1.1 rem
draw.text((MarginXaxis, MarginYaxis + (LineHeight * i)),
arrayTitle[i], rgb, font=font)
draw.text(
(MarginXaxis, MarginYaxis + (LineHeight * i)), arrayTitle[i], rgb, font=font
)
return thumbnail

@ -89,10 +89,10 @@ def sanitize_text(text: str) -> str:
regex_expr = r"\s['|]|['|]\s|[\^_~@!&;#:\-%—“”‘\"%\*/{}\[\]\(\)\\|<>=+]"
result = re.sub(regex_expr, " ", result)
result = result.replace("+", "plus").replace("&", "and")
# emoji removal if the setting is enabled
if settings.config["settings"]["tts"]["no_emojis"]:
result = clean(result, no_emoji=True)
# remove extra whitespace
return " ".join(result.split())

@ -27,14 +27,13 @@ import time
class ProgressFfmpeg(threading.Thread):
def __init__(self, vid_duration_seconds, progress_update_callback):
threading.Thread.__init__(self, name='ProgressFfmpeg')
threading.Thread.__init__(self, name="ProgressFfmpeg")
self.stop_event = threading.Event()
self.output_file = tempfile.NamedTemporaryFile(mode='w+', delete=False)
self.output_file = tempfile.NamedTemporaryFile(mode="w+", delete=False)
self.vid_duration_seconds = vid_duration_seconds
self.progress_update_callback = progress_update_callback
def run(self):
while not self.stop_event.is_set():
latest_progress = self.get_latest_ms_progress()
if latest_progress is not None:
@ -47,8 +46,8 @@ class ProgressFfmpeg(threading.Thread):
if lines:
for line in lines:
if 'out_time_ms' in line:
out_time_ms = line.split('=')[1]
if "out_time_ms" in line:
out_time_ms = line.split("=")[1]
return int(out_time_ms) / 1000000.0
return None
@ -82,18 +81,30 @@ def name_normalize(name: str) -> str:
def prepare_background(reddit_id: str, W: int, H: int) -> str:
output_path = f"assets/temp/{reddit_id}/background_noaudio.mp4"
output = ffmpeg.input(f"assets/temp/{reddit_id}/background.mp4").filter('crop', f"ih*({W}/{H})", "ih").output(
output_path, an=None,
**{"c:v": "h264", "b:v": "20M", "b:a": "192k", "threads": multiprocessing.cpu_count()}).overwrite_output()
output = (
ffmpeg.input(f"assets/temp/{reddit_id}/background.mp4")
.filter("crop", f"ih*({W}/{H})", "ih")
.output(
output_path,
an=None,
**{
"c:v": "h264",
"b:v": "20M",
"b:a": "192k",
"threads": multiprocessing.cpu_count(),
},
)
.overwrite_output()
)
output.run(quiet=True)
return output_path
def make_final_video(
number_of_clips: int,
length: int,
reddit_obj: dict,
background_config: Tuple[str, str, str, Any],
number_of_clips: int,
length: int,
reddit_obj: dict,
background_config: Tuple[str, str, str, Any],
):
"""Gathers audio clips, gathers all screenshots, stitches them together and saves the final video to assets/temp
Args:
@ -116,7 +127,9 @@ def make_final_video(
if settings.config["settings"]["storymode"]:
if settings.config["settings"]["storymodemethod"] == 0:
audio_clips = [ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3")]
audio_clips.insert(1, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/postaudio.mp3"))
audio_clips.insert(
1, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/postaudio.mp3")
)
elif settings.config["settings"]["storymodemethod"] == 1:
audio_clips = [
ffmpeg.input(f"assets/temp/{reddit_id}/mp3/postaudio-{i}.mp3")
@ -124,20 +137,37 @@ def make_final_video(
range(number_of_clips + 1), "Collecting the audio files..."
)
]
audio_clips.insert(0, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3"))
audio_clips.insert(
0, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3")
)
else:
audio_clips = [ffmpeg.input(f"assets/temp/{reddit_id}/mp3/{i}.mp3") for i in range(number_of_clips)]
audio_clips = [
ffmpeg.input(f"assets/temp/{reddit_id}/mp3/{i}.mp3")
for i in range(number_of_clips)
]
audio_clips.insert(0, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3"))
audio_clips_durations = [float(ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/{i}.mp3")['format']['duration']) for i
in
range(number_of_clips)]
audio_clips_durations.insert(0, float(
ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/title.mp3")['format']['duration']))
audio_clips_durations = [
float(
ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/{i}.mp3")["format"][
"duration"
]
)
for i in range(number_of_clips)
]
audio_clips_durations.insert(
0,
float(
ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/title.mp3")["format"][
"duration"
]
),
)
audio_concat = ffmpeg.concat(*audio_clips, a=1, v=0)
ffmpeg.output(audio_concat, f"assets/temp/{reddit_id}/audio.mp3", **{"b:a": "192k"}).overwrite_output().run(
quiet=True)
ffmpeg.output(
audio_concat, f"assets/temp/{reddit_id}/audio.mp3", **{"b:a": "192k"}
).overwrite_output().run(quiet=True)
console.log(f"[bold green] Video Will Be: {length} Seconds Long")
# Create a screenshot_width variable to scale the screenshots to the correct size, the calculation is int((W * 90) // 100)
@ -149,48 +179,72 @@ def make_final_video(
image_clips.insert(
0,
ffmpeg.input(f"assets/temp/{reddit_id}/png/title.png")['v']
.filter('scale', screenshot_width, -1)
ffmpeg.input(f"assets/temp/{reddit_id}/png/title.png")["v"].filter(
"scale", screenshot_width, -1
),
)
current_time = 0
if settings.config["settings"]["storymode"]:
audio_clips_durations = [
float(ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/postaudio-{i}.mp3")['format']['duration']) for i in
range(number_of_clips)]
audio_clips_durations.insert(0, float(
ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/title.mp3")['format']['duration']))
float(
ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/postaudio-{i}.mp3")[
"format"
]["duration"]
)
for i in range(number_of_clips)
]
audio_clips_durations.insert(
0,
float(
ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/title.mp3")["format"][
"duration"
]
),
)
if settings.config["settings"]["storymodemethod"] == 0:
image_clips.insert(
1,
ffmpeg.input(f"assets/temp/{reddit_id}/png/story_content.png")
.filter('scale', screenshot_width, -1)
ffmpeg.input(f"assets/temp/{reddit_id}/png/story_content.png").filter(
"scale", screenshot_width, -1
),
)
background_clip = background_clip.overlay(
image_clips[1],
enable=f"between(t,{current_time},{current_time + audio_clips_durations[1]})",
x="(main_w-overlay_w)/2",
y="(main_h-overlay_h)/2",
)
background_clip = background_clip.overlay(image_clips[1],
enable=f'between(t,{current_time},{current_time + audio_clips_durations[1]})',
x='(main_w-overlay_w)/2', y='(main_h-overlay_h)/2')
current_time += audio_clips_durations[1]
elif settings.config["settings"]["storymodemethod"] == 1:
for i in track(
range(0, number_of_clips + 1), "Collecting the image files..."
range(0, number_of_clips + 1), "Collecting the image files..."
):
image_clips.append(
ffmpeg.input(f"assets/temp/{reddit_id}/png/img{i}.png")['v']
.filter('scale', 1080, -1)
ffmpeg.input(f"assets/temp/{reddit_id}/png/img{i}.png")["v"].filter(
"scale", 1080, -1
)
)
background_clip = background_clip.overlay(
image_clips[i],
enable=f"between(t,{current_time},{current_time + audio_clips_durations[i]})",
x="(main_w-overlay_w)/2",
y="(main_h-overlay_h)/2",
)
background_clip = background_clip.overlay(image_clips[i],
enable=f'between(t,{current_time},{current_time + audio_clips_durations[i]})',
x='(main_w-overlay_w)/2', y='(main_h-overlay_h)/2')
current_time += audio_clips_durations[i]
else:
for i in range(0, number_of_clips + 1):
image_clips.append(
ffmpeg.input(f"assets/temp/{reddit_id}/png/comment_{i}.png")['v']
.filter('scale', screenshot_width, -1)
ffmpeg.input(f"assets/temp/{reddit_id}/png/comment_{i}.png")[
"v"
].filter("scale", screenshot_width, -1)
)
background_clip = background_clip.overlay(
image_clips[i],
enable=f"between(t,{current_time},{current_time + audio_clips_durations[i]})",
x="(main_w-overlay_w)/2",
y="(main_h-overlay_h)/2",
)
background_clip = background_clip.overlay(image_clips[i],
enable=f'between(t,{current_time},{current_time + audio_clips_durations[i]})',
x='(main_w-overlay_w)/2', y='(main_h-overlay_h)/2')
current_time += audio_clips_durations[i]
title = re.sub(r"[^\w\s-]", "", reddit_obj["thread_title"])
@ -209,8 +263,7 @@ def make_final_video(
if settingsbackground["background_thumbnail"]:
if not exists(f"./results/{subreddit}/thumbnails"):
print_substep(
"The results/thumbnails folder didn't exist so I made it")
print_substep("The results/thumbnails folder didn't exist so I made it")
os.makedirs(f"./results/{subreddit}/thumbnails")
# get the first file with the .png extension from assets/backgrounds and use it as a background for the thumbnail
first_image = next(
@ -230,19 +283,33 @@ def make_final_video(
font_color = settingsbackground["background_thumbnail_font_color"]
thumbnail = Image.open(f"assets/backgrounds/{first_image}")
width, height = thumbnail.size
thumbnailSave = create_thumbnail(thumbnail, font_family, font_size, font_color, width, height, title_thumb)
thumbnailSave = create_thumbnail(
thumbnail,
font_family,
font_size,
font_color,
width,
height,
title_thumb,
)
thumbnailSave.save(f"./assets/temp/{reddit_id}/thumbnail.png")
print_substep(f"Thumbnail - Building Thumbnail in assets/temp/{reddit_id}/thumbnail.png")
print_substep(
f"Thumbnail - Building Thumbnail in assets/temp/{reddit_id}/thumbnail.png"
)
text = f"Background by {background_config[2]}"
background_clip = ffmpeg.drawtext(background_clip,
text=text,
x=f'(w-text_w)', y=f'(h-text_h)',
fontsize=12,
fontcolor="White",
fontfile=os.path.join("fonts", "Roboto-Regular.ttf"))
background_clip = ffmpeg.drawtext(
background_clip,
text=text,
x=f"(w-text_w)",
y=f"(h-text_h)",
fontsize=12,
fontcolor="White",
fontfile=os.path.join("fonts", "Roboto-Regular.ttf"),
)
print_step("Rendering the video 🎥")
from tqdm import tqdm
pbar = tqdm(total=100, desc="Progress: ", bar_format="{l_bar}{bar}", unit=" %")
def on_update_example(progress):
@ -251,22 +318,31 @@ def make_final_video(
pbar.update(status - old_percentage)
with ProgressFfmpeg(length, on_update_example) as progress:
ffmpeg.output(background_clip, audio, f"results/{subreddit}/{filename}.mp4", f='mp4',
**{"c:v": "h264", "b:v": "20M", "b:a": "192k",
"threads": multiprocessing.cpu_count()}).overwrite_output().global_args('-progress',
progress.output_file.name).run(
quiet=True, overwrite_output=True, capture_stdout=False, capture_stderr=False)
ffmpeg.output(
background_clip,
audio,
f"results/{subreddit}/{filename}.mp4",
f="mp4",
**{
"c:v": "h264",
"b:v": "20M",
"b:a": "192k",
"threads": multiprocessing.cpu_count(),
},
).overwrite_output().global_args("-progress", progress.output_file.name).run(
quiet=True,
overwrite_output=True,
capture_stdout=False,
capture_stderr=False,
)
old_percentage = pbar.n
pbar.update(100 - old_percentage)
pbar.close()
if settingsbackground["background_thumbnail"]:
if not exists(f"./results/{subreddit}/thumbnails"):
print_substep(
"The results/thumbnails folder didn't exist so I made it")
print_substep("The results/thumbnails folder didn't exist so I made it")
os.makedirs(f"./results/{subreddit}/thumbnails")
# get the first file with the .png extension from assets/backgrounds and use it as a background for the thumbnail
first_image = next(
@ -286,13 +362,22 @@ def make_final_video(
font_color = settingsbackground["background_thumbnail_font_color"]
thumbnail = Image.open(f"assets/backgrounds/{first_image}")
width, height = thumbnail.size
thumbnailSave = create_thumbnail(thumbnail, font_family, font_size, font_color, width, height, title_thumb)
thumbnailSave = create_thumbnail(
thumbnail, font_family, font_size, font_color, width, height, title_thumb
)
thumbnailSave.save(f"./assets/temp/{reddit_id}/thumbnail.png")
print_substep(f"Thumbnail - Building Thumbnail in assets/temp/{reddit_id}/thumbnail.png")
print_substep(
f"Thumbnail - Building Thumbnail in assets/temp/{reddit_id}/thumbnail.png"
)
# get the thumbnail image from assets/temp/id/thumbnail.png and save it in results/subreddit/thumbnails
if settingsbackground["background_thumbnail"] and exists(f"assets/temp/{reddit_id}/thumbnail.png"):
shutil.move(f"assets/temp/{reddit_id}/thumbnail.png", f"./results/{subreddit}/thumbnails/{filename}.png")
if settingsbackground["background_thumbnail"] and exists(
f"assets/temp/{reddit_id}/thumbnail.png"
):
shutil.move(
f"assets/temp/{reddit_id}/thumbnail.png",
f"./results/{subreddit}/thumbnails/{filename}.png",
)
save_data(subreddit, filename + ".mp4", title, idx, background_config[2])
print_step("Removing temporary files 🗑")

@ -70,13 +70,20 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
if storymode and settings.config["settings"]["storymodemethod"] == 1:
# for idx,item in enumerate(reddit_object["thread_post"]):
print_substep("Generating images...")
return imagemaker(theme=bgcolor, reddit_obj=reddit_object, txtclr=txtcolor, transparent=transparent)
return imagemaker(
theme=bgcolor,
reddit_obj=reddit_object,
txtclr=txtcolor,
transparent=transparent,
)
screenshot_num: int
with sync_playwright() as p:
print_substep("Launching Headless Browser...")
browser = p.chromium.launch(headless=False) # headless=False will show the browser for debugging purposes
browser = p.chromium.launch(
headless=True
) # headless=False will show the browser for debugging purposes
# Device scale factor (or dsf for short) allows us to increase the resolution of the screenshots
# When the dsf is 1, the width of the screenshot is 600 pixels
# so we need a dsf such that the width of the screenshot is greater than the final resolution of the video
@ -94,40 +101,46 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
context.add_cookies(cookies) # load preference cookies
# Login to Reddit
if storymode:
print_substep("Logging in to Reddit...")
page = context.new_page()
page.goto("https://www.reddit.com/login", timeout=0)
page.set_viewport_size(ViewportSize(width=1920, height=1080))
page.wait_for_load_state()
page.locator('[name="username"]').fill(settings.config["reddit"]["creds"]["username"])
page.locator('[name="password"]').fill(settings.config["reddit"]["creds"]["password"])
page.locator("button[class$='m-full-width']").click()
print_substep("Logging in to Reddit...")
page = context.new_page()
page.goto("https://www.reddit.com/login", timeout=0)
page.set_viewport_size(ViewportSize(width=1920, height=1080))
page.wait_for_load_state()
page.wait_for_load_state() # Wait for page to fully load and add 5 seconds
page.wait_for_timeout(5000)
page.locator('[name="username"]').fill(
settings.config["reddit"]["creds"]["username"]
)
page.locator('[name="password"]').fill(
settings.config["reddit"]["creds"]["password"]
)
page.locator("button[class$='m-full-width']").click()
page.wait_for_timeout(5000)
page.wait_for_load_state()
# Get the thread screenshot
page = context.new_page()
page.goto(reddit_object["thread_url"], timeout=0)
page.set_viewport_size(ViewportSize(width=W, height=H))
page.wait_for_load_state()
page.wait_for_timeout(5000)
if page.locator('[data-testid="content-gate"]').is_visible():
if page.locator(
"#t3_12hmbug > div > div._3xX726aBn29LDbsDtzr_6E._1Ap4F5maDtT1E1YuCiaO0r.D3IL3FD0RFy_mkKLPwL4 > div > div > button"
).is_visible():
# This means the post is NSFW and requires to click the proceed button.
print_substep("Post is NSFW. You are spicy...")
page.locator('[data-testid="content-gate"] button').click()
page.locator(
"#t3_12hmbug > div > div._3xX726aBn29LDbsDtzr_6E._1Ap4F5maDtT1E1YuCiaO0r.D3IL3FD0RFy_mkKLPwL4 > div > div > button"
).click()
page.wait_for_load_state() # Wait for page to fully load
if page.locator('[data-click-id="text"] button').is_visible():
page.locator(
'[data-click-id="text"] button'
).click() # Remove "Click to see nsfw" Button in Screenshot
# translate code
if page.locator("#SHORTCUT_FOCUSABLE_DIV > div:nth-child(7) > div > div > div > header > div > div._1m0iFpls1wkPZJVo38-LSh > button > i").is_visible():
page.locator("#SHORTCUT_FOCUSABLE_DIV > div:nth-child(7) > div > div > div > header > div > div._1m0iFpls1wkPZJVo38-LSh > button > i").click() # Interest popup is showing, this code will close it
if page.locator(
"#SHORTCUT_FOCUSABLE_DIV > div:nth-child(7) > div > div > div > header > div > div._1m0iFpls1wkPZJVo38-LSh > button > i"
).is_visible():
page.locator(
"#SHORTCUT_FOCUSABLE_DIV > div:nth-child(7) > div > div > div > header > div > div._1m0iFpls1wkPZJVo38-LSh > button > i"
).click() # Interest popup is showing, this code will close it
if lang:
print_substep("Translating post...")
@ -145,19 +158,28 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
postcontentpath = f"assets/temp/{reddit_id}/png/title.png"
try:
page.locator('[data-test-id="post-content"]').screenshot(path=postcontentpath)
page.locator('[data-test-id="post-content"]').screenshot(
path=postcontentpath
)
except Exception as e:
print_substep("Something went wrong!",style="red")
resp = input("Something went wrong with making the screenshots! Do you want to skip the post? (y/n) ")
print_substep("Something went wrong!", style="red")
resp = input(
"Something went wrong with making the screenshots! Do you want to skip the post? (y/n) "
)
if resp.casefold().startswith("y"):
save_data("", "", "skipped", reddit_id, "")
print_substep("The post is successfully skipped! You can now restart the program and this post will skipped.","green")
print_substep(
"The post is successfully skipped! You can now restart the program and this post will skipped.",
"green",
)
resp = input("Do you want the error traceback for debugging purposes? (y/n)")
resp = input(
"Do you want the error traceback for debugging purposes? (y/n)"
)
if not resp.casefold().startswith("y"):
exit()
raise e
if storymode:
@ -166,10 +188,10 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
)
else:
for idx, comment in enumerate(
track(
reddit_object["comments"][:screenshot_num],
"Downloading screenshots...",
)
track(
reddit_object["comments"][:screenshot_num],
"Downloading screenshots...",
)
):
# Stop if we have reached the screenshot_num
if idx >= screenshot_num:

Loading…
Cancel
Save