Some refactors, removed unnecessary f-strings and never referenced variables

pull/548/head
CordlessCoder 3 years ago
parent c65a315186
commit 8b0be93302

@ -7,7 +7,7 @@ class GTTS:
req_text: str = "Google Text To Speech", req_text: str = "Google Text To Speech",
filename: str = "title.mp3", filename: str = "title.mp3",
random_speaker=False, random_speaker=False,
censer=False, censor=False,
): ):
tts = gTTS(text=req_text, lang="en", slow=False) tts = gTTS(text=req_text, lang="en", slow=False)
tts.save(f"{filename}") tts.save(f"{filename}")

@ -8,7 +8,23 @@ from moviepy.audio.AudioClip import concatenate_audioclips, CompositeAudioClip
from moviepy.audio.io.AudioFileClip import AudioFileClip from moviepy.audio.io.AudioFileClip import AudioFileClip
from requests.exceptions import JSONDecodeError from requests.exceptions import JSONDecodeError
voices = ['Brian', 'Emma', 'Russell', 'Joey', 'Matthew', 'Joanna', 'Kimberly', 'Amy', 'Geraint', 'Nicole', 'Justin', 'Ivy', 'Kendra', 'Salli', 'Raveena'] voices = [
"Brian",
"Emma",
"Russell",
"Joey",
"Matthew",
"Joanna",
"Kimberly",
"Amy",
"Geraint",
"Nicole",
"Justin",
"Ivy",
"Kendra",
"Salli",
"Raveena",
]
# valid voices https://lazypy.ro/tts/ # valid voices https://lazypy.ro/tts/
@ -16,29 +32,33 @@ voices = ['Brian', 'Emma', 'Russell', 'Joey', 'Matthew', 'Joanna', 'Kimberly', '
class POLLY: class POLLY:
def __init__(self): def __init__(self):
self.url = 'https://streamlabs.com/polly/speak' self.url = "https://streamlabs.com/polly/speak"
def tts( def tts(
self, self,
req_text: str = "Amazon Text To Speech", req_text: str = "Amazon Text To Speech",
filename: str = "title.mp3", filename: str = "title.mp3",
random_speaker=False, random_speaker=False,
censer=False, censor=False,
): ):
if random_speaker: if random_speaker:
voice = self.randomvoice() voice = self.randomvoice()
else: else:
if not os.getenv('VOICE'): if not os.getenv("VOICE"):
return ValueError('Please set the environment variable VOICE to a valid voice. options are: {}'.format(voices)) return ValueError(
"Please set the environment variable VOICE to a valid voice. options are: {}".format(
voices
)
)
voice = str(os.getenv("VOICE")).capitalize() voice = str(os.getenv("VOICE")).capitalize()
body = {'voice': voice, 'text': req_text, 'service': 'polly'} body = {"voice": voice, "text": req_text, "service": "polly"}
response = requests.post(self.url, data=body) response = requests.post(self.url, data=body)
try: try:
voice_data = requests.get(response.json()['speak_url']) voice_data = requests.get(response.json()["speak_url"])
with open(filename, 'wb') as f: with open(filename, "wb") as f:
f.write(voice_data.content) f.write(voice_data.content)
except (KeyError, JSONDecodeError): except (KeyError, JSONDecodeError):
if response.json()['error'] == 'Text length is too long!': if response.json()["error"] == "Text length is too long!":
chunks = [ chunks = [
m.group().strip() for m in re.finditer(r" *((.{0,499})(\.|.$))", req_text) m.group().strip() for m in re.finditer(r" *((.{0,499})(\.|.$))", req_text)
] ]
@ -48,9 +68,9 @@ class POLLY:
chunkId = 0 chunkId = 0
for chunk in chunks: for chunk in chunks:
body = {'voice': voice, 'text': chunk, 'service': 'polly'} body = {"voice": voice, "text": chunk, "service": "polly"}
resp = requests.post(self.url, data=body) resp = requests.post(self.url, data=body)
voice_data = requests.get(resp.json()['speak_url']) voice_data = requests.get(resp.json()["speak_url"])
with open(filename.replace(".mp3", f"-{chunkId}.mp3"), "wb") as out: with open(filename.replace(".mp3", f"-{chunkId}.mp3"), "wb") as out:
out.write(voice_data.content) out.write(voice_data.content)
@ -63,12 +83,14 @@ class POLLY:
cbn.build(audio_clips, filename, "concatenate") cbn.build(audio_clips, filename, "concatenate")
else: else:
os.rename(audio_clips[0], filename) os.rename(audio_clips[0], filename)
except (sox.core.SoxError, except (
FileNotFoundError): # https://github.com/JasonLovesDoggo/RedditVideoMakerBot/issues/67#issuecomment-1150466339 sox.core.SoxError,
FileNotFoundError,
): # https://github.com/JasonLovesDoggo/RedditVideoMakerBot/issues/67#issuecomment-1150466339
for clip in audio_clips: for clip in audio_clips:
i = audio_clips.index(clip) # get the index of the clip i = audio_clips.index(clip) # get the index of the clip
audio_clips = ( audio_clips = (
audio_clips[:i] + [AudioFileClip(clip)] + audio_clips[i + 1:] audio_clips[:i] + [AudioFileClip(clip)] + audio_clips[i + 1 :]
) # replace the clip with an AudioFileClip ) # replace the clip with an AudioFileClip
audio_concat = concatenate_audioclips(audio_clips) audio_concat = concatenate_audioclips(audio_clips)
audio_composite = CompositeAudioClip([audio_concat]) audio_composite = CompositeAudioClip([audio_concat])
@ -79,7 +101,7 @@ class POLLY:
Amazon Polly fails to read some symbols properly such as '& (and)'. Amazon Polly fails to read some symbols properly such as '& (and)'.
So we normalize input text before passing it to the service So we normalize input text before passing it to the service
""" """
text = text.replace('&', 'and') text = text.replace("&", "and")
return text return text
def randomvoice(self): def randomvoice(self):

@ -74,21 +74,17 @@ class TikTok: # TikTok Text-to-Speech Wrapper
req_text: str = "TikTok Text To Speech", req_text: str = "TikTok Text To Speech",
filename: str = "title.mp3", filename: str = "title.mp3",
random_speaker: bool = False, random_speaker: bool = False,
censer=False, censor=False,
): ):
req_text = req_text.replace("+", "plus").replace(" ", "+").replace("&", "and") req_text = req_text.replace("+", "plus").replace(" ", "+").replace("&", "and")
if censer: if censor:
# req_text = pf.censor(req_text) # req_text = pf.censor(req_text)
pass pass
voice = ( voice = (
self.randomvoice() self.randomvoice() if random_speaker else (os.getenv("VOICE") or random.choice(human))
if random_speaker
else (os.getenv("VOICE") or random.choice(human))
) )
chunks = [ chunks = [m.group().strip() for m in re.finditer(r" *((.{0,299})(\.|.$))", req_text)]
m.group().strip() for m in re.finditer(r" *((.{0,299})(\.|.$))", req_text)
]
audio_clips = [] audio_clips = []
cbn = sox.Combiner() cbn = sox.Combiner()
@ -97,9 +93,7 @@ class TikTok: # TikTok Text-to-Speech Wrapper
chunkId = 0 chunkId = 0
for chunk in chunks: for chunk in chunks:
try: try:
r = requests.post( r = requests.post(f"{self.URI_BASE}{voice}&req_text={chunk}&speaker_map_type=0")
f"{self.URI_BASE}{voice}&req_text={chunk}&speaker_map_type=0"
)
except requests.exceptions.SSLError: except requests.exceptions.SSLError:
# https://stackoverflow.com/a/47475019/18516611 # https://stackoverflow.com/a/47475019/18516611
session = requests.Session() session = requests.Session()
@ -107,9 +101,7 @@ class TikTok: # TikTok Text-to-Speech Wrapper
adapter = HTTPAdapter(max_retries=retry) adapter = HTTPAdapter(max_retries=retry)
session.mount("http://", adapter) session.mount("http://", adapter)
session.mount("https://", adapter) session.mount("https://", adapter)
r = session.post( r = session.post(f"{self.URI_BASE}{voice}&req_text={chunk}&speaker_map_type=0")
f"{self.URI_BASE}{voice}&req_text={chunk}&speaker_map_type=0"
)
print(r.text) print(r.text)
vstr = [r.json()["data"]["v_str"]][0] vstr = [r.json()["data"]["v_str"]][0]
b64d = base64.b64decode(vstr) b64d = base64.b64decode(vstr)
@ -126,7 +118,10 @@ class TikTok: # TikTok Text-to-Speech Wrapper
cbn.build(audio_clips, filename, "concatenate") cbn.build(audio_clips, filename, "concatenate")
else: else:
os.rename(audio_clips[0], filename) os.rename(audio_clips[0], filename)
except (sox.core.SoxError, FileNotFoundError): # https://github.com/JasonLovesDoggo/RedditVideoMakerBot/issues/67#issuecomment-1150466339 except (
sox.core.SoxError,
FileNotFoundError,
): # https://github.com/JasonLovesDoggo/RedditVideoMakerBot/issues/67#issuecomment-1150466339
for clip in audio_clips: for clip in audio_clips:
i = audio_clips.index(clip) # get the index of the clip i = audio_clips.index(clip) # get the index of the clip
audio_clips = ( audio_clips = (

@ -11,7 +11,8 @@ from video_creation.final_video import make_final_video
from video_creation.screenshot_downloader import download_screenshots_of_reddit_posts from video_creation.screenshot_downloader import download_screenshots_of_reddit_posts
from video_creation.voices import save_text_to_mp3 from video_creation.voices import save_text_to_mp3
banner = """ print(
"""
@ -19,7 +20,7 @@ banner = """
""" """
print(banner) )
load_dotenv() load_dotenv()
# Modified by JasonLovesDoggo # Modified by JasonLovesDoggo
print_markdown( print_markdown(
@ -47,7 +48,7 @@ def main():
download_screenshots_of_reddit_posts(reddit_object, number_of_comments) download_screenshots_of_reddit_posts(reddit_object, number_of_comments)
download_background() download_background()
chop_background_video(length) chop_background_video(length)
final_video = make_final_video(number_of_comments, length) make_final_video(number_of_comments, length)
def run_many(times): def run_many(times):

@ -23,9 +23,7 @@ def get_subreddit_threads():
content = {} content = {}
if str(getenv("REDDIT_2FA")).casefold() == "yes": if str(getenv("REDDIT_2FA")).casefold() == "yes":
print( print("\nEnter your two-factor authentication code from your authenticator app.\n")
"\nEnter your two-factor authentication code from your authenticator app.\n"
)
code = input("> ") code = input("> ")
print() print()
pw = getenv("REDDIT_PASSWORD") pw = getenv("REDDIT_PASSWORD")
@ -51,9 +49,7 @@ def get_subreddit_threads():
input("What subreddit would you like to pull from? ") input("What subreddit would you like to pull from? ")
) # if the env isnt set, ask user ) # if the env isnt set, ask user
else: else:
print_substep( print_substep(f"Using subreddit: r/{getenv('SUBREDDIT')} from environment variable config")
f"Using subreddit: r/{getenv('SUBREDDIT')} from environment variable config"
)
subreddit = reddit.subreddit( subreddit = reddit.subreddit(
getenv("SUBREDDIT") getenv("SUBREDDIT")
) # Allows you to specify in .env. Done for automation purposes. ) # Allows you to specify in .env. Done for automation purposes.
@ -71,14 +67,10 @@ def get_subreddit_threads():
num_comments = submission.num_comments num_comments = submission.num_comments
print_substep(f"Video will be: {submission.title} :thumbsup:", style="bold green") print_substep(f"Video will be: {submission.title} :thumbsup:", style="bold green")
print_substep(f"Thread has " + str(upvotes) + " upvotes", style="bold blue") print_substep(f"Thread has {upvotes} upvotes", style="bold blue")
print_substep( print_substep(f"Thread has a upvote ratio of {ratio}%", style="bold blue")
f"Thread has a upvote ratio of " + str(ratio) + "%", style="bold blue" print_substep(f"Thread has {num_comments} comments", style="bold blue")
) environ["VIDEO_TITLE"] = str(textify(submission.title)) # todo use global instend of env vars
print_substep(f"Thread has " + str(num_comments) + " comments", style="bold blue")
environ["VIDEO_TITLE"] = str(
textify(submission.title)
) # todo use global instend of env vars
environ["VIDEO_ID"] = str(textify(submission.id)) environ["VIDEO_ID"] = str(textify(submission.id))
content["thread_url"] = f"https://reddit.com{submission.permalink}" content["thread_url"] = f"https://reddit.com{submission.permalink}"

@ -41,7 +41,7 @@ def make_final_video(number_of_clips, length):
audio_clips = [] audio_clips = []
for i in range(0, number_of_clips): for i in range(0, number_of_clips):
audio_clips.append(AudioFileClip(f"assets/temp/mp3/{i}.mp3")) audio_clips.append(AudioFileClip(f"assets/temp/mp3/{i}.mp3"))
audio_clips.insert(0, AudioFileClip(f"assets/temp/mp3/title.mp3")) audio_clips.insert(0, AudioFileClip("assets/temp/mp3/title.mp3"))
audio_concat = concatenate_audioclips(audio_clips) audio_concat = concatenate_audioclips(audio_clips)
audio_composite = CompositeAudioClip([audio_concat]) audio_composite = CompositeAudioClip([audio_concat])
@ -55,12 +55,10 @@ def make_final_video(number_of_clips, length):
# add title to video # add title to video
image_clips = [] image_clips = []
# Gather all images # Gather all images
if ( if opacity is None or float(opacity) >= 1: # opacity not set or is set to one OR MORE
opacity is None or float(opacity) >= 1
): # opacity not set or is set to one OR MORE
image_clips.insert( image_clips.insert(
0, 0,
ImageClip(f"assets/temp/png/title.png") ImageClip("assets/temp/png/title.png")
.set_duration(audio_clips[0].duration) .set_duration(audio_clips[0].duration)
.set_position("center") .set_position("center")
.resize(width=W - 100) .resize(width=W - 100)
@ -69,15 +67,14 @@ def make_final_video(number_of_clips, length):
else: else:
image_clips.insert( image_clips.insert(
0, 0,
ImageClip(f"assets/temp/png/title.png") ImageClip("assets/temp/png/title.png")
.set_duration(audio_clips[0].duration) .set_duration(audio_clips[0].duration)
.set_position("center") .set_position("center")
.resize(width=W - 100)) .resize(width=W - 100),
)
for i in range(0, number_of_clips): for i in range(0, number_of_clips):
if ( if opacity is None or float(opacity) >= 1: # opacity not set or is set to one OR MORE
opacity is None or float(opacity) >= 1
): # opacity not set or is set to one OR MORE
image_clips.append( image_clips.append(
ImageClip(f"assets/temp/png/comment_{i}.png") ImageClip(f"assets/temp/png/comment_{i}.png")
.set_duration(audio_clips[i + 1].duration) .set_duration(audio_clips[i + 1].duration)
@ -103,9 +100,7 @@ def make_final_video(number_of_clips, length):
# .set_opacity(float(opacity)), # .set_opacity(float(opacity)),
# ) # )
# else: # else:
image_concat = concatenate_videoclips(image_clips).set_position( image_concat = concatenate_videoclips(image_clips).set_position(("center", "center"))
("center", "center")
)
image_concat.audio = audio_composite image_concat.audio = audio_composite
final = CompositeVideoClip([background_clip, image_concat]) final = CompositeVideoClip([background_clip, image_concat])
@ -139,9 +134,7 @@ def make_final_video(number_of_clips, length):
print_substep("the results folder didn't exist so I made it") print_substep("the results folder didn't exist so I made it")
os.mkdir("./results") os.mkdir("./results")
final.write_videofile( final.write_videofile("assets/temp/temp.mp4", fps=30, audio_codec="aac", audio_bitrate="192k")
"assets/temp/temp.mp4", fps=30, audio_codec="aac", audio_bitrate="192k"
)
ffmpeg_tools.ffmpeg_extract_subclip( ffmpeg_tools.ffmpeg_extract_subclip(
"assets/temp/temp.mp4", 0, length, targetname=f"results/{filename}" "assets/temp/temp.mp4", 0, length, targetname=f"results/{filename}"
) )
@ -150,7 +143,7 @@ def make_final_video(number_of_clips, length):
print_step("Removing temporary files 🗑") print_step("Removing temporary files 🗑")
cleanups = cleanup() cleanups = cleanup()
print_substep(f"Removed {cleanups} temporary files 🗑") print_substep(f"Removed {cleanups} temporary files 🗑")
print_substep(f"See result in the results folder!") print_substep("See result in the results folder!")
print_step( print_step(
f"Reddit title: {os.getenv('VIDEO_TITLE')} \n Background Credit: {os.getenv('background_credit')}" f"Reddit title: {os.getenv('VIDEO_TITLE')} \n Background Credit: {os.getenv('background_credit')}"

@ -10,11 +10,12 @@ from rich.progress import track
from TTS.swapper import TTS from TTS.swapper import TTS
console = Console()
from utils.console import print_step, print_substep from utils.console import print_step, print_substep
from utils.voice import sanitize_text from utils.voice import sanitize_text
console = Console()
VIDEO_LENGTH: int = 40 # secs VIDEO_LENGTH: int = 40 # secs
@ -31,23 +32,24 @@ def save_text_to_mp3(reddit_obj):
TextToSpeech = TTS() TextToSpeech = TTS()
TextToSpeech.tts( TextToSpeech.tts(
sanitize_text(reddit_obj["thread_title"]), sanitize_text(reddit_obj["thread_title"]),
filename=f"assets/temp/mp3/title.mp3", filename="assets/temp/mp3/title.mp3",
random_speaker=False, random_speaker=False,
) )
try: try:
length += MP3(f"assets/temp/mp3/title.mp3").info.length length += MP3("assets/temp/mp3/title.mp3").info.length
except HeaderNotFoundError: # note to self AudioFileClip except HeaderNotFoundError: # note to self AudioFileClip
length += sox.file_info.duration(f"assets/temp/mp3/title.mp3") length += sox.file_info.duration("assets/temp/mp3/title.mp3")
if getenv("STORYMODE").casefold() == "true": if getenv("STORYMODE").casefold() == "true":
TextToSpeech.tts( TextToSpeech.tts(
sanitize_text(reddit_obj["thread_content"]), sanitize_text(reddit_obj["thread_content"]),
filename=f"assets/temp/mp3/story_content.mp3", filename="assets/temp/mp3/story_content.mp3",
random_speaker=False, random_speaker=False,
) )
# 'story_content' # 'story_content'
com = 0 com = 0
for comment in track((reddit_obj["comments"]), "Saving..."): for comment in track((reddit_obj["comments"]), "Saving..."):
# ! Stop creating mp3 files if the length is greater than VIDEO_LENGTH seconds. This can be longer, but this is just a good_voices starting point # ! Stop creating mp3 files if the length is greater than VIDEO_LENGTH seconds. This can be longer
# but this is just a good_voices starting point
if length > VIDEO_LENGTH: if length > VIDEO_LENGTH:
break break

Loading…
Cancel
Save