style: reformatted

pull/885/head
Jason 3 years ago
parent 0d63fb60c0
commit fc414b57b7

@ -62,7 +62,9 @@ noneng = [
class TikTok: # TikTok Text-to-Speech Wrapper class TikTok: # TikTok Text-to-Speech Wrapper
def __init__(self): def __init__(self):
self.URI_BASE = "https://api16-normal-useast5.us.tiktokv.com/media/api/text/speech/invoke/?text_speaker=" self.URI_BASE = (
"https://api16-normal-useast5.us.tiktokv.com/media/api/text/speech/invoke/?text_speaker="
)
self.max_chars = 300 self.max_chars = 300
self.voices = {"human": human, "nonhuman": nonhuman, "noneng": noneng} self.voices = {"human": human, "nonhuman": nonhuman, "noneng": noneng}
@ -79,9 +81,7 @@ class TikTok: # TikTok Text-to-Speech Wrapper
) )
) )
try: try:
r = requests.post( r = requests.post(f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0")
f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0"
)
except requests.exceptions.SSLError: except requests.exceptions.SSLError:
# https://stackoverflow.com/a/47475019/18516611 # https://stackoverflow.com/a/47475019/18516611
session = requests.Session() session = requests.Session()
@ -89,9 +89,7 @@ class TikTok: # TikTok Text-to-Speech Wrapper
adapter = HTTPAdapter(max_retries=retry) adapter = HTTPAdapter(max_retries=retry)
session.mount("http://", adapter) session.mount("http://", adapter)
session.mount("https://", adapter) session.mount("https://", adapter)
r = session.post( r = session.post(f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0")
f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0"
)
# print(r.text) # print(r.text)
vstr = [r.json()["data"]["v_str"]][0] vstr = [r.json()["data"]["v_str"]][0]
b64d = base64.b64decode(vstr) b64d = base64.b64decode(vstr)

@ -39,9 +39,7 @@ class AWSPolly:
return ValueError( return ValueError(
f"Please set the TOML variable AWS_VOICE to a valid voice. options are: {voices}" f"Please set the TOML variable AWS_VOICE to a valid voice. options are: {voices}"
) )
voice = str( voice = str(settings.config["settings"]["tts"]["aws_polly_voice"]).capitalize()
settings.config["settings"]["tts"]["aws_polly_voice"]
).capitalize()
try: try:
# Request speech synthesis # Request speech synthesis
response = polly.synthesize_speech( response = polly.synthesize_speech(

@ -63,9 +63,7 @@ class TTSEngine:
self.call_tts("posttext", self.reddit_object["thread_post"]) self.call_tts("posttext", self.reddit_object["thread_post"])
idx = None idx = None
for idx, comment in track( for idx, comment in track(enumerate(self.reddit_object["comments"]), "Saving..."):
enumerate(self.reddit_object["comments"]), "Saving..."
):
# ! Stop creating mp3 files if the length is greater than max length. # ! Stop creating mp3 files if the length is greater than max length.
if self.length > self.max_length: if self.length > self.max_length:
break break
@ -81,9 +79,7 @@ class TTSEngine:
split_files = [] split_files = []
split_text = [ split_text = [
x.group().strip() x.group().strip()
for x in re.finditer( for x in re.finditer(rf" *((.{{0,{self.tts_module.max_chars}}})(\.|.$))", text)
rf" *((.{{0,{self.tts_module.max_chars}}})(\.|.$))", text
)
] ]
idy = None idy = None
@ -106,9 +102,7 @@ class TTSEngine:
# Path(f"{self.path}/{idx}-{i}.part.mp3").unlink() # Path(f"{self.path}/{idx}-{i}.part.mp3").unlink()
def call_tts(self, filename: str, text: str): def call_tts(self, filename: str, text: str):
self.tts_module.run( self.tts_module.run(text=process_text(text), filepath=f"{self.path}/{filename}.mp3")
text=process_text(text), filepath=f"{self.path}/{filename}.mp3"
)
# try: # try:
# self.length += MP3(f"{self.path}/{filename}.mp3").info.length # self.length += MP3(f"{self.path}/{filename}.mp3").info.length
# except (MutagenError, HeaderNotFoundError): # except (MutagenError, HeaderNotFoundError):

@ -39,9 +39,7 @@ class StreamlabsPolly:
return ValueError( return ValueError(
f"Please set the config variable STREAMLABS_VOICE to a valid voice. options are: {voices}" f"Please set the config variable STREAMLABS_VOICE to a valid voice. options are: {voices}"
) )
voice = str( voice = str(settings.config["settings"]["tts"]["streamlabs_polly_voice"]).capitalize()
settings.config["settings"]["tts"]["streamlabs_polly_voice"]
).capitalize()
body = {"voice": voice, "text": text, "service": "polly"} body = {"voice": voice, "text": text, "service": "polly"}
response = requests.post(self.url, data=body) response = requests.post(self.url, data=body)
try: try:

@ -8,7 +8,11 @@ from utils.console import print_markdown, print_step
from utils import settings from utils import settings
# from utils.checker import envUpdate # from utils.checker import envUpdate
from video_creation.background import download_background, chop_background_video, get_background_config from video_creation.background import (
download_background,
chop_background_video,
get_background_config,
)
from video_creation.final_video import make_final_video from video_creation.final_video import make_final_video
from video_creation.screenshot_downloader import download_screenshots_of_reddit_posts from video_creation.screenshot_downloader import download_screenshots_of_reddit_posts
from video_creation.voices import save_text_to_mp3 from video_creation.voices import save_text_to_mp3
@ -60,9 +64,7 @@ if __name__ == "__main__":
run_many(config["settings"]["times_to_run"]) run_many(config["settings"]["times_to_run"])
elif len(config["reddit"]["thread"]["post_id"].split("+")) > 1: elif len(config["reddit"]["thread"]["post_id"].split("+")) > 1:
for index, post_id in enumerate( for index, post_id in enumerate(config["reddit"]["thread"]["post_id"].split("+")):
config["reddit"]["thread"]["post_id"].split("+")
):
index += 1 index += 1
print_step( print_step(
f'on the {index}{("st" if index%10 == 1 else ("nd" if index%10 == 2 else ("rd" if index%10 == 3 else "th")))} post of {len(config["reddit"]["thread"]["post_id"].split("+"))}' f'on the {index}{("st" if index%10 == 1 else ("nd" if index%10 == 2 else ("rd" if index%10 == 3 else "th")))} post of {len(config["reddit"]["thread"]["post_id"].split("+"))}'

@ -18,9 +18,7 @@ def get_subreddit_threads(POST_ID: str):
content = {} content = {}
if settings.config["reddit"]["creds"]["2fa"] == True: if settings.config["reddit"]["creds"]["2fa"] == True:
print( print("\nEnter your two-factor authentication code from your authenticator app.\n")
"\nEnter your two-factor authentication code from your authenticator app.\n"
)
code = input("> ") code = input("> ")
print() print()
pw = settings.config["reddit"]["creds"]["password"] pw = settings.config["reddit"]["creds"]["password"]
@ -46,9 +44,7 @@ def get_subreddit_threads(POST_ID: str):
]: # note to user. you can have multiple subreddits via reddit.subreddit("redditdev+learnpython") ]: # note to user. you can have multiple subreddits via reddit.subreddit("redditdev+learnpython")
try: try:
subreddit = reddit.subreddit( subreddit = reddit.subreddit(
re.sub( re.sub(r"r\/", "", input("What subreddit would you like to pull from? "))
r"r\/", "", input("What subreddit would you like to pull from? ")
)
# removes the r/ from the input # removes the r/ from the input
) )
except ValueError: except ValueError:
@ -58,9 +54,7 @@ def get_subreddit_threads(POST_ID: str):
sub = settings.config["reddit"]["thread"]["subreddit"] sub = settings.config["reddit"]["thread"]["subreddit"]
print_substep(f"Using subreddit: r/{sub} from TOML config") print_substep(f"Using subreddit: r/{sub} from TOML config")
subreddit_choice = sub subreddit_choice = sub
if subreddit_choice.casefold().startswith( if subreddit_choice.casefold().startswith("r/"): # removes the r/ from the input
"r/"
): # removes the r/ from the input
subreddit_choice = subreddit_choice[2:] subreddit_choice = subreddit_choice[2:]
subreddit = reddit.subreddit( subreddit = reddit.subreddit(
subreddit_choice subreddit_choice
@ -72,9 +66,7 @@ def get_subreddit_threads(POST_ID: str):
settings.config["reddit"]["thread"]["post_id"] settings.config["reddit"]["thread"]["post_id"]
and len(settings.config["reddit"]["thread"]["post_id"].split("+")) == 1 and len(settings.config["reddit"]["thread"]["post_id"].split("+")) == 1
): ):
submission = reddit.submission( submission = reddit.submission(id=settings.config["reddit"]["thread"]["post_id"])
id=settings.config["reddit"]["thread"]["post_id"]
)
else: else:
threads = subreddit.hot(limit=25) threads = subreddit.hot(limit=25)

@ -10,9 +10,7 @@ def cleanup() -> int:
""" """
if exists("./assets/temp"): if exists("./assets/temp"):
count = 0 count = 0
files = [ files = [f for f in os.listdir(".") if f.endswith(".mp4") and "temp" in f.lower()]
f for f in os.listdir(".") if f.endswith(".mp4") and "temp" in f.lower()
]
count += len(files) count += len(files)
for f in files: for f in files:
os.remove(f) os.remove(f)

@ -49,10 +49,7 @@ def handle_input(
optional=False, optional=False,
): ):
if optional: if optional:
console.print( console.print(message + "\n[green]This is an optional value. Do you want to skip it? (y/n)")
message
+ "\n[green]This is an optional value. Do you want to skip it? (y/n)"
)
if input().casefold().startswith("y"): if input().casefold().startswith("y"):
return default if default is not NotImplemented else "" return default if default is not NotImplemented else ""
if default is not NotImplemented: if default is not NotImplemented:
@ -86,11 +83,7 @@ def handle_input(
console.print("[red]" + err_message) console.print("[red]" + err_message)
continue continue
elif match != "" and re.match(match, user_input) is None: elif match != "" and re.match(match, user_input) is None:
console.print( console.print("[red]" + err_message + "\nAre you absolutely sure it's correct?(y/n)")
"[red]"
+ err_message
+ "\nAre you absolutely sure it's correct?(y/n)"
)
if input().casefold().startswith("y"): if input().casefold().startswith("y"):
break break
continue continue
@ -123,9 +116,5 @@ def handle_input(
if user_input in options: if user_input in options:
return user_input return user_input
console.print( console.print(
"[red bold]" "[red bold]" + err_message + "\nValid options are: " + ", ".join(map(str, options)) + "."
+ err_message
+ "\nValid options are: "
+ ", ".join(map(str, options))
+ "."
) )

@ -15,9 +15,7 @@ def get_subreddit_undone(submissions: list, subreddit):
""" """
# recursively checks if the top submission in the list was already done. # recursively checks if the top submission in the list was already done.
with open( with open("./video_creation/data/videos.json", "r", encoding="utf-8") as done_vids_raw:
"./video_creation/data/videos.json", "r", encoding="utf-8"
) as done_vids_raw:
done_videos = json.load(done_vids_raw) done_videos = json.load(done_vids_raw)
for submission in submissions: for submission in submissions:
if already_done(done_videos, submission): if already_done(done_videos, submission):

@ -20,9 +20,7 @@ def check_done(
Returns: Returns:
Submission|None: Reddit object in args Submission|None: Reddit object in args
""" """
with open( with open("./video_creation/data/videos.json", "r", encoding="utf-8") as done_vids_raw:
"./video_creation/data/videos.json", "r", encoding="utf-8"
) as done_vids_raw:
done_videos = json.load(done_vids_raw) done_videos = json.load(done_vids_raw)
for video in done_videos: for video in done_videos:
if video["id"] == str(redditobj): if video["id"] == str(redditobj):

@ -24,28 +24,29 @@ background_options = {
"https://www.youtube.com/watch?v=vw5L4xCPy9Q", "https://www.youtube.com/watch?v=vw5L4xCPy9Q",
"bike-parkour-gta.mp4", "bike-parkour-gta.mp4",
"Achy Gaming", "Achy Gaming",
lambda t: ('center', 480 + t) lambda t: ("center", 480 + t),
), ),
"rocket-league": ( # Rocket League "rocket-league": ( # Rocket League
"https://www.youtube.com/watch?v=2X9QGY__0II", "https://www.youtube.com/watch?v=2X9QGY__0II",
"rocket_league.mp4", "rocket_league.mp4",
"Orbital Gameplay", "Orbital Gameplay",
"top" "top",
), ),
"minecraft": ( # Minecraft parkour "minecraft": ( # Minecraft parkour
"https://www.youtube.com/watch?v=n_Dv4JMiwK8", "https://www.youtube.com/watch?v=n_Dv4JMiwK8",
"parkour.mp4", "parkour.mp4",
"bbswitzer", "bbswitzer",
"center" "center",
), ),
"gta": ( # GTA Stunt Race "gta": ( # GTA Stunt Race
"https://www.youtube.com/watch?v=qGa9kWREOnE", "https://www.youtube.com/watch?v=qGa9kWREOnE",
"gta-stunt-race.mp4", "gta-stunt-race.mp4",
"Achy Gaming", "Achy Gaming",
lambda t: ('center', 480 + t) lambda t: ("center", 480 + t),
) ),
} }
def get_start_and_end_times(video_length: int, length_of_clip: int) -> Tuple[int, int]: def get_start_and_end_times(video_length: int, length_of_clip: int) -> Tuple[int, int]:
"""Generates a random interval of time to be used as the background of the video. """Generates a random interval of time to be used as the background of the video.
@ -59,10 +60,11 @@ def get_start_and_end_times(video_length: int, length_of_clip: int) -> Tuple[int
random_time = randrange(180, int(length_of_clip) - int(video_length)) random_time = randrange(180, int(length_of_clip) - int(video_length))
return random_time, random_time + video_length return random_time, random_time + video_length
def get_background_config(): def get_background_config():
"""Fetch the background/s configuration""" """Fetch the background/s configuration"""
try: try:
choice = str(settings.config['settings']['background_choice']).casefold() choice = str(settings.config["settings"]["background_choice"]).casefold()
except AttributeError: except AttributeError:
print_substep("No background selected. Picking random background'") print_substep("No background selected. Picking random background'")
choice = None choice = None
@ -90,8 +92,7 @@ def download_background(background_config: Tuple[str, str, str, Any]):
YouTube(uri, on_progress_callback=on_progress).streams.filter(res="1080p").first().download( YouTube(uri, on_progress_callback=on_progress).streams.filter(res="1080p").first().download(
"assets/backgrounds", filename=f"{credit}-{filename}" "assets/backgrounds", filename=f"{credit}-{filename}"
) )
print_substep("Background videos downloaded successfully! 🎉", print_substep("Background videos downloaded successfully! 🎉", style="bold green")
style="bold green")
def chop_background_video(background_config: Tuple[str, str, str, Any], video_length: int): def chop_background_video(background_config: Tuple[str, str, str, Any], video_length: int):
@ -107,8 +108,7 @@ def chop_background_video(background_config: Tuple[str, str, str, Any], video_le
background = VideoFileClip(f"assets/backgrounds/{choice}") background = VideoFileClip(f"assets/backgrounds/{choice}")
start_time, end_time = get_start_and_end_times( start_time, end_time = get_start_and_end_times(video_length, background.duration)
video_length, background.duration)
try: try:
ffmpeg_extract_subclip( ffmpeg_extract_subclip(
f"assets/backgrounds/{choice}", f"assets/backgrounds/{choice}",

@ -35,13 +35,9 @@ def download_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: in
context = browser.new_context() context = browser.new_context()
if settings.config["settings"]["theme"] == "dark": if settings.config["settings"]["theme"] == "dark":
cookie_file = open( cookie_file = open("./video_creation/data/cookie-dark-mode.json", encoding="utf-8")
"./video_creation/data/cookie-dark-mode.json", encoding="utf-8"
)
else: else:
cookie_file = open( cookie_file = open("./video_creation/data/cookie-light-mode.json", encoding="utf-8")
"./video_creation/data/cookie-light-mode.json", encoding="utf-8"
)
cookies = json.load(cookie_file) cookies = json.load(cookie_file)
context.add_cookies(cookies) # load preference cookies context.add_cookies(cookies) # load preference cookies
# Get the thread screenshot # Get the thread screenshot
@ -73,9 +69,7 @@ def download_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: in
else: else:
print_substep("Skipping translation...") print_substep("Skipping translation...")
page.locator('[data-test-id="post-content"]').screenshot( page.locator('[data-test-id="post-content"]').screenshot(path="assets/temp/png/title.png")
path="assets/temp/png/title.png"
)
if storymode: if storymode:
page.locator('[data-click-id="text"]').screenshot( page.locator('[data-click-id="text"]').screenshot(

@ -35,9 +35,7 @@ def save_text_to_mp3(reddit_obj) -> Tuple[int, int]:
voice = settings.config["settings"]["tts"]["choice"] voice = settings.config["settings"]["tts"]["choice"]
if voice.casefold() in map(lambda _: _.casefold(), TTSProviders): if voice.casefold() in map(lambda _: _.casefold(), TTSProviders):
text_to_mp3 = TTSEngine( text_to_mp3 = TTSEngine(get_case_insensitive_key_value(TTSProviders, voice), reddit_obj)
get_case_insensitive_key_value(TTSProviders, voice), reddit_obj
)
else: else:
while True: while True:
print_step("Please choose one of the following TTS providers: ") print_step("Please choose one of the following TTS providers: ")
@ -46,19 +44,13 @@ def save_text_to_mp3(reddit_obj) -> Tuple[int, int]:
if choice.casefold() in map(lambda _: _.casefold(), TTSProviders): if choice.casefold() in map(lambda _: _.casefold(), TTSProviders):
break break
print("Unknown Choice") print("Unknown Choice")
text_to_mp3 = TTSEngine( text_to_mp3 = TTSEngine(get_case_insensitive_key_value(TTSProviders, choice), reddit_obj)
get_case_insensitive_key_value(TTSProviders, choice), reddit_obj
)
return text_to_mp3.run() return text_to_mp3.run()
def get_case_insensitive_key_value(input_dict, key): def get_case_insensitive_key_value(input_dict, key):
return next( return next(
( (value for dict_key, value in input_dict.items() if dict_key.lower() == key.lower()),
value
for dict_key, value in input_dict.items()
if dict_key.lower() == key.lower()
),
None, None,
) )

Loading…
Cancel
Save