added fixes

pull/963/head
Drugsosos 3 years ago
parent e19e532ea5
commit 6e4e6527a1
No known key found for this signature in database
GPG Key ID: 8E35176FE617E28D

@ -0,0 +1,10 @@
def audio_length(
path: str,
) -> float | int:
from mutagen.mp3 import MP3
try:
audio = MP3(path)
return audio.info.length
except Exception as e: # TODO add logging
return 0

@ -8,16 +8,16 @@ import re
# from mutagen.mp3 import MP3, HeaderNotFoundError # from mutagen.mp3 import MP3, HeaderNotFoundError
import translators as ts import translators as ts
from rich.progress import track from rich.progress import track
from moviepy.editor import AudioFileClip, CompositeAudioClip, concatenate_audioclips from attr import attrs, attrib
from utils.console import print_step, print_substep from utils.console import print_step, print_substep
from utils.voice import sanitize_text from utils.voice import sanitize_text
from utils import settings from utils import settings
from TTS.common import audio_length
DEFUALT_MAX_LENGTH: int = 50 # video length variable
@attrs(auto_attribs=True)
class TTSEngine: class TTSEngine:
"""Calls the given TTS engine to reduce code duplication and allow multiple TTS engines. """Calls the given TTS engine to reduce code duplication and allow multiple TTS engines.
Args: Args:
@ -29,94 +29,72 @@ class TTSEngine:
Notes: Notes:
tts_module must take the arguments text and filepath. tts_module must take the arguments text and filepath.
""" """
tts_module: object
reddit_object: dict
path: str = 'assets/temp/mp3'
max_length: int = 50 # TODO move to config
__total_length: int = attrib(
default=0,
kw_only=True
)
def __init__( def run(
self, self
tts_module, ) -> list:
reddit_object: dict,
path: str = "assets/temp/mp3",
max_length: int = DEFUALT_MAX_LENGTH,
):
self.tts_module = tts_module()
self.reddit_object = reddit_object
self.path = path
self.max_length = max_length
self.length = 0
def run(self) -> Tuple[int, int]:
Path(self.path).mkdir(parents=True, exist_ok=True) Path(self.path).mkdir(parents=True, exist_ok=True)
# This file needs to be removed in case this post does not use post text, so that it wont appear in the final video # This file needs to be removed in case this post does not use post text
# so that it won't appear in the final video
try: try:
Path(f"{self.path}/posttext.mp3").unlink() Path(f'{self.path}/posttext.mp3').unlink()
except OSError: except OSError:
pass pass
print_step("Saving Text to MP3 files...") print_step('Saving Text to MP3 files...')
self.call_tts("title", self.reddit_object["thread_title"])
if (
self.reddit_object["thread_post"] != ""
and settings.config["settings"]["storymode"] == True
):
self.call_tts("posttext", self.reddit_object["thread_post"])
idx = None
for idx, comment in track(enumerate(self.reddit_object["comments"]), "Saving..."):
# ! Stop creating mp3 files if the length is greater than max length.
if self.length > self.max_length:
break
if not self.tts_module.max_chars:
self.call_tts(f"{idx}", comment["comment_body"])
else:
self.split_post(comment["comment_body"], idx)
print_substep("Saved Text to MP3 files successfully.", style="bold green")
return self.length, idx
def split_post(self, text: str, idx: int):
split_files = []
split_text = [
x.group().strip()
for x in re.finditer(rf" *((.{{0,{self.tts_module.max_chars}}})(\.|.$))", text)
]
idy = None self.call_tts('title', self.reddit_object['thread_title'])
for idy, text_cut in enumerate(split_text):
# print(f"{idx}-{idy}: {text_cut}\n") if self.reddit_object['thread_post'] and settings.config['settings']['storymode']:
self.call_tts(f"{idx}-{idy}.part", text_cut) self.call_tts('posttext', self.reddit_object['thread_post'])
split_files.append(AudioFileClip(f"{self.path}/{idx}-{idy}.part.mp3"))
CompositeAudioClip([concatenate_audioclips(split_files)]).write_audiofile(
f"{self.path}/{idx}.mp3", fps=44100, verbose=False, logger=None
)
for i in split_files: sync_tasks_primary = [
name = i.filename self.call_tts(str(idx), comment['comment_body'])
i.close() for idx, comment in track(enumerate(self.reddit_object['comments']), description='Saving...')
Path(name).unlink() ]
# for i in range(0, idy + 1): print_substep('Saved Text to MP3 files successfully.', style='bold green')
# print(f"Cleaning up {self.path}/{idx}-{i}.part.mp3") return [
comments for comments, condition in
zip(self.reddit_object['comments'], sync_tasks_primary)
if condition
]
# Path(f"{self.path}/{idx}-{i}.part.mp3").unlink() def call_tts(
self,
filename: str,
text: str
) -> bool:
self.tts_module.run(
text=self.process_text(text),
filepath=f'{self.path}/{filename}.mp3'
)
def call_tts(self, filename: str, text: str): clip_length = audio_length(f'assets/audio/{filename}.mp3')
self.tts_module.run(text=process_text(text), filepath=f"{self.path}/{filename}.mp3")
# try:
# self.length += MP3(f"{self.path}/{filename}.mp3").info.length
# except (MutagenError, HeaderNotFoundError):
# self.length += sox.file_info.duration(f"{self.path}/{filename}.mp3")
clip = AudioFileClip(f"{self.path}/{filename}.mp3")
self.length += clip.duration
clip.close()
if self.__total_length + clip_length <= self.max_length:
self.max_length += clip_length
return True
return False
def process_text(text: str): @staticmethod
lang = settings.config["reddit"]["thread"]["post_lang"] def process_text(
text: str,
) -> str:
lang = settings.config['reddit']['thread']['post_lang']
new_text = sanitize_text(text) new_text = sanitize_text(text)
if lang: if lang:
print_substep("Translating Text...") print_substep('Translating Text...')
translated_text = ts.google(text, to_language=lang) translated_text = ts.google(text, to_language=lang)
new_text = sanitize_text(translated_text) new_text = sanitize_text(translated_text)
return new_text return new_text

@ -1,6 +1,5 @@
#!/usr/bin/env python #!/usr/bin/env python
from asyncio import run from asyncio import run
import math
from subprocess import Popen from subprocess import Popen
from os import name from os import name
from reddit.subreddit import get_subreddit_threads from reddit.subreddit import get_subreddit_threads
@ -41,13 +40,10 @@ async def main(
): ):
cleanup() cleanup()
reddit_object = get_subreddit_threads(POST_ID) reddit_object = get_subreddit_threads(POST_ID)
length, number_of_comments = save_text_to_mp3(reddit_object) comments_created = save_text_to_mp3(reddit_object)
length = math.ceil(length) await RedditScreenshot(reddit_object, comments_created).download()
await RedditScreenshot(reddit_object, number_of_comments).download()
bg_config = get_background_config() bg_config = get_background_config()
download_background(bg_config) make_final_video(comments_created, reddit_object, bg_config)
chop_background_video(bg_config, length)
make_final_video(number_of_comments, length, reddit_object, bg_config)
async def run_many(times): async def run_many(times):

@ -3,7 +3,7 @@ import multiprocessing
import os import os
import re import re
from os.path import exists from os.path import exists
from typing import Dict, Tuple, Any from typing import Tuple, Any
import translators as ts import translators as ts
@ -13,7 +13,6 @@ from moviepy.editor import (
ImageClip, ImageClip,
concatenate_videoclips, concatenate_videoclips,
concatenate_audioclips, concatenate_audioclips,
CompositeAudioClip,
CompositeVideoClip, CompositeVideoClip,
) )
from moviepy.video.io.ffmpeg_tools import ffmpeg_merge_video_audio, ffmpeg_extract_subclip from moviepy.video.io.ffmpeg_tools import ffmpeg_merge_video_audio, ffmpeg_extract_subclip
@ -23,24 +22,26 @@ from utils.cleanup import cleanup
from utils.console import print_step, print_substep from utils.console import print_step, print_substep
from utils.videos import save_data from utils.videos import save_data
from utils import settings from utils import settings
from video_creation.background import download_background, chop_background_video
console = Console() console = Console()
W, H = 1080, 1920 W, H = 1080, 1920 # TODO move to config
def name_normalize(name: str) -> str: def name_normalize(
name: str
) -> str:
name = re.sub(r'[?\\"%*:|<>]', "", name) name = re.sub(r'[?\\"%*:|<>]', "", name)
name = re.sub(r"( [w,W]\s?\/\s?[o,O,0])", r" without", name) name = re.sub(r'( [w,W]\s?\/\s?[o,O,0])', r' without', name)
name = re.sub(r"( [w,W]\s?\/)", r" with", name) name = re.sub(r'( [w,W]\s?\/)', r' with', name)
name = re.sub(r"(\d+)\s?\/\s?(\d+)", r"\1 of \2", name) name = re.sub(r'(\d+)\s?\/\s?(\d+)', r'\1 of \2', name)
name = re.sub(r"(\w+)\s?\/\s?(\w+)", r"\1 or \2", name) name = re.sub(r'(\w+)\s?\/\s?(\w+)', r'\1 or \2', name)
name = re.sub(r"\/", r"", name) name = re.sub(r'\/', '', name)
lang = settings.config["reddit"]["thread"]["post_lang"] lang = settings.config['reddit']['thread']['post_lang']
if lang: if lang:
print_substep("Translating filename...") print_substep('Translating filename...')
translated_name = ts.google(name, to_language=lang) translated_name = ts.google(name, to_language=lang)
return translated_name return translated_name
@ -49,48 +50,46 @@ def name_normalize(name: str) -> str:
def make_final_video( def make_final_video(
number_of_clips: int, length: int, reddit_obj: dict, background_config: Tuple[str, str, str, Any] indexes_of_clips: list,
): reddit_obj: dict,
"""Gathers audio clips, gathers all screenshots, stitches them together and saves the final video to assets/temp background_config: Tuple[str, str, str, Any],
) -> None:
"""
Gathers audio clips, gathers all screenshots, stitches them together and saves the final video to assets/temp
Args: Args:
number_of_clips (int): Index to end at when going through the screenshots' indexes_of_clips (list): Indexes with created comments'
length (int): Length of the video
reddit_obj (dict): The reddit object that contains the posts to read. reddit_obj (dict): The reddit object that contains the posts to read.
background_config (Tuple[str, str, str, Any]): The background config to use. background_config (Tuple[str, str, str, Any]): The background config to use.
""" """
print_step("Creating the final video 🎥") print_step('Creating the final video 🎥')
VideoFileClip.reW = lambda clip: clip.resize(width=W) VideoFileClip.reW = lambda clip: clip.resize(width=W)
VideoFileClip.reH = lambda clip: clip.resize(width=H) VideoFileClip.reH = lambda clip: clip.resize(width=H)
opacity = settings.config["settings"]["opacity"] opacity = settings.config['settings']['opacity']
background_clip = (
VideoFileClip("assets/temp/background.mp4") final_length = 0
.without_audio()
.resize(height=H)
.crop(x1=1166.6, y1=0, x2=2246.6, y2=1920)
)
# Gather all audio clips # Gather all audio clips
audio_clips = [AudioFileClip(f"assets/temp/mp3/{i}.mp3") for i in range(number_of_clips)] audio_clips = [AudioFileClip(f'assets/temp/mp3/{i}.mp3') for i in indexes_of_clips]
audio_clips.insert(0, AudioFileClip("assets/temp/mp3/title.mp3")) audio_clips.insert(0, AudioFileClip('assets/temp/mp3/title.mp3'))
audio_concat = concatenate_audioclips(audio_clips) audio_composite = concatenate_audioclips(audio_clips)
audio_composite = CompositeAudioClip([audio_concat])
console.log(f"[bold green] Video Will Be: {length} Seconds Long") console.log(f'[bold green] Video Will Be: {audio_composite.length} Seconds Long')
# add title to video # add title to video
image_clips = [] image_clips = []
# Gather all images # Gather all images
new_opacity = 1 if opacity is None or float(opacity) >= 1 else float(opacity) new_opacity = 1 if opacity is None or float(opacity) >= 1 else float(opacity)
image_clips.insert( image_clips.insert(
0, 0,
ImageClip("assets/temp/png/title.png") ImageClip('assets/temp/png/title.png')
.set_duration(audio_clips[0].duration) .set_duration(audio_clips[0].duration)
.resize(width=W - 100) .resize(width=W - 100)
.set_opacity(new_opacity), .set_opacity(new_opacity),
) )
for i in range(0, number_of_clips): for i in indexes_of_clips:
image_clips.append( image_clips.append(
ImageClip(f"assets/temp/png/comment_{i}.png") ImageClip(f'assets/temp/png/comment_{i}.png')
.set_duration(audio_clips[i + 1].duration) .set_duration(audio_clips[i + 1].duration)
.resize(width=W - 100) .resize(width=W - 100)
.set_opacity(new_opacity) .set_opacity(new_opacity)
@ -109,63 +108,73 @@ def make_final_video(
img_clip_pos = background_config[3] img_clip_pos = background_config[3]
image_concat = concatenate_videoclips(image_clips).set_position(img_clip_pos) image_concat = concatenate_videoclips(image_clips).set_position(img_clip_pos)
image_concat.audio = audio_composite image_concat.audio = audio_composite
download_background(background_config)
chop_background_video(background_config, final_length)
background_clip = (
VideoFileClip("assets/temp/background.mp4")
.without_audio()
.resize(height=H)
.crop(x1=1166.6, y1=0, x2=2246.6, y2=1920)
)
final = CompositeVideoClip([background_clip, image_concat]) final = CompositeVideoClip([background_clip, image_concat])
title = re.sub(r"[^\w\s-]", "", reddit_obj["thread_title"]) title = re.sub(r'[^\w\s-]', '', reddit_obj['thread_title'])
idx = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"]) idx = re.sub(r'[^\w\s-]', '', reddit_obj['thread_id'])
filename = f"{name_normalize(title)}.mp4" filename = f'{name_normalize(title)}.mp4'
subreddit = settings.config["reddit"]["thread"]["subreddit"] subreddit = settings.config['reddit']['thread']['subreddit']
save_data(subreddit, filename, title, idx, background_config[2]) save_data(subreddit, filename, title, idx, background_config[2])
if not exists(f"./results/{subreddit}"): if not exists(f'./results/{subreddit}'):
print_substep("The results folder didn't exist so I made it") print_substep('The results folder didn\'t exist so I made it')
os.makedirs(f"./results/{subreddit}") os.makedirs(f'./results/{subreddit}')
final.write_videofile( final.write_videofile(
"assets/temp/temp.mp4", 'assets/temp/temp.mp4',
fps=30, fps=30,
audio_codec="aac", audio_codec='aac',
audio_bitrate="192k", audio_bitrate='192k',
verbose=False, verbose=False,
threads=multiprocessing.cpu_count(), threads=multiprocessing.cpu_count(),
) )
if settings.config["settings"]["background_audio"]: if settings.config['settings']['background_audio']:
print("[bold green] Merging background audio with video") print('[bold green] Merging background audio with video')
if not exists(f"assets/backgrounds/background.mp3"): if not exists('assets/backgrounds/background.mp3'):
print_substep( print_substep(
"Cannot find assets/backgrounds/background.mp3 audio file didn't so skipping." 'Cannot find assets/backgrounds/background.mp3 audio file didn\'t so skipping.'
) )
ffmpeg_extract_subclip( ffmpeg_extract_subclip(
"assets/temp/temp.mp4", 'assets/temp/temp.mp4',
0, 0,
final.duration, final.duration,
targetname=f"results/{subreddit}/{filename}", targetname=f'results/{subreddit}/{filename}',
) )
else: else:
ffmpeg_merge_video_audio( ffmpeg_merge_video_audio(
"assets/temp/temp.mp4", 'assets/temp/temp.mp4',
"assets/backgrounds/background.mp3", 'assets/backgrounds/background.mp3',
"assets/temp/temp_audio.mp4", 'assets/temp/temp_audio.mp4',
) )
ffmpeg_extract_subclip( # check if this gets run ffmpeg_extract_subclip( # check if this gets run
"assets/temp/temp_audio.mp4", 'assets/temp/temp_audio.mp4',
0, 0,
final.duration, final.duration,
targetname=f"results/{subreddit}/{filename}", targetname=f"results/{subreddit}/{filename}",
) )
else: else:
print("debug duck") print('debug duck')
ffmpeg_extract_subclip( ffmpeg_extract_subclip(
"assets/temp/temp.mp4", 'assets/temp/temp.mp4',
0, 0,
final.duration, final.duration,
targetname=f"results/{subreddit}/{filename}", targetname=f'results/{subreddit}/{filename}',
) )
print_step("Removing temporary files 🗑") print_step('Removing temporary files 🗑')
cleanups = cleanup() cleanups = cleanup()
print_substep(f"Removed {cleanups} temporary files 🗑") print_substep(f'Removed {cleanups} temporary files 🗑')
print_substep("See result in the results folder!") print_substep('See result in the results folder!')
print_step( print_step(
f'Reddit title: {reddit_obj["thread_title"]} \n Background Credit: {background_config[2]}' f'Reddit title: {reddit_obj["thread_title"]} \n Background Credit: {background_config[2]}'

@ -209,10 +209,10 @@ class RedditScreenshot(Browser, Wait):
""" """
Args: Args:
reddit_object (Dict): Reddit object received from reddit/subreddit.py reddit_object (Dict): Reddit object received from reddit/subreddit.py
screenshot_num (int): Number of screenshots to download screenshot_idx (int): List with indexes of voiced comments
""" """
reddit_object: dict reddit_object: dict
screenshot_num: int = attrib() screenshot_idx: list = attrib()
@screenshot_num.validator @screenshot_num.validator
def validate_screenshot_num(self, attribute, value): def validate_screenshot_num(self, attribute, value):
@ -348,9 +348,8 @@ class RedditScreenshot(Browser, Wait):
) )
async_tasks_primary = [ async_tasks_primary = [
self.__collect_comment(comment, idx) for idx, comment in self.__collect_comment(self.reddit_object['comments'][idx], idx) for idx in
enumerate(self.reddit_object['comments']) self.screenshot_idx
if idx < self.screenshot_num
] ]
for task in track( for task in track(

@ -1,55 +1,50 @@
#!/usr/bin/env python
from typing import Dict, Tuple
from rich.console import Console
from TTS.engine_wrapper import TTSEngine from TTS.engine_wrapper import TTSEngine
from TTS.GTTS import GTTS from TTS.GTTS import GTTS
from TTS.streamlabs_polly import StreamlabsPolly from TTS.streamlabs_polly import StreamlabsPolly
from TTS.aws_polly import AWSPolly from TTS.aws_polly import AWSPolly
from TTS.TikTok import TikTok from TTS.TikTok import TikTok
from utils import settings from utils import settings
from utils.console import print_table, print_step from utils.console import print_table, print_step
console = Console()
TTSProviders = { TTSProviders = {
"GoogleTranslate": GTTS, 'GoogleTranslate': GTTS,
"AWSPolly": AWSPolly, 'AWSPolly': AWSPolly,
"StreamlabsPolly": StreamlabsPolly, 'StreamlabsPolly': StreamlabsPolly,
"TikTok": TikTok, 'TikTok': TikTok,
} }
def save_text_to_mp3(reddit_obj) -> Tuple[int, int]: async def save_text_to_mp3(
reddit_obj: dict,
) -> list:
"""Saves text to MP3 files. """Saves text to MP3 files.
Args: Args:
reddit_obj (): Reddit object received from reddit API in reddit/subreddit.py reddit_obj (): Reddit object received from reddit API in reddit/subreddit.py
Returns: Returns:
tuple[int,int]: (total length of the audio, the number of comments audio was generated for) The number of comments audio was generated for
""" """
voice = settings.config["settings"]["tts"]["choice"] voice = settings.config['settings']['tts']['choice']
if voice.casefold() in map(lambda _: _.casefold(), TTSProviders): if voice.casefold() not in map(lambda _: _.casefold(), TTSProviders):
text_to_mp3 = TTSEngine(get_case_insensitive_key_value(TTSProviders, voice), reddit_obj)
else:
while True: while True:
print_step("Please choose one of the following TTS providers: ") print_step('Please choose one of the following TTS providers: ')
print_table(TTSProviders) print_table(TTSProviders)
choice = input("\n") voice = input('\n')
if choice.casefold() in map(lambda _: _.casefold(), TTSProviders): if voice.casefold() in map(lambda _: _.casefold(), TTSProviders):
break break
print("Unknown Choice") print('Unknown Choice')
text_to_mp3 = TTSEngine(get_case_insensitive_key_value(TTSProviders, choice), reddit_obj) engine_instance = TTSEngine(get_case_insensitive_key_value(TTSProviders, voice), reddit_obj)
return await engine_instance.run()
return text_to_mp3.run()
def get_case_insensitive_key_value(input_dict, key): def get_case_insensitive_key_value(
input_dict,
key
) -> object:
return next( return next(
(value for dict_key, value in input_dict.items() if dict_key.lower() == key.lower()), (value for dict_key, value in input_dict.items() if dict_key.lower() == key.lower()),
None, None,

Loading…
Cancel
Save