reformatted

pull/1145/head
Jason 3 years ago
parent c10a5d3c2f
commit f65a797ec5

@ -64,9 +64,7 @@ noneng = [
class TikTok: # TikTok Text-to-Speech Wrapper class TikTok: # TikTok Text-to-Speech Wrapper
def __init__(self): def __init__(self):
self.URI_BASE = ( self.URI_BASE = "https://api16-normal-useast5.us.tiktokv.com/media/api/text/speech/invoke/?text_speaker="
"https://api16-normal-useast5.us.tiktokv.com/media/api/text/speech/invoke/?text_speaker="
)
self.max_chars = 300 self.max_chars = 300
self.voices = {"human": human, "nonhuman": nonhuman, "noneng": noneng} self.voices = {"human": human, "nonhuman": nonhuman, "noneng": noneng}
@ -77,10 +75,7 @@ class TikTok: # TikTok Text-to-Speech Wrapper
voice = ( voice = (
self.randomvoice() self.randomvoice()
if random_voice if random_voice
else ( else (settings.config["settings"]["tts"]["tiktok_voice"] or random.choice(self.voices["human"]))
settings.config["settings"]["tts"]["tiktok_voice"]
or random.choice(self.voices["human"])
)
) )
try: try:
r = requests.post(f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0") r = requests.post(f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0")

@ -39,15 +39,11 @@ class AWSPolly:
voice = self.randomvoice() voice = self.randomvoice()
else: else:
if not settings.config["settings"]["tts"]["aws_polly_voice"]: if not settings.config["settings"]["tts"]["aws_polly_voice"]:
raise ValueError( raise ValueError(f"Please set the TOML variable AWS_VOICE to a valid voice. options are: {voices}")
f"Please set the TOML variable AWS_VOICE to a valid voice. options are: {voices}"
)
voice = str(settings.config["settings"]["tts"]["aws_polly_voice"]).capitalize() voice = str(settings.config["settings"]["tts"]["aws_polly_voice"]).capitalize()
try: try:
# Request speech synthesis # Request speech synthesis
response = polly.synthesize_speech( response = polly.synthesize_speech(Text=text, OutputFormat="mp3", VoiceId=voice, Engine="neural")
Text=text, OutputFormat="mp3", VoiceId=voice, Engine="neural"
)
except (BotoCoreError, ClientError) as error: except (BotoCoreError, ClientError) as error:
# The service returned an error, exit gracefully # The service returned an error, exit gracefully
print(error) print(error)

@ -71,9 +71,7 @@ class TTSEngine:
self.length -= self.last_clip_length self.length -= self.last_clip_length
idx -= 1 idx -= 1
break break
if ( if len(comment["comment_body"]) > self.tts_module.max_chars: # Split the comment if it is too long
len(comment["comment_body"]) > self.tts_module.max_chars
): # Split the comment if it is too long
self.split_post(comment["comment_body"], idx) # Split the comment self.split_post(comment["comment_body"], idx) # Split the comment
else: # If the comment is not too long, just call the tts engine else: # If the comment is not too long, just call the tts engine
self.call_tts(f"{idx}", process_text(comment["comment_body"])) self.call_tts(f"{idx}", process_text(comment["comment_body"]))
@ -84,10 +82,7 @@ class TTSEngine:
def split_post(self, text: str, idx: int): def split_post(self, text: str, idx: int):
split_files = [] split_files = []
split_text = [ split_text = [
x.group().strip() x.group().strip() for x in re.finditer(r" *(((.|\n){0," + str(self.tts_module.max_chars) + "})(\.|.$))", text)
for x in re.finditer(
r" *(((.|\n){0," + str(self.tts_module.max_chars) + "})(\.|.$))", text
)
] ]
offset = 0 offset = 0
for idy, text_cut in enumerate(split_text): for idy, text_cut in enumerate(split_text):

@ -32,9 +32,7 @@ class pyttsx:
voice_id = self.randomvoice() voice_id = self.randomvoice()
engine = pyttsx3.init() engine = pyttsx3.init()
voices = engine.getProperty("voices") voices = engine.getProperty("voices")
engine.setProperty( engine.setProperty("voice", voices[voice_id].id) # changing index changes voices but ony 0 and 1 are working here
"voice", voices[voice_id].id
) # changing index changes voices but ony 0 and 1 are working here
engine.save_to_file(text, f"{filepath}") engine.save_to_file(text, f"{filepath}")
engine.runAndWait() engine.runAndWait()

@ -39,9 +39,7 @@ class StreamlabsPolly:
voice = self.randomvoice() voice = self.randomvoice()
else: else:
if not settings.config["settings"]["tts"]["streamlabs_polly_voice"]: if not settings.config["settings"]["tts"]["streamlabs_polly_voice"]:
raise ValueError( raise ValueError(f"Please set the config variable STREAMLABS_POLLY_VOICE to a valid voice. options are: {voices}")
f"Please set the config variable STREAMLABS_POLLY_VOICE to a valid voice. options are: {voices}"
)
voice = str(settings.config["settings"]["tts"]["streamlabs_polly_voice"]).capitalize() voice = str(settings.config["settings"]["tts"]["streamlabs_polly_voice"]).capitalize()
body = {"voice": voice, "text": text, "service": "polly"} body = {"voice": voice, "text": text, "service": "polly"}
response = requests.post(self.url, data=body) response = requests.post(self.url, data=body)

@ -74,6 +74,7 @@ def shutdown():
print("Exiting...") print("Exiting...")
exit() exit()
if __name__ == "__main__": if __name__ == "__main__":
assert sys.version_info >= (3, 9), "Python 3.10 or higher is required" assert sys.version_info >= (3, 9), "Python 3.10 or higher is required"
config = settings.check_toml("utils/.config.template.toml", "config.toml") config = settings.check_toml("utils/.config.template.toml", "config.toml")

@ -103,12 +103,9 @@ def get_subreddit_threads(POST_ID: str):
sanitised = sanitize_text(top_level_comment.body) sanitised = sanitize_text(top_level_comment.body)
if not sanitised or sanitised == " ": if not sanitised or sanitised == " ":
continue continue
if len(top_level_comment.body) <= int( if len(top_level_comment.body) <= int(settings.config["reddit"]["thread"]["max_comment_length"]):
settings.config["reddit"]["thread"]["max_comment_length"]
):
if ( if (
top_level_comment.author is not None top_level_comment.author is not None and sanitize_text(top_level_comment.body) is not None
and sanitize_text(top_level_comment.body) is not None
): # if errors occur with this change to if not. ): # if errors occur with this change to if not.
content["comments"].append( content["comments"].append(
{ {

@ -55,11 +55,7 @@ def handle_input(
return default if default is not NotImplemented else "" return default if default is not NotImplemented else ""
if default is not NotImplemented: if default is not NotImplemented:
console.print( console.print(
"[green]" "[green]" + message + '\n[blue bold]The default value is "' + str(default) + '"\nDo you want to use it?(y/n)'
+ message
+ '\n[blue bold]The default value is "'
+ str(default)
+ '"\nDo you want to use it?(y/n)'
) )
if input().casefold().startswith("y"): if input().casefold().startswith("y"):
return default return default
@ -72,9 +68,7 @@ def handle_input(
if check_type is not False: if check_type is not False:
try: try:
user_input = check_type(user_input) user_input = check_type(user_input)
if (nmin is not None and user_input < nmin) or ( if (nmin is not None and user_input < nmin) or (nmax is not None and user_input > nmax):
nmax is not None and user_input > nmax
):
# FAILSTATE Input out of bounds # FAILSTATE Input out of bounds
console.print("[red]" + oob_error) console.print("[red]" + oob_error)
continue continue
@ -90,9 +84,7 @@ def handle_input(
continue continue
else: else:
# FAILSTATE Input STRING out of bounds # FAILSTATE Input STRING out of bounds
if (nmin is not None and len(user_input) < nmin) or ( if (nmin is not None and len(user_input) < nmin) or (nmax is not None and len(user_input) > nmax):
nmax is not None and len(user_input) > nmax
):
console.print("[red bold]" + oob_error) console.print("[red bold]" + oob_error)
continue continue
break # SUCCESS Input STRING in bounds break # SUCCESS Input STRING in bounds
@ -106,16 +98,8 @@ def handle_input(
isinstance(eval(user_input), check_type) isinstance(eval(user_input), check_type)
return check_type(user_input) return check_type(user_input)
except: except:
console.print( console.print("[red bold]" + err_message + "\nValid options are: " + ", ".join(map(str, options)) + ".")
"[red bold]"
+ err_message
+ "\nValid options are: "
+ ", ".join(map(str, options))
+ "."
)
continue continue
if user_input in options: if user_input in options:
return user_input return user_input
console.print( console.print("[red bold]" + err_message + "\nValid options are: " + ", ".join(map(str, options)) + ".")
"[red bold]" + err_message + "\nValid options are: " + ", ".join(map(str, options)) + "."
)

@ -34,17 +34,12 @@ def check(value, checks, name):
except: except:
incorrect = True incorrect = True
if ( if not incorrect and "options" in checks and value not in checks["options"]: # FAILSTATE Value is not one of the options
not incorrect and "options" in checks and value not in checks["options"]
): # FAILSTATE Value is not one of the options
incorrect = True incorrect = True
if ( if (
not incorrect not incorrect
and "regex" in checks and "regex" in checks
and ( and ((isinstance(value, str) and re.match(checks["regex"], value) is None) or not isinstance(value, str))
(isinstance(value, str) and re.match(checks["regex"], value) is None)
or not isinstance(value, str)
)
): # FAILSTATE Value doesn't match regex, or has regex but is not a string. ): # FAILSTATE Value doesn't match regex, or has regex but is not a string.
incorrect = True incorrect = True
@ -84,9 +79,7 @@ def check(value, checks, name):
err_message=get_check_value("input_error", "Incorrect input"), err_message=get_check_value("input_error", "Incorrect input"),
nmin=get_check_value("nmin", None), nmin=get_check_value("nmin", None),
nmax=get_check_value("nmax", None), nmax=get_check_value("nmax", None),
oob_error=get_check_value( oob_error=get_check_value("oob_error", "Input out of bounds(Value too high/low/long/short)"),
"oob_error", "Input out of bounds(Value too high/low/long/short)"
),
options=get_check_value("options", None), options=get_check_value("options", None),
optional=get_check_value("optional", False), optional=get_check_value("optional", False),
) )

@ -54,9 +54,7 @@ def get_subreddit_undone(submissions: list, subreddit, times_checked=0):
print("all time filters have been checked you absolute madlad ") print("all time filters have been checked you absolute madlad ")
return get_subreddit_undone( return get_subreddit_undone(
subreddit.top( subreddit.top(time_filter=VALID_TIME_FILTERS[index], limit=(50 if int(index) == 0 else index + 1 * 50)),
time_filter=VALID_TIME_FILTERS[index], limit=(50 if int(index) == 0 else index + 1 * 50)
),
subreddit, subreddit,
times_checked=index, times_checked=index,
) # all the videos in hot have already been done ) # all the videos in hot have already been done

@ -4,9 +4,7 @@ from utils.console import print_step
def checkversion(__VERSION__): def checkversion(__VERSION__):
response = requests.get( response = requests.get("https://api.github.com/repos/elebumm/RedditVideoMakerBot/releases/latest")
"https://api.github.com/repos/elebumm/RedditVideoMakerBot/releases/latest"
)
latestversion = response.json()["tag_name"] latestversion = response.json()["tag_name"]
if __VERSION__ == latestversion: if __VERSION__ == latestversion:
print_step(f"You are using the newest version ({__VERSION__}) of the bot") print_step(f"You are using the newest version ({__VERSION__}) of the bot")

@ -36,9 +36,7 @@ class Video:
im.save(path) im.save(path)
return ImageClip(path) return ImageClip(path)
def add_watermark( def add_watermark(self, text, redditid, opacity=0.5, duration: int | float = 5, position: Tuple = (0.7, 0.9), fontsize=15):
self, text, redditid, opacity=0.5, duration: int | float = 5, position: Tuple = (0.7, 0.9), fontsize=15
):
compensation = round( compensation = round(
(position[0] / ((len(text) * (fontsize / 5) / 1.5) / 100 + position[0] * position[0])), (position[0] / ((len(text) * (fontsize / 5) / 1.5) / 100 + position[0] * position[0])),
ndigits=2, ndigits=2,

@ -51,9 +51,7 @@ def download_background(background_config: Tuple[str, str, str, Any]):
uri, filename, credit, _ = background_config uri, filename, credit, _ = background_config
if Path(f"assets/backgrounds/{credit}-{filename}").is_file(): if Path(f"assets/backgrounds/{credit}-{filename}").is_file():
return return
print_step( print_step("We need to download the backgrounds videos. they are fairly large but it's only done once. 😎")
"We need to download the backgrounds videos. they are fairly large but it's only done once. 😎"
)
print_substep("Downloading the backgrounds videos... please be patient 🙏 ") print_substep("Downloading the backgrounds videos... please be patient 🙏 ")
print_substep(f"Downloading {filename} from {uri}") print_substep(f"Downloading {filename} from {uri}")
YouTube(uri, on_progress_callback=on_progress).streams.filter(res="1080p").first().download( YouTube(uri, on_progress_callback=on_progress).streams.filter(res="1080p").first().download(

@ -1 +1,10 @@
[] [
{
"subreddit": "AskReddit",
"id": "w2orm0",
"time": "1658271429",
"background_credit": "No Copyright Gameplay",
"reddit_title": "What comedian has never ever made you laugh",
"filename": "What comedian has never ever made you laugh.mp4"
}
]

@ -119,9 +119,7 @@ def make_final_video(
# ) # )
# else: story mode stuff # else: story mode stuff
img_clip_pos = background_config[3] img_clip_pos = background_config[3]
image_concat = concatenate_videoclips(image_clips).set_position( image_concat = concatenate_videoclips(image_clips).set_position(img_clip_pos) # note transition kwarg for delay in imgs
img_clip_pos
) # note transition kwarg for delay in imgs
image_concat.audio = audio_composite image_concat.audio = audio_composite
final = CompositeVideoClip([background_clip, image_concat]) final = CompositeVideoClip([background_clip, image_concat])
title = re.sub(r"[^\w\s-]", "", reddit_obj["thread_title"]) title = re.sub(r"[^\w\s-]", "", reddit_obj["thread_title"])
@ -141,9 +139,7 @@ def make_final_video(
# # lowered_audio = audio_background.multiply_volume( # todo get this to work # # lowered_audio = audio_background.multiply_volume( # todo get this to work
# # VOLUME_MULTIPLIER) # lower volume by background_audio_volume, use with fx # # VOLUME_MULTIPLIER) # lower volume by background_audio_volume, use with fx
# final.set_audio(final_audio) # final.set_audio(final_audio)
final = Video(final).add_watermark( final = Video(final).add_watermark(text=f"Background credit: {background_config[2]}", opacity=0.4, redditid=reddit_obj)
text=f"Background credit: {background_config[2]}", opacity=0.4, redditid=reddit_obj
)
final.write_videofile( final.write_videofile(
f"assets/temp/{id}/temp.mp4", f"assets/temp/{id}/temp.mp4",
fps=30, fps=30,
@ -164,6 +160,4 @@ def make_final_video(
print_substep(f"Removed {cleanups} temporary files 🗑") print_substep(f"Removed {cleanups} temporary files 🗑")
print_substep("See result in the results folder!") print_substep("See result in the results folder!")
print_step( print_step(f'Reddit title: {reddit_obj["thread_title"]} \n Background Credit: {background_config[2]}')
f'Reddit title: {reddit_obj["thread_title"]} \n Background Credit: {background_config[2]}'
)

@ -51,9 +51,7 @@ def download_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: in
page.wait_for_load_state() # Wait for page to fully load page.wait_for_load_state() # Wait for page to fully load
if page.locator('[data-click-id="text"] button').is_visible(): if page.locator('[data-click-id="text"] button').is_visible():
page.locator( page.locator('[data-click-id="text"] button').click() # Remove "Click to see nsfw" Button in Screenshot
'[data-click-id="text"] button'
).click() # Remove "Click to see nsfw" Button in Screenshot
# translate code # translate code
@ -72,16 +70,12 @@ def download_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: in
print_substep("Skipping translation...") print_substep("Skipping translation...")
postcontentpath = f"assets/temp/{id}/png/title.png" postcontentpath = f"assets/temp/{id}/png/title.png"
page.locator('[data-test-id="post-content"]').screenshot(path= postcontentpath) page.locator('[data-test-id="post-content"]').screenshot(path=postcontentpath)
if storymode: if storymode:
page.locator('[data-click-id="text"]').screenshot( page.locator('[data-click-id="text"]').screenshot(path=f"assets/temp/{id}/png/story_content.png")
path=f"assets/temp/{id}/png/story_content.png"
)
else: else:
for idx, comment in enumerate( for idx, comment in enumerate(track(reddit_object["comments"], "Downloading screenshots...")):
track(reddit_object["comments"], "Downloading screenshots...")
):
# Stop if we have reached the screenshot_num # Stop if we have reached the screenshot_num
if idx >= screenshot_num: if idx >= screenshot_num:
break break
@ -103,9 +97,7 @@ def download_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: in
[comment_tl, comment["comment_id"]], [comment_tl, comment["comment_id"]],
) )
try: try:
page.locator(f"#t1_{comment['comment_id']}").screenshot( page.locator(f"#t1_{comment['comment_id']}").screenshot(path=f"assets/temp/{id}/png/comment_{idx}.png")
path=f"assets/temp/{id}/png/comment_{idx}.png"
)
except TimeoutError: except TimeoutError:
del reddit_object["comments"] del reddit_object["comments"]
screenshot_num += 1 screenshot_num += 1

Loading…
Cancel
Save