Version 3.1

pull/1562/head
Simon 2 years ago committed by GitHub
commit a63dc9b695
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -9,3 +9,4 @@ updates:
directory: "/" # Location of package manifests
schedule:
interval: "daily"
target-branch: "develop"

@ -81,7 +81,7 @@ I have tried to simplify the code so anyone can read it and start contributing a
Please read our [contributing guidelines](CONTRIBUTING.md) for more detailed information.
### For any questions or support join the [Discord](https://discord.gg/codingwithlewis) server
### For any questions or support join the [Discord](https://discord.gg/Vkanmh6C8V) server
## Developers and maintainers.

@ -46,7 +46,7 @@ non_eng_voices: Final[tuple] = (
"de_001", # German - Female
"de_002", # German - Male
"es_002", # Spanish - Male
"it_male_m18" # Italian - Male
"it_male_m18", # Italian - Male
# South american voices
"es_mx_002", # Spanish MX - Male
"br_001", # Portuguese BR - Female 1
@ -78,17 +78,18 @@ vocals: Final[tuple] = (
class TikTok:
"""TikTok Text-to-Speech Wrapper"""
def __init__(self):
if not settings.config['settings']['tts']['tiktok_sessionid']:
raise TikTokTTSException(5)
headers = {
"User-Agent": "com.zhiliaoapp.musically/2022600030 (Linux; U; Android 7.1.2; es_ES; SM-G988N; "
"Build/NRD90M;tt-ok/3.12.13.1)",
"Cookie": f"sessionid={settings.config['settings']['tts']['tiktok_sessionid']}",
}
self.URI_BASE = "https://api16-normal-c-useast1a.tiktokv.com/media/api/text/speech/invoke/"
self.max_chars = 300
self.URI_BASE = (
"https://api16-normal-c-useast1a.tiktokv.com/media/api/text/speech/invoke/"
)
self.max_chars = 200
self._session = requests.Session()
# set the headers to the session, so we don't have to do it for every request
@ -113,7 +114,9 @@ class TikTok:
try:
raw_voices = data["data"]["v_str"]
except:
print("The TikTok TTS returned an invalid response. Please try again later, and report this bug.")
print(
"The TikTok TTS returned an invalid response. Please try again later, and report this bug."
)
raise TikTokTTSException(0, "Invalid response")
decoded_voices = base64.b64decode(raw_voices)
@ -160,8 +163,5 @@ class TikTokTTSException(Exception):
if self._code == 4:
return f"Code: {self._code}, reason: the speaker doesn't exist, message: {self._message}"
if self._code == 5:
return f"You have to add session id in config to use titok TTS"
return f"Code: {self._message}, reason: unknown, message: {self._message}"

@ -17,7 +17,7 @@ from utils import settings
from utils.console import print_step, print_substep
from utils.voice import sanitize_text
DEFAULT_MAX_LENGTH: int = 50 # video length variable
DEFAULT_MAX_LENGTH: int = 50 # video length variable
class TTSEngine:
@ -51,17 +51,18 @@ class TTSEngine:
self.length = 0
self.last_clip_length = last_clip_length
def add_periods(self): # adds periods to the end of paragraphs (where people often forget to put them) so tts doesn't blend sentences
def add_periods(
self,
): # adds periods to the end of paragraphs (where people often forget to put them) so tts doesn't blend sentences
for comment in self.reddit_object["comments"]:
comment["comment_body"] = comment["comment_body"].replace('\n', '. ')
if comment["comment_body"][-1] != '.':
comment["comment_body"] += '.'
comment["comment_body"] = comment["comment_body"].replace("\n", ". ")
if comment["comment_body"][-1] != ".":
comment["comment_body"] += "."
def run(self) -> Tuple[int, int]:
Path(self.path).mkdir(parents=True, exist_ok=True)
print_step("Saving Text to MP3 files...")
self.add_periods()
self.call_tts("title", process_text(self.reddit_object["thread_title"]))
# processed_text = ##self.reddit_object["thread_post"] != ""
@ -76,12 +77,10 @@ class TTSEngine:
"postaudio", process_text(self.reddit_object["thread_post"])
)
elif settings.config["settings"]["storymodemethod"] == 1:
for idx, text in track(enumerate(self.reddit_object["thread_post"])):
self.call_tts(f"postaudio-{idx}", process_text(text))
else:
for idx, comment in track(
enumerate(self.reddit_object["comments"]), "Saving..."
):
@ -143,10 +142,10 @@ class TTSEngine:
def call_tts(self, filename: str, text: str):
self.tts_module.run(text, filepath=f"{self.path}/{filename}.mp3")
# try:
# self.length += MP3(f"{self.path}/{filename}.mp3").info.length
# except (MutagenError, HeaderNotFoundError):
# self.length += sox.file_info.duration(f"{self.path}/{filename}.mp3")
# try:
# self.length += MP3(f"{self.path}/{filename}.mp3").info.length
# except (MutagenError, HeaderNotFoundError):
# self.length += sox.file_info.duration(f"{self.path}/{filename}.mp3")
try:
clip = AudioFileClip(f"{self.path}/{filename}.mp3")
self.last_clip_length = clip.duration
@ -168,7 +167,7 @@ class TTSEngine:
)
def process_text(text: str , clean : bool = True):
def process_text(text: str, clean: bool = True):
lang = settings.config["reddit"]["thread"]["post_lang"]
new_text = sanitize_text(text) if clean else text
if lang:

Binary file not shown.

@ -7,7 +7,7 @@ from pathlib import Path
from subprocess import Popen
from prawcore import ResponseException
from utils.console import print_substep
from reddit.subreddit import get_subreddit_threads
from utils import settings
from utils.cleanup import cleanup
@ -22,8 +22,9 @@ from video_creation.background import (
from video_creation.final_video import make_final_video
from video_creation.screenshot_downloader import get_screenshots_of_reddit_posts
from video_creation.voices import save_text_to_mp3
from utils.ffmpeg_install import ffmpeg_install
__VERSION__ = "3.0.1"
__VERSION__ = "3.1"
print(
"""
@ -43,7 +44,7 @@ checkversion(__VERSION__)
def main(POST_ID=None) -> None:
global redditid ,reddit_object
global redditid, reddit_object
reddit_object = get_subreddit_threads(POST_ID)
redditid = id(reddit_object)
length, number_of_comments = save_text_to_mp3(reddit_object)
@ -78,14 +79,25 @@ def shutdown():
if __name__ == "__main__":
assert sys.version_info >= (3, 9), "Python 3.10 or higher is required"
if sys.version_info.major != 3 or sys.version_info.minor != 10:
print("Hey! Congratulations, you've made it so far (which is pretty rare with no Python 3.10). Unfortunately, this program only works on Python 3.10. Please install Python 3.10 and try again.")
ffmpeg_install() # install ffmpeg if not installed
directory = Path().absolute()
config = settings.check_toml(
f"{directory}/utils/.config.template.toml", "config.toml"
)
config is False and exit()
if (
not settings.config["settings"]["tts"]["tiktok_sessionid"]
or settings.config["settings"]["tts"]["tiktok_sessionid"] == ""
) and config["settings"]["tts"]["voice_choice"] == "tiktok":
print_substep(
"TikTok voice requires a sessionid! Check our documentation on how to obtain one.",
"bold red",
)
exit()
try:
if config["reddit"]["thread"]["post_id"] :
if config["reddit"]["thread"]["post_id"]:
for index, post_id in enumerate(
config["reddit"]["thread"]["post_id"].split("+")
):
@ -108,8 +120,11 @@ if __name__ == "__main__":
shutdown()
except Exception as err:
print_step(f'Sorry, something went wrong with this version! Try again, and feel free to report this issue at GitHub or the Discord community.\n'
f'Version: {__VERSION__},Story mode: {str(config["settings"]["storymode"])}, Story mode method: {str(config["settings"]["storymodemethod"])},\n'
f'Postid : {str(config["settings"])},allownsfw :{config["settings"]["allow_nsfw"]},is_nsfw : {str(reddit_object["is_nsfw"])}'
)
config["settings"]["tts"]["tiktok_sessionid"] = "REDACTED"
print_step(
f"Sorry, something went wrong with this version! Try again, and feel free to report this issue at GitHub or the Discord community.\n"
f"Version: {__VERSION__} \n"
f"Error: {err} \n"
f'Config: {config["settings"]}'
)
raise err

@ -84,22 +84,28 @@ def get_subreddit_threads(POST_ID: str):
settings.config["reddit"]["thread"]["post_id"]
and len(str(settings.config["reddit"]["thread"]["post_id"]).split("+")) == 1
):
submission = reddit.submission(id=settings.config["reddit"]["thread"]["post_id"])
elif settings.config["ai"]["ai_similarity_enabled"]: # ai sorting based on comparison
submission = reddit.submission(
id=settings.config["reddit"]["thread"]["post_id"]
)
elif settings.config["ai"][
"ai_similarity_enabled"
]: # ai sorting based on comparison
threads = subreddit.hot(limit=50)
keywords = settings.config["ai"]["ai_similarity_keywords"].split(',')
keywords = settings.config["ai"]["ai_similarity_keywords"].split(",")
keywords = [keyword.strip() for keyword in keywords]
# Reformat the keywords for printing
keywords_print = ", ".join(keywords)
print(f'Sorting threads by similarity to the given keywords: {keywords_print}')
print(f"Sorting threads by similarity to the given keywords: {keywords_print}")
threads, similarity_scores = sort_by_similarity(threads, keywords)
submission, similarity_score = get_subreddit_undone(threads, subreddit, similarity_scores=similarity_scores)
submission, similarity_score = get_subreddit_undone(
threads, subreddit, similarity_scores=similarity_scores
)
else:
threads = subreddit.hot(limit=25)
submission = get_subreddit_undone(threads, subreddit)
if submission is None:
return get_subreddit_threads(POST_ID) # submission already done. rerun
return get_subreddit_threads(POST_ID) # submission already done. rerun
if settings.config["settings"]["storymode"]:
if not submission.selftext:
@ -107,7 +113,9 @@ def get_subreddit_threads(POST_ID: str):
exit()
else:
# Check for the length of the post text
if len(submission.selftext) > (settings.config["settings"]["storymode_max_length"] or 2000):
if len(submission.selftext) > (
settings.config["settings"]["storymode_max_length"] or 2000
):
print_substep(
f"Post is too long ({len(submission.selftext)}), try with a different post. ({settings.config['settings']['storymode_max_length']} character limit)"
)
@ -124,12 +132,15 @@ def get_subreddit_threads(POST_ID: str):
threadurl = f"https://reddit.com{submission.permalink}"
print_substep(f"Video will be: {submission.title} :thumbsup:", style="bold green")
print_substep(f"Thread url is : {threadurl } :thumbsup:", style="bold green")
print_substep(f"Thread url is: {threadurl} :thumbsup:", style="bold green")
print_substep(f"Thread has {upvotes} upvotes", style="bold blue")
print_substep(f"Thread has a upvote ratio of {ratio}%", style="bold blue")
print_substep(f"Thread has {num_comments} comments", style="bold blue")
if similarity_score:
print_substep(f"Thread has a similarity score up to {round(similarity_score * 100)}%", style="bold blue")
print_substep(
f"Thread has a similarity score up to {round(similarity_score * 100)}%",
style="bold blue",
)
content["thread_url"] = threadurl
content["thread_title"] = submission.title
@ -158,7 +169,6 @@ def get_subreddit_threads(POST_ID: str):
if len(top_level_comment.body) >= int(
settings.config["reddit"]["thread"]["min_comment_length"]
):
if (
top_level_comment.author is not None
and sanitize_text(top_level_comment.body) is not None

@ -18,4 +18,5 @@ clean-text==0.6.0
unidecode==1.3.2
spacy==3.4.1
torch==1.12.1
transformers==4.25.1
transformers==4.25.1
ffmpeg-python==0.2.0

@ -22,14 +22,13 @@ ai_similarity_keywords = {optional = true, type="str", example= 'Elon Musk, Twit
[settings]
allow_nsfw = { optional = false, type = "bool", default = false, example = false, options = [true, false, ], explanation = "Whether to allow NSFW content, True or False" }
theme = { optional = false, default = "dark", example = "light", options = ["dark", "light", ], explanation = "Sets the Reddit theme, either LIGHT or DARK" }
theme = { optional = false, default = "dark", example = "light", options = ["dark", "light", "transparent", ], explanation = "Sets the Reddit theme, either LIGHT or DARK. For story mode you can also use a transparent background." }
times_to_run = { optional = false, default = 1, example = 2, explanation = "Used if you want to run multiple times. Set to an int e.g. 4 or 29 or 1", type = "int", nmin = 1, oob_error = "It's very hard to run something less than once." }
opacity = { optional = false, default = 0.9, example = 0.8, explanation = "Sets the opacity of the comments when overlayed over the background", type = "float", nmin = 0, nmax = 1, oob_error = "The opacity HAS to be between 0 and 1", input_error = "The opacity HAS to be a decimal number between 0 and 1" }
transition = { optional = true, default = 0.2, example = 0.2, explanation = "Sets the transition time (in seconds) between the comments. Set to 0 if you want to disable it.", type = "float", nmin = 0, nmax = 2, oob_error = "The transition HAS to be between 0 and 2", input_error = "The opacity HAS to be a decimal number between 0 and 2" }
storymode = { optional = true, type = "bool", default = false, example = false, options = [true, false,], explanation = "Only read out title and post content, great for subreddits with stories" }
storymodemethod= { optional = true, default = 1, example = 1, explanation = "Style that's used for the storymode. Set to 0 for single picture display in whole video, set to 1 for fancy looking video ", type = "int", nmin = 0, oob_error = "It's very hard to run something less than once.", options = [0, 1] }
storymode_max_length = { optional = true, default = 1000, example = 1000, explanation = "Max length of the storymode video in characters. 200 characters are approximately 50 seconds.", type = "int", nmin = 1, oob_error = "It's very hard to make a video under a second." }
fps = { optional = false, default = 30, example = 30, explanation = "Sets the FPS of the video, 30 is default for best performance. 60 FPS is smoother.", type = "int", nmin = 1, nmax = 60, oob_error = "The FPS HAS to be between 1 and 60" }
resolution_w = { optional = false, default = 1080, example = 1440, explantation = "Sets the width in pixels of the final video" }
resolution_h = { optional = false, default = 1920, example = 2560, explantation = "Sets the height in pixels of the final video" }

@ -5,16 +5,22 @@ import torch
# Mean Pooling - Take attention mask into account for correct averaging
def mean_pooling(model_output, attention_mask):
token_embeddings = model_output[0] # First element of model_output contains all token embeddings
input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(input_mask_expanded.sum(1), min=1e-9)
token_embeddings = model_output[
0
] # First element of model_output contains all token embeddings
input_mask_expanded = (
attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
)
return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(
input_mask_expanded.sum(1), min=1e-9
)
# This function sort the given threads based on their total similarity with the given keywords
def sort_by_similarity(thread_objects, keywords):
# Initialize tokenizer + model.
tokenizer = AutoTokenizer.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')
model = AutoModel.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')
tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
model = AutoModel.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
# Transform the generator to a list of Submission Objects, so we can sort later based on context similarity to
# keywords
@ -22,26 +28,36 @@ def sort_by_similarity(thread_objects, keywords):
threads_sentences = []
for i, thread in enumerate(thread_objects):
threads_sentences.append(' '.join([thread.title, thread.selftext]))
threads_sentences.append(" ".join([thread.title, thread.selftext]))
# Threads inference
encoded_threads = tokenizer(threads_sentences, padding=True, truncation=True, return_tensors='pt')
encoded_threads = tokenizer(
threads_sentences, padding=True, truncation=True, return_tensors="pt"
)
with torch.no_grad():
threads_embeddings = model(**encoded_threads)
threads_embeddings = mean_pooling(threads_embeddings, encoded_threads['attention_mask'])
threads_embeddings = mean_pooling(
threads_embeddings, encoded_threads["attention_mask"]
)
# Keywords inference
encoded_keywords = tokenizer(keywords, padding=True, truncation=True, return_tensors='pt')
encoded_keywords = tokenizer(
keywords, padding=True, truncation=True, return_tensors="pt"
)
with torch.no_grad():
keywords_embeddings = model(**encoded_keywords)
keywords_embeddings = mean_pooling(keywords_embeddings, encoded_keywords['attention_mask'])
keywords_embeddings = mean_pooling(
keywords_embeddings, encoded_keywords["attention_mask"]
)
# Compare every keyword w/ every thread embedding
threads_embeddings_tensor = torch.tensor(threads_embeddings)
total_scores = torch.zeros(threads_embeddings_tensor.shape[0])
cosine_similarity = torch.nn.CosineSimilarity()
for keyword_embedding in keywords_embeddings:
keyword_embedding = torch.tensor(keyword_embedding).repeat(threads_embeddings_tensor.shape[0], 1)
keyword_embedding = torch.tensor(keyword_embedding).repeat(
threads_embeddings_tensor.shape[0], 1
)
similarity = cosine_similarity(keyword_embedding, threads_embeddings_tensor)
total_scores += similarity
@ -51,8 +67,8 @@ def sort_by_similarity(thread_objects, keywords):
thread_objects = np.array(thread_objects)[indices.numpy()].tolist()
#print('Similarity Thread Ranking')
#for i, thread in enumerate(thread_objects):
# print('Similarity Thread Ranking')
# for i, thread in enumerate(thread_objects):
# print(f'{i}) {threads_sentences[i]} score {similarity_scores[i]}')
return thread_objects, similarity_scores

@ -0,0 +1,84 @@
import requests
import os
import subprocess
def ffmpeg_install_windows():
try:
zip = "https://github.com/BtbN/FFmpeg-Builds/releases/download/latest/ffmpeg-master-latest-win64-gpl.zip"
r = requests.get(zip)
with open("ffmpeg.zip", "wb") as f:
f.write(r.content)
import zipfile
with zipfile.ZipFile("ffmpeg.zip", "r") as zip_ref:
zip_ref.extractall()
os.remove("ffmpeg.zip")
os.rename("ffmpeg-master-latest-win64-gpl", "ffmpeg")
# Move the files inside bin to the root
for file in os.listdir("ffmpeg/bin"):
os.rename(f"ffmpeg/bin/{file}", f"ffmpeg/{file}")
os.rmdir("ffmpeg/bin")
for file in os.listdir("ffmpeg/doc"):
os.remove(f"ffmpeg/doc/{file}")
os.rmdir("ffmpeg/doc")
# Add to the path
subprocess.run("setx /M PATH \"%PATH%;%CD%\\ffmpeg\"", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print("FFmpeg installed successfully! Please restart your computer and then re-run the program.")
exit()
except Exception as e:
print(
"An error occurred while trying to install FFmpeg. Please try again. Otherwise, please install FFmpeg manually and try again.")
print(e)
exit()
def ffmpeg_install_linux():
try:
subprocess.run("sudo apt install ffmpeg", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except Exception as e:
print(
"An error occurred while trying to install FFmpeg. Please try again. Otherwise, please install FFmpeg manually and try again.")
print(e)
exit()
print("FFmpeg installed successfully! Please re-run the program.")
exit()
def ffmpeg_install_mac():
try:
subprocess.run("brew install ffmpeg", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except FileNotFoundError:
print(
"Homebrew is not installed. Please install it and try again. Otherwise, please install FFmpeg manually and try again.")
exit()
print("FFmpeg installed successfully! Please re-run the program.")
exit()
def ffmpeg_install():
try:
# Try to run the FFmpeg command
subprocess.run(['ffmpeg', '-version'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print('FFmpeg is installed on this system! If you are seeing this error for the second time, restart your computer.')
except FileNotFoundError as e:
print('FFmpeg is not installed on this system.')
resp = input("We can try to automatically install it for you. Would you like to do that? (y/n): ")
if resp.lower() == "y":
print("Installing FFmpeg...")
if os.name == "nt":
ffmpeg_install_windows()
elif os.name == "posix":
ffmpeg_install_linux()
elif os.name == "mac":
ffmpeg_install_mac()
else:
print("Your OS is not supported. Please install FFmpeg manually and try again.")
exit()
else:
print("Please install FFmpeg manually and try again.")
exit()
except Exception as e:
print("Welcome fellow traveler! You're one of the few who have made it this far. We have no idea how you got at this error, but we're glad you're here. Please report this error to the developer, and we'll try to fix it as soon as possible. Thank you for your patience!")
print(e)
return None

@ -6,7 +6,10 @@ from PIL import Image, ImageDraw, ImageFont
from rich.progress import track
from TTS.engine_wrapper import process_text
def draw_multiple_line_text(image, text, font, text_color, padding, wrap=50) -> None:
def draw_multiple_line_text(
image, text, font, text_color, padding, wrap=50, transparent=False
) -> None:
"""
Draw multiline text over given image
"""
@ -19,58 +22,70 @@ def draw_multiple_line_text(image, text, font, text_color, padding, wrap=50) ->
)
for line in lines:
line_width, line_height = font.getsize(line)
if transparent:
shadowcolor = "black"
for i in range(1, 5):
draw.text(
((image_width - line_width) / 2 - i, y - i),
line,
font=font,
fill=shadowcolor,
)
draw.text(
((image_width - line_width) / 2 + i, y - i),
line,
font=font,
fill=shadowcolor,
)
draw.text(
((image_width - line_width) / 2 - i, y + i),
line,
font=font,
fill=shadowcolor,
)
draw.text(
((image_width - line_width) / 2 + i, y + i),
line,
font=font,
fill=shadowcolor,
)
draw.text(((image_width - line_width) / 2, y), line, font=font, fill=text_color)
y += line_height + padding
# theme=bgcolor,reddit_obj=reddit_object,txtclr=txtcolor
def imagemaker(theme, reddit_obj: dict, txtclr, padding=5) -> None:
def imagemaker(theme, reddit_obj: dict, txtclr, padding=5, transparent=False) -> None:
"""
Render Images for video
"""
title = process_text(reddit_obj["thread_title"], False) #TODO if second argument cause any error
title = process_text(
reddit_obj["thread_title"], False
)
texts = reddit_obj["thread_post"]
id = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"])
tfont = ImageFont.truetype(os.path.join("fonts", "Roboto-Bold.ttf"), 27) # for title
font = ImageFont.truetype(
os.path.join("fonts", "Roboto-Regular.ttf"), 20
) # for despcription|comments
size = (500, 176)
if transparent:
font = ImageFont.truetype(os.path.join("fonts", "Roboto-Bold.ttf"), 50)
tfont = ImageFont.truetype(os.path.join("fonts", "Roboto-Bold.ttf"), 50)
else:
tfont = ImageFont.truetype(
os.path.join("fonts", "Roboto-Bold.ttf"), 35
) # for title
font = ImageFont.truetype(os.path.join("fonts", "Roboto-Regular.ttf"), 30)
size = (1920, 1080)
image = Image.new("RGBA", size, theme)
draw = ImageDraw.Draw(image)
# for title
if len(title) > 40:
draw_multiple_line_text(image, title, tfont, txtclr, padding, wrap=30)
else:
Fontperm = tfont.getsize(title)
draw.text(
((image.size[0] - Fontperm[0]) / 2, (image.size[1] - Fontperm[1]) / 2),
font=tfont,
text=title,
) # (image.size[1]/2)-(Fontperm[1]/2)
draw_multiple_line_text(
image, title, tfont, txtclr, padding, wrap=30, transparent=transparent
)
image.save(f"assets/temp/{id}/png/title.png")
# for comment|description
for idx, text in track(enumerate(texts), "Rendering Image"):#, total=len(texts)):
for idx, text in track(enumerate(texts), "Rendering Image"):
image = Image.new("RGBA", size, theme)
draw = ImageDraw.Draw(image)
text = process_text(text,False)
if len(text) > 50:
draw_multiple_line_text(image, text, font, txtclr, padding)
else:
Fontperm = font.getsize(text)
draw.text(
((image.size[0] - Fontperm[0]) / 2, (image.size[1] - Fontperm[1]) / 2),
font=font,
text=text,
) # (image.size[1]/2)-(Fontperm[1]/2)
text = process_text(text, False)
draw_multiple_line_text(
image, text, font, txtclr, padding, wrap=30, transparent=transparent
)
image.save(f"assets/temp/{id}/png/img{idx}.png")

@ -11,9 +11,11 @@ def posttextparser(obj):
text = re.sub("\n", "", obj)
try:
nlp = spacy.load('en_core_web_sm')
nlp = spacy.load("en_core_web_sm")
except OSError:
print_step("The spacy model can't load. You need to install it with \npython -m spacy download en")
print_step(
"The spacy model can't load. You need to install it with the command \npython -m spacy download en_core_web_sm"
)
exit()
doc = nlp(text)

@ -6,7 +6,9 @@ from utils.console import print_substep
from utils.ai_methods import sort_by_similarity
def get_subreddit_undone(submissions: list, subreddit, times_checked=0, similarity_scores=None):
def get_subreddit_undone(
submissions: list, subreddit, times_checked=0, similarity_scores=None
):
"""_summary_
Args:
@ -18,8 +20,12 @@ def get_subreddit_undone(submissions: list, subreddit, times_checked=0, similari
"""
# Second try of getting a valid Submission
if times_checked and settings.config["ai"]["ai_similarity_enabled"]:
print('Sorting based on similarity for a different date filter and thread limit..')
submissions = sort_by_similarity(submissions, keywords=settings.config["ai"]["ai_similarity_enabled"])
print(
"Sorting based on similarity for a different date filter and thread limit.."
)
submissions = sort_by_similarity(
submissions, keywords=settings.config["ai"]["ai_similarity_enabled"]
)
# recursively checks if the top submission in the list was already done.
if not exists("./video_creation/data/videos.json"):
@ -42,9 +48,11 @@ def get_subreddit_undone(submissions: list, subreddit, times_checked=0, similari
if submission.stickied:
print_substep("This post was pinned by moderators. Skipping...")
continue
if submission.num_comments <= int(
settings.config["reddit"]["thread"]["min_comments"]
) and not settings.config["settings"]["storymode"]:
if (
submission.num_comments
<= int(settings.config["reddit"]["thread"]["min_comments"])
and not settings.config["settings"]["storymode"]
):
print_substep(
f'This post has under the specified minimum of comments ({settings.config["reddit"]["thread"]["min_comments"]}). Skipping...'
)

@ -1,14 +1,17 @@
from PIL import ImageDraw, ImageFont
def create_thumbnail(thumbnail, font_family, font_size, font_color, width, height, title):
def create_thumbnail(
thumbnail, font_family, font_size, font_color, width, height, title
):
font = ImageFont.truetype(font_family + ".ttf", font_size)
Xaxis = width - (width * 0.2) # 20% of the width
sizeLetterXaxis = font_size * 0.5 # 50% of the font size
XaxisLetterQty = round(Xaxis / sizeLetterXaxis) # Quantity of letters that can fit in the X axis
MarginYaxis = (height * 0.12) # 12% of the height
MarginXaxis = (width * 0.05) # 5% of the width
Xaxis = width - (width * 0.2) # 20% of the width
sizeLetterXaxis = font_size * 0.5 # 50% of the font size
XaxisLetterQty = round(
Xaxis / sizeLetterXaxis
) # Quantity of letters that can fit in the X axis
MarginYaxis = height * 0.12 # 12% of the height
MarginXaxis = width * 0.05 # 5% of the width
# 1.1 rem
LineHeight = font_size * 1.1
# rgb = "255,255,255" transform to list
@ -31,7 +34,8 @@ def create_thumbnail(thumbnail, font_family, font_size, font_color, width, heigh
# loop for put the title in the thumbnail
for i in range(0, len(arrayTitle)):
# 1.1 rem
draw.text((MarginXaxis, MarginYaxis + (LineHeight * i)),
arrayTitle[i], rgb, font=font)
draw.text(
(MarginXaxis, MarginYaxis + (LineHeight * i)), arrayTitle[i], rgb, font=font
)
return thumbnail

@ -1,68 +0,0 @@
from __future__ import annotations
import re
from typing import Tuple
from PIL import ImageFont, Image, ImageDraw, ImageEnhance
from moviepy.video.VideoClip import VideoClip, ImageClip
from moviepy.video.compositing.CompositeVideoClip import CompositeVideoClip
class Video:
def __init__(self, video: VideoClip, *args, **kwargs):
self.video: VideoClip = video
self.fps = self.video.fps
self.duration = self.video.duration
@staticmethod
def _create_watermark(text, redditid, fontsize, opacity=0.5):
id = re.sub(r"[^\w\s-]", "", redditid["thread_id"])
path = f"./assets/temp/{id}/png/watermark.png"
width = int(fontsize * len(text))
height = int(fontsize * len(text) / 2)
white = (255, 255, 255)
transparent = (0, 0, 0, 0)
font = ImageFont.load_default()
wm = Image.new("RGBA", (width, height), transparent)
im = Image.new("RGBA", (width, height), transparent) # Change this line too.
draw = ImageDraw.Draw(wm)
w, h = draw.textsize(text, font)
draw.text(((width - w) / 2, (height - h) / 2), text, white, font)
en = ImageEnhance.Brightness(wm) # todo allow it to use the fontsize
mask = en.enhance(1 - opacity)
im.paste(wm, (25, 25), mask)
im.save(path)
return ImageClip(path)
def add_watermark(
self,
text,
redditid,
opacity=0.5,
duration: int | float = 5,
position: Tuple = (0.7, 0.9),
fontsize=15,
):
compensation = round(
(
position[0]
/ ((len(text) * (fontsize / 5) / 1.5) / 100 + position[0] * position[0])
),
ndigits=2,
)
position = (compensation, position[1])
# print(f'{compensation=}')
# print(f'{position=}')
img_clip = self._create_watermark(
text, redditid, fontsize=fontsize, opacity=opacity
)
img_clip = img_clip.set_opacity(opacity).set_duration(duration)
img_clip = img_clip.set_position(
position, relative=True
) # todo get dara from utils/CONSTANTS.py and adapt position accordingly
# Overlay the img clip on the first video clip
self.video = CompositeVideoClip([self.video, img_clip])
return self.video

@ -89,10 +89,10 @@ def sanitize_text(text: str) -> str:
regex_expr = r"\s['|]|['|]\s|[\^_~@!&;#:\-%—“”‘\"%\*/{}\[\]\(\)\\|<>=+]"
result = re.sub(regex_expr, " ", result)
result = result.replace("+", "plus").replace("&", "and")
# emoji removal if the setting is enabled
if settings.config["settings"]["tts"]["no_emojis"]:
result = clean(result, no_emoji=True)
# remove extra whitespace
return " ".join(result.split())

@ -1,33 +1,66 @@
import multiprocessing
import os
import re
import multiprocessing
from os.path import exists
from typing import Tuple, Any, Final
import translators as ts
import shutil
from os.path import exists # Needs to be imported specifically
from typing import Final
from typing import Tuple, Any
from PIL import Image
from moviepy.audio.AudioClip import concatenate_audioclips, CompositeAudioClip
from moviepy.audio.io.AudioFileClip import AudioFileClip
from moviepy.video.VideoClip import ImageClip
from moviepy.video.compositing.CompositeVideoClip import CompositeVideoClip
from moviepy.video.compositing.concatenate import concatenate_videoclips
from moviepy.video.io.VideoFileClip import VideoFileClip
from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip
import ffmpeg
import translators as ts
from PIL import Image
from rich.console import Console
from rich.progress import track
from utils import settings
from utils.cleanup import cleanup
from utils.console import print_step, print_substep
from utils.video import Video
from utils.videos import save_data
from utils.thumbnail import create_thumbnail
from utils import settings
from utils.thumbnail import create_thumbnail
from utils.videos import save_data
console = Console()
import tempfile
import threading
import time
class ProgressFfmpeg(threading.Thread):
def __init__(self, vid_duration_seconds, progress_update_callback):
threading.Thread.__init__(self, name="ProgressFfmpeg")
self.stop_event = threading.Event()
self.output_file = tempfile.NamedTemporaryFile(mode="w+", delete=False)
self.vid_duration_seconds = vid_duration_seconds
self.progress_update_callback = progress_update_callback
def run(self):
while not self.stop_event.is_set():
latest_progress = self.get_latest_ms_progress()
if latest_progress is not None:
completed_percent = latest_progress / self.vid_duration_seconds
self.progress_update_callback(completed_percent)
time.sleep(1)
def get_latest_ms_progress(self):
lines = self.output_file.readlines()
if lines:
for line in lines:
if "out_time_ms" in line:
out_time_ms = line.split("=")[1]
return int(out_time_ms) / 1000000.0
return None
def stop(self):
self.stop_event.set()
def __enter__(self):
self.start()
return self
def __exit__(self, *args, **kwargs):
self.stop()
def name_normalize(name: str) -> str:
name = re.sub(r'[?\\"%*:|<>]', "", name)
@ -46,22 +79,29 @@ def name_normalize(name: str) -> str:
return name
def prepare_background(reddit_id: str, W: int, H: int) -> VideoFileClip:
clip = (
VideoFileClip(f"assets/temp/{reddit_id}/background.mp4")
.without_audio()
.resize(height=H)
def prepare_background(reddit_id: str, W: int, H: int) -> str:
output_path = f"assets/temp/{reddit_id}/background_noaudio.mp4"
output = (
ffmpeg.input(f"assets/temp/{reddit_id}/background.mp4")
.filter("crop", f"ih*({W}/{H})", "ih")
.output(
output_path,
an=None,
**{
"c:v": "h264",
"b:v": "20M",
"b:a": "192k",
"threads": multiprocessing.cpu_count(),
},
)
.overwrite_output()
)
# calculate the center of the background clip
c = clip.w // 2
# calculate the coordinates where to crop
half_w = W // 2
x1 = c - half_w
x2 = c + half_w
return clip.crop(x1=x1, y1=0, x2=x2, y2=H)
try:
output.run(quiet=True)
except Exception as e:
print(e)
exit()
return output_path
def make_final_video(
@ -81,103 +121,135 @@ def make_final_video(
W: Final[int] = int(settings.config["settings"]["resolution_w"])
H: Final[int] = int(settings.config["settings"]["resolution_h"])
# try: # if it isn't found (i.e you just updated and copied over config.toml) it will throw an error
# VOLUME_MULTIPLIER = settings.config["settings"]['background']["background_audio_volume"]
# except (TypeError, KeyError):
# print('No background audio volume found in config.toml. Using default value of 1.')
# VOLUME_MULTIPLIER = 1
reddit_id = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"])
print_step("Creating the final video 🎥")
VideoFileClip.reW = lambda clip: clip.resize(width=W)
VideoFileClip.reH = lambda clip: clip.resize(width=H)
opacity = settings.config["settings"]["opacity"]
transition = settings.config["settings"]["transition"]
background_clip = prepare_background(reddit_id, W=W, H=H)
background_clip = ffmpeg.input(prepare_background(reddit_id, W=W, H=H))
# Gather all audio clips
audio_clips = list()
if settings.config["settings"]["storymode"]:
if settings.config["settings"]["storymodemethod"] == 0:
audio_clips = [AudioFileClip(f"assets/temp/{reddit_id}/mp3/title.mp3")]
audio_clips.insert(1, AudioFileClip(f"assets/temp/{reddit_id}/mp3/postaudio.mp3"))
audio_clips = [ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3")]
audio_clips.insert(
1, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/postaudio.mp3")
)
elif settings.config["settings"]["storymodemethod"] == 1:
audio_clips = [
AudioFileClip(f"assets/temp/{reddit_id}/mp3/postaudio-{i}.mp3")
ffmpeg.input(f"assets/temp/{reddit_id}/mp3/postaudio-{i}.mp3")
for i in track(
range(number_of_clips + 1), "Collecting the audio files..."
)
]
audio_clips.insert(0, AudioFileClip(f"assets/temp/{reddit_id}/mp3/title.mp3"))
audio_clips.insert(
0, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3")
)
else:
audio_clips = [
AudioFileClip(f"assets/temp/{reddit_id}/mp3/{i}.mp3")
ffmpeg.input(f"assets/temp/{reddit_id}/mp3/{i}.mp3")
for i in range(number_of_clips)
]
audio_clips.insert(0, AudioFileClip(f"assets/temp/{reddit_id}/mp3/title.mp3"))
audio_concat = concatenate_audioclips(audio_clips)
audio_composite = CompositeAudioClip([audio_concat])
audio_clips.insert(0, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3"))
audio_clips_durations = [
float(
ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/{i}.mp3")["format"][
"duration"
]
)
for i in range(number_of_clips)
]
audio_clips_durations.insert(
0,
float(
ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/title.mp3")["format"][
"duration"
]
),
)
audio_concat = ffmpeg.concat(*audio_clips, a=1, v=0)
ffmpeg.output(
audio_concat, f"assets/temp/{reddit_id}/audio.mp3", **{"b:a": "192k"}
).overwrite_output().run(quiet=True)
console.log(f"[bold green] Video Will Be: {length} Seconds Long")
# add title to video
image_clips = []
# Gather all images
new_opacity = 1 if opacity is None or float(opacity) >= 1 else float(opacity)
new_transition = (
0 if transition is None or float(transition) > 2 else float(transition)
)
screenshot_width = int((W * 90) // 100)
screenshot_width = int((W * 45) // 100)
audio = ffmpeg.input(f"assets/temp/{reddit_id}/audio.mp3")
image_clips = list()
image_clips.insert(
0,
ImageClip(f"assets/temp/{reddit_id}/png/title.png")
.set_duration(audio_clips[0].duration)
.resize(width=screenshot_width)
.set_opacity(new_opacity)
.crossfadein(new_transition)
.crossfadeout(new_transition),
ffmpeg.input(f"assets/temp/{reddit_id}/png/title.png")["v"].filter(
"scale", screenshot_width, -1
),
)
current_time = 0
if settings.config["settings"]["storymode"]:
audio_clips_durations = [
float(
ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/postaudio-{i}.mp3")[
"format"
]["duration"]
)
for i in range(number_of_clips)
]
audio_clips_durations.insert(
0,
float(
ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/title.mp3")["format"][
"duration"
]
),
)
if settings.config["settings"]["storymodemethod"] == 0:
image_clips.insert(
1,
ImageClip(f"assets/temp/{reddit_id}/png/story_content.png")
.set_duration(audio_clips[1].duration)
.set_position("center")
.resize(width=screenshot_width)
.set_opacity(float(opacity)),
ffmpeg.input(f"assets/temp/{reddit_id}/png/story_content.png").filter(
"scale", screenshot_width, -1
),
)
background_clip = background_clip.overlay(
image_clips[1],
enable=f"between(t,{current_time},{current_time + audio_clips_durations[1]})",
x="(main_w-overlay_w)/2",
y="(main_h-overlay_h)/2",
)
current_time += audio_clips_durations[1]
elif settings.config["settings"]["storymodemethod"] == 1:
for i in track(
range(0, number_of_clips + 1), "Collecting the image files..."
):
image_clips.append(
ImageClip(f"assets/temp/{reddit_id}/png/img{i}.png")
.set_duration(audio_clips[i + 1].duration)
.resize(width=screenshot_width)
.set_opacity(new_opacity)
# .crossfadein(new_transition)
# .crossfadeout(new_transition)
ffmpeg.input(f"assets/temp/{reddit_id}/png/img{i}.png")["v"].filter(
"scale", screenshot_width, -1
)
)
background_clip = background_clip.overlay(
image_clips[i],
enable=f"between(t,{current_time},{current_time + audio_clips_durations[i]})",
x="(main_w-overlay_w)/2",
y="(main_h-overlay_h)/2",
)
current_time += audio_clips_durations[i]
else:
for i in range(0, number_of_clips):
for i in range(0, number_of_clips + 1):
image_clips.append(
ImageClip(f"assets/temp/{reddit_id}/png/comment_{i}.png")
.set_duration(audio_clips[i + 1].duration)
.resize(width=screenshot_width)
.set_opacity(new_opacity)
.crossfadein(new_transition)
.crossfadeout(new_transition)
ffmpeg.input(f"assets/temp/{reddit_id}/png/comment_{i}.png")[
"v"
].filter("scale", screenshot_width, -1)
)
background_clip = background_clip.overlay(
image_clips[i],
enable=f"between(t,{current_time},{current_time + audio_clips_durations[i]})",
x="(main_w-overlay_w)/2",
y="(main_h-overlay_h)/2",
)
current_time += audio_clips_durations[i]
img_clip_pos = background_config[3]
image_concat = concatenate_videoclips(image_clips).set_position(
img_clip_pos
) # note transition kwarg for delay in imgs
image_concat.audio = audio_composite
final = CompositeVideoClip([background_clip, image_concat])
title = re.sub(r"[^\w\s-]", "", reddit_obj["thread_title"])
idx = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"])
title_thumb = reddit_obj["thread_title"]
@ -189,13 +261,12 @@ def make_final_video(
print_substep("The results folder didn't exist so I made it")
os.makedirs(f"./results/{subreddit}")
# create a tumbnail for the video
# create a thumbnail for the video
settingsbackground = settings.config["settings"]["background"]
if settingsbackground["background_thumbnail"]:
if not exists(f"./results/{subreddit}/thumbnails"):
print_substep(
"The results/thumbnails folder didn't exist so I made it")
print_substep("The results/thumbnails folder didn't exist so I made it")
os.makedirs(f"./results/{subreddit}/thumbnails")
# get the first file with the .png extension from assets/backgrounds and use it as a background for the thumbnail
first_image = next(
@ -209,84 +280,75 @@ def make_final_video(
if first_image is None:
print_substep("No png files found in assets/backgrounds", "red")
if settingsbackground["background_thumbnail"] and first_image:
font_family = settingsbackground["background_thumbnail_font_family"]
font_size = settingsbackground["background_thumbnail_font_size"]
font_color = settingsbackground["background_thumbnail_font_color"]
thumbnail = Image.open(f"assets/backgrounds/{first_image}")
width, height = thumbnail.size
thumbnailSave = create_thumbnail(thumbnail, font_family, font_size, font_color, width, height, title_thumb)
thumbnailSave.save(f"./assets/temp/{reddit_id}/thumbnail.png")
print_substep(f"Thumbnail - Building Thumbnail in assets/temp/{reddit_id}/thumbnail.png")
# create a tumbnail for the video
settingsbackground = settings.config["settings"]["background"]
if settingsbackground["background_thumbnail"]:
if not exists(f"./results/{subreddit}/thumbnails"):
else:
font_family = settingsbackground["background_thumbnail_font_family"]
font_size = settingsbackground["background_thumbnail_font_size"]
font_color = settingsbackground["background_thumbnail_font_color"]
thumbnail = Image.open(f"assets/backgrounds/{first_image}")
width, height = thumbnail.size
thumbnailSave = create_thumbnail(
thumbnail,
font_family,
font_size,
font_color,
width,
height,
title_thumb,
)
thumbnailSave.save(f"./assets/temp/{reddit_id}/thumbnail.png")
print_substep(
"The results/thumbnails folder didn't exist so I made it")
os.makedirs(f"./results/{subreddit}/thumbnails")
# get the first file with the .png extension from assets/backgrounds and use it as a background for the thumbnail
first_image = next(
(
file
for file in os.listdir("assets/backgrounds")
if file.endswith(".png")
),
None,
)
if first_image is None:
print_substep("No png files found in assets/backgrounds", "red")
f"Thumbnail - Building Thumbnail in assets/temp/{reddit_id}/thumbnail.png"
)
if settingsbackground["background_thumbnail"] and first_image:
font_family = settingsbackground["background_thumbnail_font_family"]
font_size = settingsbackground["background_thumbnail_font_size"]
font_color = settingsbackground["background_thumbnail_font_color"]
thumbnail = Image.open(f"assets/backgrounds/{first_image}")
width, height = thumbnail.size
thumbnailSave = create_thumbnail(thumbnail, font_family, font_size, font_color, width, height, title_thumb)
thumbnailSave.save(f"./assets/temp/{reddit_id}/thumbnail.png")
print_substep(f"Thumbnail - Building Thumbnail in assets/temp/{reddit_id}/thumbnail.png")
# if settings.config["settings"]['background']["background_audio"] and exists(f"assets/backgrounds/background.mp3"):
# audioclip = mpe.AudioFileClip(f"assets/backgrounds/background.mp3").set_duration(final.duration)
# audioclip = audioclip.fx( volumex, 0.2)
# final_audio = mpe.CompositeAudioClip([final.audio, audioclip])
# # lowered_audio = audio_background.multiply_volume( # todo get this to work
# # VOLUME_MULTIPLIER) # lower volume by background_audio_volume, use with fx
# final.set_audio(final_audio)
final = Video(final).add_watermark(
text=f"Background credit: {background_config[2]}",
opacity=0.4,
redditid=reddit_obj,
)
final.write_videofile(
f"assets/temp/{reddit_id}/temp.mp4",
fps=int(settings.config["settings"]["fps"]),
audio_codec="aac",
audio_bitrate="192k",
verbose=False,
threads=multiprocessing.cpu_count(),
#preset="ultrafast", # for testing purposes
)
ffmpeg_extract_subclip(
f"assets/temp/{reddit_id}/temp.mp4",
0,
length,
targetname=f"results/{subreddit}/{filename}.mp4",
text = f"Background by {background_config[2]}"
background_clip = ffmpeg.drawtext(
background_clip,
text=text,
x=f"(w-text_w)",
y=f"(h-text_h)",
fontsize=12,
fontcolor="White",
fontfile=os.path.join("fonts", "Roboto-Regular.ttf"),
)
#get the thumbnail image from assets/temp/id/thumbnail.png and save it in results/subreddit/thumbnails
if settingsbackground["background_thumbnail"] and exists(f"assets/temp/{id}/thumbnail.png"):
shutil.move(f"assets/temp/{id}/thumbnail.png", f"./results/{subreddit}/thumbnails/{filename}.png")
print_step("Rendering the video 🎥")
from tqdm import tqdm
pbar = tqdm(total=100, desc="Progress: ", bar_format="{l_bar}{bar}", unit=" %")
def on_update_example(progress):
status = round(progress * 100, 2)
old_percentage = pbar.n
pbar.update(status - old_percentage)
path = f"results/{subreddit}/{filename}"
path = path[:251]
path = path + ".mp4"
with ProgressFfmpeg(length, on_update_example) as progress:
ffmpeg.output(
background_clip,
audio,
path,
f="mp4",
**{
"c:v": "h264",
"b:v": "20M",
"b:a": "192k",
"threads": multiprocessing.cpu_count(),
},
).overwrite_output().global_args("-progress", progress.output_file.name).run(
quiet=True,
overwrite_output=True,
capture_stdout=False,
capture_stderr=False,
)
old_percentage = pbar.n
pbar.update(100 - old_percentage)
pbar.close()
save_data(subreddit, filename+".mp4", title, idx, background_config[2])
save_data(subreddit, filename + ".mp4", title, idx, background_config[2])
print_step("Removing temporary files 🗑")
cleanups = cleanup(reddit_id)
print_substep(f"Removed {cleanups} temporary files 🗑")
print_substep("See result in the results folder!")
print_step(
f'Reddit title: {reddit_obj["thread_title"]} \n Background Credit: {background_config[2]}'
)
print_step("Done! 🎉 The video is in the results folder 📁")

@ -35,12 +35,55 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
# ! Make sure the reddit screenshots folder exists
Path(f"assets/temp/{reddit_id}/png").mkdir(parents=True, exist_ok=True)
# set the theme and disable non-essential cookies
if settings.config["settings"]["theme"] == "dark":
cookie_file = open(
"./video_creation/data/cookie-dark-mode.json", encoding="utf-8"
)
bgcolor = (33, 33, 36, 255)
txtcolor = (240, 240, 240)
transparent = False
elif settings.config["settings"]["theme"] == "transparent":
if storymode:
# Transparent theme
bgcolor = (0, 0, 0, 0)
txtcolor = (255, 255, 255)
transparent = True
cookie_file = open(
"./video_creation/data/cookie-dark-mode.json", encoding="utf-8"
)
else:
# Switch to dark theme
cookie_file = open(
"./video_creation/data/cookie-dark-mode.json", encoding="utf-8"
)
bgcolor = (33, 33, 36, 255)
txtcolor = (240, 240, 240)
transparent = False
else:
cookie_file = open(
"./video_creation/data/cookie-light-mode.json", encoding="utf-8"
)
bgcolor = (255, 255, 255, 255)
txtcolor = (0, 0, 0)
transparent = False
if storymode and settings.config["settings"]["storymodemethod"] == 1:
# for idx,item in enumerate(reddit_object["thread_post"]):
print_substep("Generating images...")
return imagemaker(
theme=bgcolor,
reddit_obj=reddit_object,
txtclr=txtcolor,
transparent=transparent,
)
screenshot_num: int
with sync_playwright() as p:
print_substep("Launching Headless Browser...")
browser = p.chromium.launch(headless=True) # headless=False will show the browser for debugging purposes
context = browser.new_context()
browser = p.chromium.launch(
headless=True
) # headless=False will show the browser for debugging purposes
# Device scale factor (or dsf for short) allows us to increase the resolution of the screenshots
# When the dsf is 1, the width of the screenshot is 600 pixels
# so we need a dsf such that the width of the screenshot is greater than the final resolution of the video
@ -52,22 +95,6 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
viewport=ViewportSize(width=W, height=H),
device_scale_factor=dsf,
)
# set the theme and disable non-essential cookies
if settings.config["settings"]["theme"] == "dark":
cookie_file = open(
"./video_creation/data/cookie-dark-mode.json", encoding="utf-8"
)
bgcolor = (33, 33, 36, 255)
txtcolor = (240, 240, 240)
else:
cookie_file = open(
"./video_creation/data/cookie-light-mode.json", encoding="utf-8"
)
bgcolor = (255, 255, 255, 255)
txtcolor = (0, 0, 0)
if storymode and settings.config["settings"]["storymodemethod"] == 1:
# for idx,item in enumerate(reddit_object["thread_post"]):
return imagemaker(theme=bgcolor, reddit_obj=reddit_object, txtclr=txtcolor)
cookies = json.load(cookie_file)
cookie_file.close()
@ -80,31 +107,40 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
page.set_viewport_size(ViewportSize(width=1920, height=1080))
page.wait_for_load_state()
page.locator('[name="username"]').fill(settings.config["reddit"]["creds"]["username"])
page.locator('[name="password"]').fill(settings.config["reddit"]["creds"]["password"])
page.locator("button:has-text('Log In')").click()
page.wait_for_load_state() # Wait for page to fully load and add 5 seconds
page.locator('[name="username"]').fill(
settings.config["reddit"]["creds"]["username"]
)
page.locator('[name="password"]').fill(
settings.config["reddit"]["creds"]["password"]
)
page.locator("button[class$='m-full-width']").click()
page.wait_for_timeout(5000)
page.wait_for_load_state()
# Get the thread screenshot
page = context.new_page()
page.goto(reddit_object["thread_url"], timeout=0)
page.set_viewport_size(ViewportSize(width=W, height=H))
page.wait_for_load_state()
page.wait_for_timeout(5000)
if page.locator('[data-testid="content-gate"]').is_visible():
if page.locator(
"#t3_12hmbug > div > div._3xX726aBn29LDbsDtzr_6E._1Ap4F5maDtT1E1YuCiaO0r.D3IL3FD0RFy_mkKLPwL4 > div > div > button"
).is_visible():
# This means the post is NSFW and requires to click the proceed button.
print_substep("Post is NSFW. You are spicy...")
page.locator('[data-testid="content-gate"] button').click()
page.locator(
"#t3_12hmbug > div > div._3xX726aBn29LDbsDtzr_6E._1Ap4F5maDtT1E1YuCiaO0r.D3IL3FD0RFy_mkKLPwL4 > div > div > button"
).click()
page.wait_for_load_state() # Wait for page to fully load
if page.locator('[data-click-id="text"] button').is_visible():
page.locator(
'[data-click-id="text"] button'
).click() # Remove "Click to see nsfw" Button in Screenshot
# translate code
if page.locator(
"#SHORTCUT_FOCUSABLE_DIV > div:nth-child(7) > div > div > div > header > div > div._1m0iFpls1wkPZJVo38-LSh > button > i"
).is_visible():
page.locator(
"#SHORTCUT_FOCUSABLE_DIV > div:nth-child(7) > div > div > div > header > div > div._1m0iFpls1wkPZJVo38-LSh > button > i"
).click() # Interest popup is showing, this code will close it
if lang:
print_substep("Translating post...")
@ -122,19 +158,28 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
postcontentpath = f"assets/temp/{reddit_id}/png/title.png"
try:
page.locator('[data-test-id="post-content"]').screenshot(path=postcontentpath)
page.locator('[data-test-id="post-content"]').screenshot(
path=postcontentpath
)
except Exception as e:
print_substep("Something went wrong!",style="red")
resp = input("Something went wrong with making the screenshots! Do you want to skip the post? (y/n) ")
print_substep("Something went wrong!", style="red")
resp = input(
"Something went wrong with making the screenshots! Do you want to skip the post? (y/n) "
)
if resp.casefold().startswith("y"):
save_data("", "", "skipped", reddit_id, "")
print_substep("The post is successfully skipped! You can now restart the program and this post will skipped.","green")
print_substep(
"The post is successfully skipped! You can now restart the program and this post will skipped.",
"green",
)
resp = input("Do you want the error traceback for debugging purposes? (y/n)")
resp = input(
"Do you want the error traceback for debugging purposes? (y/n)"
)
if not resp.casefold().startswith("y"):
exit()
raise e
if storymode:
@ -143,10 +188,10 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
)
else:
for idx, comment in enumerate(
track(
reddit_object["comments"][:screenshot_num],
"Downloading screenshots...",
)
track(
reddit_object["comments"][:screenshot_num],
"Downloading screenshots...",
)
):
# Stop if we have reached the screenshot_num
if idx >= screenshot_num:

Loading…
Cancel
Save