A lot of reformatting and a couple of bug fixes

pull/1288/head
Simon 2 years ago
parent bb838710e3
commit e8cadd9b04

@ -75,10 +75,15 @@ class TikTok: # TikTok Text-to-Speech Wrapper
voice = (
self.randomvoice()
if random_voice
else (settings.config["settings"]["tts"]["tiktok_voice"] or random.choice(self.voices["human"]))
else (
settings.config["settings"]["tts"]["tiktok_voice"]
or random.choice(self.voices["human"])
)
)
try:
r = requests.post(f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0")
r = requests.post(
f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0"
)
except requests.exceptions.SSLError:
# https://stackoverflow.com/a/47475019/18516611
session = requests.Session()
@ -86,7 +91,9 @@ class TikTok: # TikTok Text-to-Speech Wrapper
adapter = HTTPAdapter(max_retries=retry)
session.mount("http://", adapter)
session.mount("https://", adapter)
r = session.post(f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0")
r = session.post(
f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0"
)
# print(r.text)
vstr = [r.json()["data"]["v_str"]][0]
b64d = base64.b64decode(vstr)

@ -39,11 +39,17 @@ class AWSPolly:
voice = self.randomvoice()
else:
if not settings.config["settings"]["tts"]["aws_polly_voice"]:
raise ValueError(f"Please set the TOML variable AWS_VOICE to a valid voice. options are: {voices}")
voice = str(settings.config["settings"]["tts"]["aws_polly_voice"]).capitalize()
raise ValueError(
f"Please set the TOML variable AWS_VOICE to a valid voice. options are: {voices}"
)
voice = str(
settings.config["settings"]["tts"]["aws_polly_voice"]
).capitalize()
try:
# Request speech synthesis
response = polly.synthesize_speech(Text=text, OutputFormat="mp3", VoiceId=voice, Engine="neural")
response = polly.synthesize_speech(
Text=text, OutputFormat="mp3", VoiceId=voice, Engine="neural"
)
except (BotoCoreError, ClientError) as error:
# The service returned an error, exit gracefully
print(error)

@ -18,7 +18,8 @@ from utils import settings
from utils.console import print_step, print_substep
from utils.voice import sanitize_text
DEFAULT_MAX_LENGTH: int = 50 # video length variable
DEFAULT_MAX_LENGTH: int = 50 # video length variable
class TTSEngine:
@ -53,27 +54,31 @@ class TTSEngine:
def run(self) -> Tuple[int, int]:
Path(self.path).mkdir(parents=True, exist_ok=True)
Path(self.path).mkdir(parents=True, exist_ok=True)
print_step("Saving Text to MP3 files...")
self.call_tts("title", process_text(self.reddit_object["thread_title"]))
# processed_text = ##self.reddit_object["thread_post"] != ""
idx = None
if settings.config["settings"]["storymode"] :
if settings.config["settings"]["storymode"]:
if settings.config["settings"]["storymodemethod"] == 0:
if (len(self.reddit_object["thread_post"]) > self.tts_module.max_chars):
if len(self.reddit_object["thread_post"]) > self.tts_module.max_chars:
self.split_post(self.reddit_object["thread_post"], "postaudio")
else :
self.call_tts("postaudio",process_text(self.reddit_object["thread_post"]) )
else:
self.call_tts(
"postaudio", process_text(self.reddit_object["thread_post"])
)
elif settings.config["settings"]["storymodemethod"] == 1:
for idx,text in track(enumerate(self.reddit_object["thread_post"])):
self.call_tts(f"postaudio-{idx}",process_text(text) )
else :
for idx, comment in track(enumerate(self.reddit_object["comments"]), "Saving..."):
for idx, text in track(enumerate(self.reddit_object["thread_post"])):
self.call_tts(f"postaudio-{idx}", process_text(text))
else:
for idx, comment in track(
enumerate(self.reddit_object["comments"]), "Saving..."
):
# ! Stop creating mp3 files if the length is greater than max length.
if self.length > self.max_length and idx > 1:
self.length -= self.last_clip_length
@ -92,7 +97,10 @@ class TTSEngine:
def split_post(self, text: str, idx):
split_files = []
split_text = [
x.group().strip() for x in re.finditer(r" *(((.|\n){0," + str(self.tts_module.max_chars) + "})(\.|.$))", text)
x.group().strip()
for x in re.finditer(
r" *(((.|\n){0," + str(self.tts_module.max_chars) + "})(\.|.$))", text
)
]
self.create_silence_mp3()
@ -106,15 +114,19 @@ class TTSEngine:
continue
else:
self.call_tts(f"{idx}-{idy}.part", newtext)
with open(f"{self.path}/list.txt", 'w') as f:
with open(f"{self.path}/list.txt", "w") as f:
for idz in range(0, len(split_text)):
f.write("file " + f"'{idx}-{idz}.part.mp3'" + "\n")
split_files.append(str(f"{self.path}/{idx}-{idy}.part.mp3"))
f.write("file " + f"'silence.mp3'" + "\n")
os.system("ffmpeg -f concat -y -hide_banner -loglevel panic -safe 0 " +
"-i " + f"{self.path}/list.txt " +
"-c copy " + f"{self.path}/{idx}.mp3")
os.system(
"ffmpeg -f concat -y -hide_banner -loglevel panic -safe 0 "
+ "-i "
+ f"{self.path}/list.txt "
+ "-c copy "
+ f"{self.path}/{idx}.mp3"
)
try:
for i in range(0, len(split_files)):
os.unlink(split_files[i])
@ -129,13 +141,17 @@ class TTSEngine:
self.tts_module.run(text, filepath=f"{self.path}/{filename}_no_silence.mp3")
self.create_silence_mp3()
with open(f"{self.path}/{filename}.txt", 'w') as f:
with open(f"{self.path}/{filename}.txt", "w") as f:
f.write("file " + f"'{filename}_no_silence.mp3'" + "\n")
f.write("file " + f"'silence.mp3'" + "\n")
f.close()
os.system("ffmpeg -f concat -y -hide_banner -loglevel panic -safe 0 " +
"-i " + f"{self.path}/{filename}.txt " +
"-c copy " + f"{self.path}/{filename}.mp3")
os.system(
"ffmpeg -f concat -y -hide_banner -loglevel panic -safe 0 "
+ "-i "
+ f"{self.path}/{filename}.txt "
+ "-c copy "
+ f"{self.path}/{filename}.mp3"
)
clip = AudioFileClip(f"{self.path}/{filename}.mp3")
self.length += clip.duration
self.last_clip_length = clip.duration
@ -153,9 +169,15 @@ class TTSEngine:
def create_silence_mp3(self):
silence_duration = settings.config["settings"]["tts"]["silence_duration"]
silence = AudioClip(make_frame=lambda t: np.sin(440 * 2 * np.pi * t), duration=silence_duration, fps=44100)
silence = AudioClip(
make_frame=lambda t: np.sin(440 * 2 * np.pi * t),
duration=silence_duration,
fps=44100,
)
silence = volumex(silence, 0)
silence.write_audiofile(f"{self.path}/silence.mp3", fps=44100, verbose=False, logger=None)
silence.write_audiofile(
f"{self.path}/silence.mp3", fps=44100, verbose=False, logger=None
)
def process_text(text: str):

@ -21,7 +21,9 @@ class pyttsx:
if voice_id == "" or voice_num == "":
voice_id = 2
voice_num = 3
raise ValueError("set pyttsx values to a valid value, switching to defaults")
raise ValueError(
"set pyttsx values to a valid value, switching to defaults"
)
else:
voice_id = int(voice_id)
voice_num = int(voice_num)
@ -32,7 +34,9 @@ class pyttsx:
voice_id = self.randomvoice()
engine = pyttsx3.init()
voices = engine.getProperty("voices")
engine.setProperty("voice", voices[voice_id].id) # changing index changes voices but ony 0 and 1 are working here
engine.setProperty(
"voice", voices[voice_id].id
) # changing index changes voices but ony 0 and 1 are working here
engine.save_to_file(text, f"{filepath}")
engine.runAndWait()

@ -39,8 +39,12 @@ class StreamlabsPolly:
voice = self.randomvoice()
else:
if not settings.config["settings"]["tts"]["streamlabs_polly_voice"]:
raise ValueError(f"Please set the config variable STREAMLABS_POLLY_VOICE to a valid voice. options are: {voices}")
voice = str(settings.config["settings"]["tts"]["streamlabs_polly_voice"]).capitalize()
raise ValueError(
f"Please set the config variable STREAMLABS_POLLY_VOICE to a valid voice. options are: {voices}"
)
voice = str(
settings.config["settings"]["tts"]["streamlabs_polly_voice"]
).capitalize()
body = {"voice": voice, "text": text, "service": "polly"}
response = requests.post(self.url, data=body)
if not check_ratelimit(response):

@ -1,11 +1,11 @@
#!/usr/bin/env python
from logging import error
import math
import sys
from logging import error
from os import name
from pathlib import Path
from subprocess import Popen
from pathlib import Path
from prawcore import ResponseException
from reddit.subreddit import get_subreddit_threads
@ -80,11 +80,15 @@ def shutdown():
if __name__ == "__main__":
assert sys.version_info >= (3, 9), "Python 3.10 or higher is required"
directory = Path().absolute()
config = settings.check_toml(f"{directory}/utils/.config.template.toml", "config.toml")
config = settings.check_toml(
f"{directory}/utils/.config.template.toml", "config.toml"
)
config is False and exit()
try:
if len(config["reddit"]["thread"]["post_id"].split("+")) > 1:
for index, post_id in enumerate(config["reddit"]["thread"]["post_id"].split("+")):
for index, post_id in enumerate(
config["reddit"]["thread"]["post_id"].split("+")
):
index += 1
print_step(
f'on the {index}{("st" if index % 10 == 1 else ("nd" if index % 10 == 2 else ("rd" if index % 10 == 3 else "th")))} post of {len(config["reddit"]["thread"]["post_id"].split("+"))}'
@ -103,8 +107,9 @@ if __name__ == "__main__":
print_markdown("Please check your credentials in the config.toml file")
shutdown()
except Exception as err :
print_step('Sorry, something went wrong with this test version! Try again, and feel free to report this issue at GitHub or the Discord community.')
except Exception as err:
print_step(
"Sorry, something went wrong with this test version! Try again, and feel free to report this issue at GitHub or the Discord community."
)
raise err
# todo error

@ -21,7 +21,9 @@ def get_subreddit_threads(POST_ID: str):
content = {}
if settings.config["reddit"]["creds"]["2fa"]:
print("\nEnter your two-factor authentication code from your authenticator app.\n")
print(
"\nEnter your two-factor authentication code from your authenticator app.\n"
)
code = input("> ")
print()
pw = settings.config["reddit"]["creds"]["password"]
@ -41,7 +43,7 @@ def get_subreddit_threads(POST_ID: str):
check_for_async=False,
)
except ResponseException as e:
if e.response.status_code == 401:
if e.response.status_code == 401:
print("Invalid credentials - please check them in config.toml")
except:
print("Something went wrong...")
@ -53,7 +55,9 @@ def get_subreddit_threads(POST_ID: str):
]: # note to user. you can have multiple subreddits via reddit.subreddit("redditdev+learnpython")
try:
subreddit = reddit.subreddit(
re.sub(r"r\/", "", input("What subreddit would you like to pull from? "))
re.sub(
r"r\/", "", input("What subreddit would you like to pull from? ")
)
# removes the r/ from the input
)
except ValueError:
@ -63,18 +67,22 @@ def get_subreddit_threads(POST_ID: str):
sub = settings.config["reddit"]["thread"]["subreddit"]
print_substep(f"Using subreddit: r/{sub} from TOML config")
subreddit_choice = sub
if str(subreddit_choice).casefold().startswith("r/"): # removes the r/ from the input
if (
str(subreddit_choice).casefold().startswith("r/")
): # removes the r/ from the input
subreddit_choice = subreddit_choice[2:]
subreddit = reddit.subreddit(subreddit_choice)
if POST_ID: # would only be called if there are multiple queued posts
submission = reddit.submission(id=POST_ID)
elif (
settings.config["reddit"]["thread"]["post_id"]
and len(str(settings.config["reddit"]["thread"]["post_id"]).split("+")) == 1
):
submission = reddit.submission(id=settings.config["reddit"]["thread"]["post_id"])
submission = reddit.submission(
id=settings.config["reddit"]["thread"]["post_id"]
)
else:
threads = subreddit.hot(limit=25)
submission = get_subreddit_undone(threads, subreddit)
@ -98,7 +106,7 @@ def get_subreddit_threads(POST_ID: str):
if settings.config["settings"]["storymodemethod"] == 1:
content["thread_post"] = posttextparser(submission.selftext)
else:
content["thread_post"] =submission.selftext
content["thread_post"] = submission.selftext
else:
for top_level_comment in submission.comments:
if isinstance(top_level_comment, MoreComments):
@ -111,23 +119,23 @@ def get_subreddit_threads(POST_ID: str):
if not sanitised or sanitised == " ":
continue
if len(top_level_comment.body) <= int(
settings.config["reddit"]["thread"]["max_comment_length"]
settings.config["reddit"]["thread"]["max_comment_length"]
):
if len(top_level_comment.body)>= int(
settings.config["reddit"]["thread"]["min_comment_length"]
):
if len(top_level_comment.body) >= int(
settings.config["reddit"]["thread"]["min_comment_length"]
):
if (
top_level_comment.author is not None
and sanitize_text(top_level_comment.body) is not None
): # if errors occur with this change to if not.
top_level_comment.author is not None
and sanitize_text(top_level_comment.body) is not None
): # if errors occur with this change to if not.
content["comments"].append(
{
"comment_body": top_level_comment.body,
"comment_url": top_level_comment.permalink,
"comment_id": top_level_comment.id,
}
)
{
"comment_body": top_level_comment.body,
"comment_url": top_level_comment.permalink,
"comment_id": top_level_comment.id,
}
)
print_substep("Received subreddit threads Successfully.", style="bold green")
return content

@ -3,7 +3,7 @@ botocore==1.27.24
gTTS==2.2.4
moviepy==1.0.3
playwright==1.23.0
praw==7.6.0
praw==7.6.1
prawcore~=2.3.0
pytube==12.1.0
requests==2.28.1

@ -12,7 +12,7 @@ subreddit = { optional = false, regex = "[_0-9a-zA-Z\\+]+$", nmin = 3, explanati
post_id = { optional = true, default = "", regex = "^((?!://|://)[+a-zA-Z0-9])*$", explanation = "Used if you want to use a specific post.", example = "urdtfx" }
max_comment_length = { default = 500, optional = false, nmin = 10, nmax = 10000, type = "int", explanation = "max number of characters a comment can have. default is 500", example = 500, oob_error = "the max comment length should be between 10 and 10000" }
min_comment_length = { default = 1, optional = true, nmin = 0, nmax = 10000, type = "int", explanation = "min_comment_length number of characters a comment can have. default is 0", example = 50, oob_error = "the max comment length should be between 1 and 100" }
post_lang = { default = "", optional = true, explanation = "The language you would like to translate to.", example = "es-cr" }
post_lang = { default = "", optional = true, explanation = "The language you would like to translate to.", example = "es-cr", options = ['','af', 'ak', 'am', 'ar', 'as', 'ay', 'az', 'be', 'bg', 'bho', 'bm', 'bn', 'bs', 'ca', 'ceb', 'ckb', 'co', 'cs', 'cy', 'da', 'de', 'doi', 'dv', 'ee', 'el', 'en', 'en-US', 'eo', 'es', 'et', 'eu', 'fa', 'fi', 'fr', 'fy', 'ga', 'gd', 'gl', 'gn', 'gom', 'gu', 'ha', 'haw', 'hi', 'hmn', 'hr', 'ht', 'hu', 'hy', 'id', 'ig', 'ilo', 'is', 'it', 'iw', 'ja', 'jw', 'ka', 'kk', 'km', 'kn', 'ko', 'kri', 'ku', 'ky', 'la', 'lb', 'lg', 'ln', 'lo', 'lt', 'lus', 'lv', 'mai', 'mg', 'mi', 'mk', 'ml', 'mn', 'mni-Mtei', 'mr', 'ms', 'mt', 'my', 'ne', 'nl', 'no', 'nso', 'ny', 'om', 'or', 'pa', 'pl', 'ps', 'pt', 'qu', 'ro', 'ru', 'rw', 'sa', 'sd', 'si', 'sk', 'sl', 'sm', 'sn', 'so', 'sq', 'sr', 'st', 'su', 'sv', 'sw', 'ta', 'te', 'tg', 'th', 'ti', 'tk', 'tl', 'tr', 'ts', 'tt', 'ug', 'uk', 'ur', 'uz', 'vi', 'xh', 'yi', 'yo', 'zh-CN', 'zh-TW', 'zu'] }
min_comments = { default = 20, optional = false, nmin = 10, type = "int", explanation = "The minimum number of comments a post should have to be included. default is 20", example = 29, oob_error = "the minimum number of comments should be between 15 and 999999" }
@ -22,8 +22,9 @@ theme = { optional = false, default = "dark", example = "light", options = ["dar
times_to_run = { optional = false, default = 1, example = 2, explanation = "Used if you want to run multiple times. Set to an int e.g. 4 or 29 or 1", type = "int", nmin = 1, oob_error = "It's very hard to run something less than once." }
opacity = { optional = false, default = 0.9, example = 0.8, explanation = "Sets the opacity of the comments when overlayed over the background", type = "float", nmin = 0, nmax = 1, oob_error = "The opacity HAS to be between 0 and 1", input_error = "The opacity HAS to be a decimal number between 0 and 1" }
transition = { optional = true, default = 0.2, example = 0.2, explanation = "Sets the transition time (in seconds) between the comments. Set to 0 if you want to disable it.", type = "float", nmin = 0, nmax = 2, oob_error = "The transition HAS to be between 0 and 2", input_error = "The opacity HAS to be a decimal number between 0 and 2" }
storymode = { optional = true, type = "bool", default = false, example = false, options = [true, false,], explanation = "Only read out title and post content, not yet implemented" }
storymodemethod= { optional = true, default = 1, example = 1, explanation = "style of video for storymode. Set to an 0 for single picture display in whole video set 1 for fancy looking video ", type = "int", nmin = 0, oob_error = "It's very hard to run something less than once." }
storymode = { optional = true, type = "bool", default = false, example = false, options = [true, false,], explanation = "Only read out title and post content, great for subreddits with stories" }
storymodemethod= { optional = true, default = 1, example = 1, explanation = "Style that's used for the storymode. Set to 0 for single picture display in whole video, set to 1 for fancy looking video ", type = "int", nmin = 0, oob_error = "It's very hard to run something less than once.", options = [0, 1] }
fps = { optional = false, default = 30, example = 30, explanation = "Sets the FPS of the video, 30 is default for best performance. 60 FPS is smoother.", type = "int", nmin = 1, nmax = 60, oob_error = "The FPS HAS to be between 1 and 60" }
[settings.background]

@ -14,7 +14,9 @@ def cleanup(id) -> int:
"""
if exists("./assets/temp"):
count = 0
files = [f for f in os.listdir(".") if f.endswith(".mp4") and "temp" in f.lower()]
files = [
f for f in os.listdir(".") if f.endswith(".mp4") and "temp" in f.lower()
]
count += len(files)
for f in files:
os.remove(f)

@ -50,12 +50,19 @@ def handle_input(
optional=False,
):
if optional:
console.print(message + "\n[green]This is an optional value. Do you want to skip it? (y/n)")
console.print(
message
+ "\n[green]This is an optional value. Do you want to skip it? (y/n)"
)
if input().casefold().startswith("y"):
return default if default is not NotImplemented else ""
if default is not NotImplemented:
console.print(
"[green]" + message + '\n[blue bold]The default value is "' + str(default) + '"\nDo you want to use it?(y/n)'
"[green]"
+ message
+ '\n[blue bold]The default value is "'
+ str(default)
+ '"\nDo you want to use it?(y/n)'
)
if input().casefold().startswith("y"):
return default
@ -68,7 +75,9 @@ def handle_input(
if check_type is not False:
try:
user_input = check_type(user_input)
if (nmin is not None and user_input < nmin) or (nmax is not None and user_input > nmax):
if (nmin is not None and user_input < nmin) or (
nmax is not None and user_input > nmax
):
# FAILSTATE Input out of bounds
console.print("[red]" + oob_error)
continue
@ -78,13 +87,19 @@ def handle_input(
console.print("[red]" + err_message)
continue
elif match != "" and re.match(match, user_input) is None:
console.print("[red]" + err_message + "\nAre you absolutely sure it's correct?(y/n)")
console.print(
"[red]"
+ err_message
+ "\nAre you absolutely sure it's correct?(y/n)"
)
if input().casefold().startswith("y"):
break
continue
else:
# FAILSTATE Input STRING out of bounds
if (nmin is not None and len(user_input) < nmin) or (nmax is not None and len(user_input) > nmax):
if (nmin is not None and len(user_input) < nmin) or (
nmax is not None and len(user_input) > nmax
):
console.print("[red bold]" + oob_error)
continue
break # SUCCESS Input STRING in bounds
@ -98,8 +113,20 @@ def handle_input(
isinstance(eval(user_input), check_type)
return check_type(user_input)
except:
console.print("[red bold]" + err_message + "\nValid options are: " + ", ".join(map(str, options)) + ".")
console.print(
"[red bold]"
+ err_message
+ "\nValid options are: "
+ ", ".join(map(str, options))
+ "."
)
continue
if user_input in options:
return user_input
console.print("[red bold]" + err_message + "\nValid options are: " + ", ".join(map(str, options)) + ".")
console.print(
"[red bold]"
+ err_message
+ "\nValid options are: "
+ ", ".join(map(str, options))
+ "."
)

@ -5,63 +5,71 @@ from PIL import Image, ImageDraw, ImageFont
from rich.progress import track
def draw_multiple_line_text(image, text, font, text_color,padding ,wrap=50):
'''
def draw_multiple_line_text(image, text, font, text_color, padding, wrap=50):
"""
Draw multiline text over given image
'''
"""
draw = ImageDraw.Draw(image)
Fontperm= font.getsize(text)
Fontperm = font.getsize(text)
image_width, image_height = image.size
lines = textwrap.wrap(text, width=wrap)
y=(image_height/2)-(((Fontperm[1]+(len(lines)*padding)/len(lines))*len(lines))/2)
y = (image_height / 2) - (
((Fontperm[1] + (len(lines) * padding) / len(lines)) * len(lines)) / 2
)
for line in lines:
line_width, line_height = font.getsize(line)
draw.text(((image_width - line_width) / 2, y),
line, font=font, fill=text_color)
draw.text(((image_width - line_width) / 2, y), line, font=font, fill=text_color)
y += line_height + padding
#theme=bgcolor,reddit_obj=reddit_object,txtclr=txtcolor
def imagemaker( theme,
reddit_obj:dict,
txtclr,
padding=5
):
'''
Render Images for video
'''
title=reddit_obj['thread_title']
texts=reddit_obj['thread_post']
# theme=bgcolor,reddit_obj=reddit_object,txtclr=txtcolor
def imagemaker(theme, reddit_obj: dict, txtclr, padding=5):
"""
Render Images for video
"""
title = reddit_obj["thread_title"]
texts = reddit_obj["thread_post"]
id = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"])
tfont=ImageFont.truetype("fonts\\Roboto-Bold.ttf",27) # for title
font=ImageFont.truetype("fonts\\Roboto-Regular.ttf", 20)# for despcription|comments
size=(500,176)
image =Image.new('RGBA',size,theme)
tfont = ImageFont.truetype("fonts\\Roboto-Bold.ttf", 27) # for title
font = ImageFont.truetype(
"fonts\\Roboto-Regular.ttf", 20
) # for despcription|comments
size = (500, 176)
image = Image.new("RGBA", size, theme)
draw = ImageDraw.Draw(image)
# for title
if len(title)>40:
draw_multiple_line_text(image, title,tfont,txtclr ,padding,wrap=30)
if len(title) > 40:
draw_multiple_line_text(image, title, tfont, txtclr, padding, wrap=30)
else:
Fontperm= tfont.getsize(title)
draw.text(((image.size[0]-Fontperm[0])/2,(image.size[1]-Fontperm[1])/2),font=tfont,text=title) #(image.size[1]/2)-(Fontperm[1]/2)
image.save(f'assets/temp/{id}/png/title.png')
Fontperm = tfont.getsize(title)
draw.text(
((image.size[0] - Fontperm[0]) / 2, (image.size[1] - Fontperm[1]) / 2),
font=tfont,
text=title,
) # (image.size[1]/2)-(Fontperm[1]/2)
image.save(f"assets/temp/{id}/png/title.png")
# for comment|description
for idx,text in track(enumerate(texts),"Rendering Image"):
for idx, text in track(enumerate(texts), "Rendering Image"):
image =Image.new('RGBA',size,theme)
image = Image.new("RGBA", size, theme)
draw = ImageDraw.Draw(image)
if len(text)>50:
draw_multiple_line_text(image, text,font, txtclr,padding)
if len(text) > 50:
draw_multiple_line_text(image, text, font, txtclr, padding)
else:
Fontperm= font.getsize(text)
draw.text(((image.size[0]-Fontperm[0])/2,(image.size[1]-Fontperm[1])/2),font=font,text=text) #(image.size[1]/2)-(Fontperm[1]/2)
image.save(f'assets/temp/{id}/png/img{idx}.png')
Fontperm = font.getsize(text)
draw.text(
((image.size[0] - Fontperm[0]) / 2, (image.size[1] - Fontperm[1]) / 2),
font=font,
text=text,
) # (image.size[1]/2)-(Fontperm[1]/2)
image.save(f"assets/temp/{id}/png/img{idx}.png")

@ -1,28 +1,30 @@
import re
from subprocess import Popen
import spacy
from utils.console import print_step
from utils.voice import sanitize_text
#working good
def posttextparser(obj):
text=re.sub("\n", "", obj )
# working good
def posttextparser(obj):
text = re.sub("\n", "", obj)
try:
nlp=spacy.load('en_core_web_sm')
except OSError :
print("dev:please dowload the model with this command \npython -m spacy download en")
nlp = spacy.load('en_core_web_sm')
except OSError:
print_step("The spacy model can't load. You need to install it with \npython -m spacy download en")
exit()
doc= nlp(text)
doc = nlp(text)
newtext:list = []
newtext: list = []
# to check for space str
# to check for space str
for line in doc.sents:
if sanitize_text(line.text):
if sanitize_text(line.text):
newtext.append(line.text)
# print(line)
return newtext
return newtext

@ -34,12 +34,17 @@ def check(value, checks, name):
except:
incorrect = True
if not incorrect and "options" in checks and value not in checks["options"]: # FAILSTATE Value is not one of the options
if (
not incorrect and "options" in checks and value not in checks["options"]
): # FAILSTATE Value is not one of the options
incorrect = True
if (
not incorrect
and "regex" in checks
and ((isinstance(value, str) and re.match(checks["regex"], value) is None) or not isinstance(value, str))
and (
(isinstance(value, str) and re.match(checks["regex"], value) is None)
or not isinstance(value, str)
)
): # FAILSTATE Value doesn't match regex, or has regex but is not a string.
incorrect = True
@ -48,7 +53,11 @@ def check(value, checks, name):
and not hasattr(value, "__iter__")
and (
("nmin" in checks and checks["nmin"] is not None and value < checks["nmin"])
or ("nmax" in checks and checks["nmax"] is not None and value > checks["nmax"])
or (
"nmax" in checks
and checks["nmax"] is not None
and value > checks["nmax"]
)
)
):
incorrect = True
@ -56,8 +65,16 @@ def check(value, checks, name):
not incorrect
and hasattr(value, "__iter__")
and (
("nmin" in checks and checks["nmin"] is not None and len(value) < checks["nmin"])
or ("nmax" in checks and checks["nmax"] is not None and len(value) > checks["nmax"])
(
"nmin" in checks
and checks["nmin"] is not None
and len(value) < checks["nmin"]
)
or (
"nmax" in checks
and checks["nmax"] is not None
and len(value) > checks["nmax"]
)
)
):
incorrect = True
@ -65,9 +82,15 @@ def check(value, checks, name):
if incorrect:
value = handle_input(
message=(
(("[blue]Example: " + str(checks["example"]) + "\n") if "example" in checks else "")
(
("[blue]Example: " + str(checks["example"]) + "\n")
if "example" in checks
else ""
)
+ "[red]"
+ ("Non-optional ", "Optional ")["optional" in checks and checks["optional"] is True]
+ ("Non-optional ", "Optional ")[
"optional" in checks and checks["optional"] is True
]
)
+ "[#C0CAF5 bold]"
+ str(name)
@ -79,7 +102,9 @@ def check(value, checks, name):
err_message=get_check_value("input_error", "Incorrect input"),
nmin=get_check_value("nmin", None),
nmax=get_check_value("nmax", None),
oob_error=get_check_value("oob_error", "Input out of bounds(Value too high/low/long/short)"),
oob_error=get_check_value(
"oob_error", "Input out of bounds(Value too high/low/long/short)"
),
options=get_check_value("options", None),
optional=get_check_value("optional", False),
)
@ -106,7 +131,9 @@ def check_toml(template_file, config_file) -> Tuple[bool, Dict]:
try:
template = toml.load(template_file)
except Exception as error:
console.print(f"[red bold]Encountered error when trying to to load {template_file}: {error}")
console.print(
f"[red bold]Encountered error when trying to to load {template_file}: {error}"
)
return False
try:
config = toml.load(config_file)

@ -19,7 +19,9 @@ def get_subreddit_undone(submissions: list, subreddit, times_checked=0):
if not exists("./video_creation/data/videos.json"):
with open("./video_creation/data/videos.json", "w+") as f:
json.dump([], f)
with open("./video_creation/data/videos.json", "r", encoding="utf-8") as done_vids_raw:
with open(
"./video_creation/data/videos.json", "r", encoding="utf-8"
) as done_vids_raw:
done_videos = json.load(done_vids_raw)
for submission in submissions:
if already_done(done_videos, submission):
@ -34,14 +36,16 @@ def get_subreddit_undone(submissions: list, subreddit, times_checked=0):
if submission.stickied:
print_substep("This post was pinned by moderators. Skipping...")
continue
if submission.num_comments <= int(settings.config["reddit"]["thread"]["min_comments"]):
if submission.num_comments <= int(
settings.config["reddit"]["thread"]["min_comments"]
):
print_substep(
f'This post has under the specified minimum of comments ({settings.config["reddit"]["thread"]["min_comments"]}). Skipping...'
)
continue
if settings.config['settings']['storymode'] :
if not submission.is_self :
continue
if settings.config["settings"]["storymode"]:
if not submission.is_self:
continue
return submission
print("all submissions have been done going by top submission order")
VALID_TIME_FILTERS = [
@ -57,7 +61,10 @@ def get_subreddit_undone(submissions: list, subreddit, times_checked=0):
print("all time filters have been checked you absolute madlad ")
return get_subreddit_undone(
subreddit.top(time_filter=VALID_TIME_FILTERS[index], limit=(50 if int(index) == 0 else index + 1 * 50)),
subreddit.top(
time_filter=VALID_TIME_FILTERS[index],
limit=(50 if int(index) == 0 else index + 1 * 50),
),
subreddit,
times_checked=index,
) # all the videos in hot have already been done

@ -3,13 +3,19 @@ import requests
from utils.console import print_step
def checkversion(__VERSION__:str):
response = requests.get("https://api.github.com/repos/elebumm/RedditVideoMakerBot/releases/latest")
def checkversion(__VERSION__: str):
response = requests.get(
"https://api.github.com/repos/elebumm/RedditVideoMakerBot/releases/latest"
)
latestversion = response.json()["tag_name"]
if __VERSION__ == latestversion:
print_step(f"You are using the newest version ({__VERSION__}) of the bot")
return True
else:
elif __VERSION__ < latestversion:
print_step(
f"You are using an older version ({__VERSION__}) of the bot. Download the newest version ({latestversion}) from https://github.com/elebumm/RedditVideoMakerBot/releases/latest"
)
else:
print_step(
f"Welcome to the test version ({__VERSION__}) of the bot. Thanks for testing and feel free to report any bugs you find."
)

@ -36,15 +36,28 @@ class Video:
im.save(path)
return ImageClip(path)
def add_watermark(self, text, redditid, opacity=0.5, duration: int | float = 5, position: Tuple = (0.7, 0.9), fontsize=15):
def add_watermark(
self,
text,
redditid,
opacity=0.5,
duration: int | float = 5,
position: Tuple = (0.7, 0.9),
fontsize=15,
):
compensation = round(
(position[0] / ((len(text) * (fontsize / 5) / 1.5) / 100 + position[0] * position[0])),
(
position[0]
/ ((len(text) * (fontsize / 5) / 1.5) / 100 + position[0] * position[0])
),
ndigits=2,
)
position = (compensation, position[1])
# print(f'{compensation=}')
# print(f'{position=}')
img_clip = self._create_watermark(text, redditid, fontsize=fontsize, opacity=opacity)
img_clip = self._create_watermark(
text, redditid, fontsize=fontsize, opacity=opacity
)
img_clip = img_clip.set_opacity(opacity).set_duration(duration)
img_clip = img_clip.set_position(
position, relative=True

@ -19,7 +19,9 @@ def check_done(
Returns:
Submission|None: Reddit object in args
"""
with open("./video_creation/data/videos.json", "r", encoding="utf-8") as done_vids_raw:
with open(
"./video_creation/data/videos.json", "r", encoding="utf-8"
) as done_vids_raw:
done_videos = json.load(done_vids_raw)
for video in done_videos:
if video["id"] == str(redditobj):
@ -33,7 +35,9 @@ def check_done(
return redditobj
def save_data(subreddit: str, filename: str, reddit_title: str, reddit_id: str, credit: str):
def save_data(
subreddit: str, filename: str, reddit_title: str, reddit_id: str, credit: str
):
"""Saves the videos that have already been generated to a JSON file in video_creation/data/videos.json
Args:

@ -40,7 +40,9 @@ def sleep_until(time):
if sys.version_info[0] >= 3 and time.tzinfo:
end = time.astimezone(timezone.utc).timestamp()
else:
zoneDiff = pytime.time() - (datetime.now() - datetime(1970, 1, 1)).total_seconds()
zoneDiff = (
pytime.time() - (datetime.now() - datetime(1970, 1, 1)).total_seconds()
)
end = (time - datetime(1970, 1, 1)).total_seconds() + zoneDiff
# Type check

@ -45,7 +45,9 @@ def get_start_and_end_times(video_length: int, length_of_clip: int) -> Tuple[int
def get_background_config():
"""Fetch the background/s configuration"""
try:
choice = str(settings.config["settings"]["background"]["background_choice"]).casefold()
choice = str(
settings.config["settings"]["background"]["background_choice"]
).casefold()
except AttributeError:
print_substep("No background selected. Picking random background'")
choice = None
@ -65,16 +67,20 @@ def download_background(background_config: Tuple[str, str, str, Any]):
uri, filename, credit, _ = background_config
if Path(f"assets/backgrounds/{credit}-{filename}").is_file():
return
print_step("We need to download the backgrounds videos. they are fairly large but it's only done once. 😎")
print_step(
"We need to download the backgrounds videos. they are fairly large but it's only done once. 😎"
)
print_substep("Downloading the backgrounds videos... please be patient 🙏 ")
print_substep(f"Downloading {filename} from {uri}")
YouTube(uri, on_progress_callback=on_progress).streams.filter(res="1080p").first().download(
"assets/backgrounds", filename=f"{credit}-{filename}"
)
YouTube(uri, on_progress_callback=on_progress).streams.filter(
res="1080p"
).first().download("assets/backgrounds", filename=f"{credit}-{filename}")
print_substep("Background video downloaded successfully! 🎉", style="bold green")
def chop_background_video(background_config: Tuple[str, str, str, Any], video_length: int, reddit_object: dict):
def chop_background_video(
background_config: Tuple[str, str, str, Any], video_length: int, reddit_object: dict
):
"""Generates the background footage to be used in the video and writes it to assets/temp/background.mp4
Args:

@ -21,7 +21,7 @@ from utils.videos import save_data
from utils import settings
console = Console()
W, H = 1080,1920
W, H = 1080, 1920
def name_normalize(name: str) -> str:
@ -75,19 +75,26 @@ def make_final_video(
.resize(height=H)
.crop(x1=1166.6, y1=0, x2=2246.6, y2=1920)
)
# Gather all audio clips
if settings.config["settings"]["storymode"]:
if settings.config["settings"]["storymodemethod"] == 0:
audio_clips = [AudioFileClip(f"assets/temp/{id}/mp3/title.mp3")]
audio_clips.insert(1,AudioFileClip(f"assets/temp/{id}/mp3/postaudio.mp3"))
audio_clips.insert(1, AudioFileClip(f"assets/temp/{id}/mp3/postaudio.mp3"))
elif settings.config["settings"]["storymodemethod"] == 1:
audio_clips = [AudioFileClip(f"assets/temp/{id}/mp3/postaudio-{i}.mp3") \
for i in track(range(number_of_clips + 1), "Collecting the audio files...")]
audio_clips = [
AudioFileClip(f"assets/temp/{id}/mp3/postaudio-{i}.mp3")
for i in track(
range(number_of_clips + 1), "Collecting the audio files..."
)
]
audio_clips.insert(0, AudioFileClip(f"assets/temp/{id}/mp3/title.mp3"))
else:
audio_clips = [AudioFileClip(f"assets/temp/{id}/mp3/{i}.mp3") for i in range(number_of_clips)]
audio_clips = [
AudioFileClip(f"assets/temp/{id}/mp3/{i}.mp3")
for i in range(number_of_clips)
]
audio_clips.insert(0, AudioFileClip(f"assets/temp/{id}/mp3/title.mp3"))
audio_concat = concatenate_audioclips(audio_clips)
audio_composite = CompositeAudioClip([audio_concat])
@ -97,7 +104,9 @@ def make_final_video(
image_clips = []
# Gather all images
new_opacity = 1 if opacity is None or float(opacity) >= 1 else float(opacity)
new_transition = 0 if transition is None or float(transition) > 2 else float(transition)
new_transition = (
0 if transition is None or float(transition) > 2 else float(transition)
)
image_clips.insert(
0,
ImageClip(f"assets/temp/{id}/png/title.png")
@ -109,7 +118,7 @@ def make_final_video(
)
if settings.config["settings"]["storymode"]:
if settings.config["settings"]["storymodemethod"] == 0:
image_clips.insert(
image_clips.insert(
1,
ImageClip(f"assets/temp/{id}/png/story_content.png")
.set_duration(audio_clips[1].duration)
@ -118,16 +127,18 @@ def make_final_video(
.set_opacity(float(opacity)),
)
elif settings.config["settings"]["storymodemethod"] == 1:
for i in track(range(0, number_of_clips+1),"Collecting the image files..."):
image_clips.append(
ImageClip(f"assets/temp/{id}/png/img{i}.png")
.set_duration(audio_clips[i + 1].duration)
.resize(width=W - 100)
.set_opacity(new_opacity)
# .crossfadein(new_transition)
# .crossfadeout(new_transition)
)
else :
for i in track(
range(0, number_of_clips + 1), "Collecting the image files..."
):
image_clips.append(
ImageClip(f"assets/temp/{id}/png/img{i}.png")
.set_duration(audio_clips[i + 1].duration)
.resize(width=W - 100)
.set_opacity(new_opacity)
# .crossfadein(new_transition)
# .crossfadeout(new_transition)
)
else:
for i in range(0, number_of_clips):
image_clips.append(
ImageClip(f"assets/temp/{id}/png/comment_{i}.png")
@ -137,9 +148,7 @@ def make_final_video(
.crossfadein(new_transition)
.crossfadeout(new_transition)
)
img_clip_pos = background_config[3]
image_concat = concatenate_videoclips(image_clips).set_position(
img_clip_pos
@ -166,30 +175,30 @@ def make_final_video(
# if
final = Video(final).add_watermark(
text=f"Background credit: {background_config[2]}", opacity=0.4, redditid=reddit_obj
)
)
final.write_videofile(
f"assets/temp/{id}/temp.mp4",
fps=30,
fps=int(settings.config["settings"]["fps"]),
audio_codec="aac",
audio_bitrate="192k",
verbose=False,
threads=multiprocessing.cpu_count(),
# preset="ultrafast" #for testings
)
ffmpeg_extract_subclip(
f"assets/temp/{id}/temp.mp4",
0,
length,
targetname=f"results/{subreddit}/{filename}",
)
)
save_data(subreddit, filename, title, idx, background_config[2])
print_step("Removing temporary files 🗑")
cleanups = cleanup(id)
print_substep(f"Removed {cleanups} temporary files 🗑")
print_substep("See result in the results folder!")
print_step(f'Reddit title: {reddit_obj["thread_title"]} \n Background Credit: {background_config[2]}')
print_step(
f'Reddit title: {reddit_obj["thread_title"]} \n Background Credit: {background_config[2]}'
)

@ -4,8 +4,7 @@ from pathlib import Path
from typing import Dict
import translators as ts
from playwright.async_api import \
async_playwright # pylint: disable=unused-import
from playwright.async_api import async_playwright # pylint: disable=unused-import
from playwright.sync_api import ViewportSize, sync_playwright
from rich.progress import track
@ -16,9 +15,6 @@ from utils.imagenarator import imagemaker
# do not remove the above line
def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
"""Downloads screenshots of reddit posts as seen on the web. Downloads to assets/temp/png
@ -30,20 +26,20 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
print_step("Downloading screenshots of reddit posts...")
id = re.sub(r"[^\w\s-]", "", reddit_object["thread_id"])
# ! Make sure the reddit screenshots folder exists
Path(f"assets/temp/{id}/png").mkdir(parents=True, exist_ok=True)
def download(cookie_file,num=None):
screenshot_num=num
def download(cookie_file, num=None):
screenshot_num = num
with sync_playwright() as p:
print_substep("Launching Headless Browser...")
browser = p.chromium.launch() #headless=False #to check for chrome view
browser = p.chromium.launch() # headless=False #to check for chrome view
context = browser.new_context()
cookies = json.load(cookie_file)
context.add_cookies(cookies) # load preference cookies
# Get the thread screenshot
page = context.new_page()
@ -77,17 +73,19 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
else:
print_substep("Skipping translation...")
postcontentpath = f"assets/temp/{id}/png/title.png"
page.locator('[data-test-id="post-content"]').screenshot(path=postcontentpath)
page.locator('[data-test-id="post-content"]').screenshot(
path=postcontentpath
)
if settings.config["settings"]["storymode"]:
try : #new change
try: # new change
page.locator('[data-click-id="text"]').first.screenshot(
path=f"assets/temp/{id}/png/story_content.png"
)
except:
exit
if not settings.config["settings"]["storymode"] :
if not settings.config["settings"]["storymode"]:
for idx, comment in enumerate(
track(reddit_object["comments"], "Downloading screenshots...")
):
@ -105,7 +103,9 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
if settings.config["reddit"]["thread"]["post_lang"]:
comment_tl = ts.google(
comment["comment_body"],
to_language=settings.config["reddit"]["thread"]["post_lang"],
to_language=settings.config["reddit"]["thread"][
"post_lang"
],
)
page.evaluate(
'([tl_content, tl_id]) => document.querySelector(`#t1_${tl_id} > div:nth-child(2) > div > div[data-testid="comment"] > div`).textContent = tl_content',
@ -121,20 +121,28 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
print("TimeoutError: Skipping screenshot...")
continue
print_substep("Screenshots downloaded Successfully.", style="bold green")
# story=False
theme=settings.config["settings"]["theme"]
theme = settings.config["settings"]["theme"]
if theme == "dark":
cookie_file = open("./video_creation/data/cookie-dark-mode.json", encoding="utf-8")
bgcolor=(33,33,36,255)
txtcolor=(240,240,240)
cookie_file = open(
"./video_creation/data/cookie-dark-mode.json", encoding="utf-8"
)
bgcolor = (33, 33, 36, 255)
txtcolor = (240, 240, 240)
else:
cookie_file = open("./video_creation/data/cookie-light-mode.json", encoding="utf-8")
bgcolor=(255,255,255,255)
txtcolor=(0,0,0)
if settings.config["settings"]["storymode"] :
if settings.config["settings"]["storymodemethod"] == 1:
# for idx,item in enumerate(reddit_object["thread_post"]):
imagemaker(theme=bgcolor,reddit_obj=reddit_object,txtclr=txtcolor)
if settings.config["settings"]["storymodemethod"] == 0 or not settings.config["settings"]["storymode"] :
download(cookie_file, screenshot_num)
cookie_file = open(
"./video_creation/data/cookie-light-mode.json", encoding="utf-8"
)
bgcolor = (255, 255, 255, 255)
txtcolor = (0, 0, 0)
if settings.config["settings"]["storymode"]:
if settings.config["settings"]["storymodemethod"] == 1:
# for idx,item in enumerate(reddit_object["thread_post"]):
imagemaker(theme=bgcolor, reddit_obj=reddit_object, txtclr=txtcolor)
if (
settings.config["settings"]["storymodemethod"] == 0
or not settings.config["settings"]["storymode"]
):
download(cookie_file, screenshot_num)

@ -36,7 +36,9 @@ def save_text_to_mp3(reddit_obj) -> Tuple[int, int]:
voice = settings.config["settings"]["tts"]["voice_choice"]
if str(voice).casefold() in map(lambda _: _.casefold(), TTSProviders):
text_to_mp3 = TTSEngine(get_case_insensitive_key_value(TTSProviders, voice), reddit_obj)
text_to_mp3 = TTSEngine(
get_case_insensitive_key_value(TTSProviders, voice), reddit_obj
)
else:
while True:
print_step("Please choose one of the following TTS providers: ")
@ -45,12 +47,18 @@ def save_text_to_mp3(reddit_obj) -> Tuple[int, int]:
if choice.casefold() in map(lambda _: _.casefold(), TTSProviders):
break
print("Unknown Choice")
text_to_mp3 = TTSEngine(get_case_insensitive_key_value(TTSProviders, choice), reddit_obj)
text_to_mp3 = TTSEngine(
get_case_insensitive_key_value(TTSProviders, choice), reddit_obj
)
return text_to_mp3.run()
def get_case_insensitive_key_value(input_dict, key):
return next(
(value for dict_key, value in input_dict.items() if dict_key.lower() == key.lower()),
(
value
for dict_key, value in input_dict.items()
if dict_key.lower() == key.lower()
),
None,
)

Loading…
Cancel
Save