Pending changes exported from your codespace

pull/1715/head
Rafael Soley R 2 years ago
parent 3b0b4abc3e
commit 52820dbff5

@ -186,6 +186,7 @@ def process_text(text: str, clean: bool = True):
new_text = sanitize_text(text) if clean else text new_text = sanitize_text(text) if clean else text
if lang: if lang:
print_substep("Translating Text...") print_substep("Translating Text...")
translated_text = translators.google(text, to_language=lang) #translated_text = translators.google(text, )
translated_text = translators.translate_text(text, translator='google', to_language='en')
new_text = sanitize_text(translated_text) new_text = sanitize_text(translated_text)
return new_text return new_text

@ -1,431 +1,192 @@
import multiprocessing
import os import os
import re import re
from os.path import exists # Needs to be imported specifically from pathlib import Path
from typing import Final from typing import Tuple
from typing import Tuple, Any, Dict
import ffmpeg import numpy as np
import translators import translators
from PIL import Image from moviepy.audio.AudioClip import AudioClip
from rich.console import Console from moviepy.audio.fx.volumex import volumex
from moviepy.editor import AudioFileClip
from rich.progress import track from rich.progress import track
from utils.cleanup import cleanup
from utils.console import print_step, print_substep
from utils.thumbnail import create_thumbnail
from utils.videos import save_data
from utils import settings from utils import settings
from utils.console import print_step, print_substep
import tempfile from utils.voice import sanitize_text
import threading
import time
console = Console()
class ProgressFfmpeg(threading.Thread):
def __init__(self, vid_duration_seconds, progress_update_callback):
threading.Thread.__init__(self, name="ProgressFfmpeg")
self.stop_event = threading.Event()
self.output_file = tempfile.NamedTemporaryFile(mode="w+", delete=False)
self.vid_duration_seconds = vid_duration_seconds
self.progress_update_callback = progress_update_callback
def run(self):
while not self.stop_event.is_set():
latest_progress = self.get_latest_ms_progress()
if latest_progress is not None:
completed_percent = latest_progress / self.vid_duration_seconds
self.progress_update_callback(completed_percent)
time.sleep(1)
def get_latest_ms_progress(self):
lines = self.output_file.readlines()
if lines:
for line in lines:
if "out_time_ms" in line:
out_time_ms = line.split("=")[1]
return int(out_time_ms) / 1000000.0
return None
def stop(self):
self.stop_event.set()
def __enter__(self):
self.start()
return self
def __exit__(self, *args, **kwargs):
self.stop()
def name_normalize(name: str) -> str: DEFAULT_MAX_LENGTH: int = 50 # Video length variable, edit this on your own risk. It should work, but it's not supported
name = re.sub(r'[?\\"%*:|<>]', "", name)
name = re.sub(r"( [w,W]\s?\/\s?[o,O,0])", r" without", name)
name = re.sub(r"( [w,W]\s?\/)", r" with", name)
name = re.sub(r"(\d+)\s?\/\s?(\d+)", r"\1 of \2", name)
name = re.sub(r"(\w+)\s?\/\s?(\w+)", r"\1 or \2", name)
name = re.sub(r"\/", r"", name)
lang = settings.config["reddit"]["thread"]["post_lang"]
if lang:
print_substep("Translating filename...")
translated_name = translators.google(name, to_language=lang)
return translated_name
else:
return name
def prepare_background(reddit_id: str, W: int, H: int) -> str: class TTSEngine:
output_path = f"assets/temp/{reddit_id}/background_noaudio.mp4"
output = (
ffmpeg.input(f"assets/temp/{reddit_id}/background.mp4")
.filter("crop", f"ih*({W}/{H})", "ih")
.output(
output_path,
an=None,
**{
"c:v": "h264",
"b:v": "20M",
"b:a": "192k",
"threads": multiprocessing.cpu_count(),
},
)
.overwrite_output()
)
try:
output.run(quiet=True)
except ffmpeg.Error as e:
print(e.stderr.decode("utf8"))
exit(1)
return output_path
"""Calls the given TTS engine to reduce code duplication and allow multiple TTS engines.
def merge_background_audio(audio: ffmpeg, reddit_id: str):
"""Gather an audio and merge with assets/backgrounds/background.mp3
Args: Args:
audio (ffmpeg): The TTS final audio but without background. tts_module : The TTS module. Your module should handle the TTS itself and saving to the given path under the run method.
reddit_id (str): The ID of subreddit reddit_object : The reddit object that contains the posts to read.
""" path (Optional) : The unix style path to save the mp3 files to. This must not have leading or trailing slashes.
background_audio_volume = settings.config["settings"]["background"][ max_length (Optional) : The maximum length of the mp3 files in total.
"background_audio_volume"
]
if background_audio_volume == 0:
return audio # Return the original audio
else:
# sets volume to config
bg_audio = ffmpeg.input(f"assets/temp/{reddit_id}/background.mp3").filter(
"volume",
background_audio_volume,
)
# Merges audio and background_audio
merged_audio = ffmpeg.filter([audio, bg_audio], "amix", duration="longest")
return merged_audio # Return merged audio
Notes:
def make_final_video( tts_module must take the arguments text and filepath.
number_of_clips: int,
length: int,
reddit_obj: dict,
background_config: Dict[str, Tuple],
):
"""Gathers audio clips, gathers all screenshots, stitches them together and saves the final video to assets/temp
Args:
number_of_clips (int): Index to end at when going through the screenshots'
length (int): Length of the video
reddit_obj (dict): The reddit object that contains the posts to read.
background_config (Tuple[str, str, str, Any]): The background config to use.
""" """
# settings values
W: Final[int] = int(settings.config["settings"]["resolution_w"])
H: Final[int] = int(settings.config["settings"]["resolution_h"])
opacity = settings.config["settings"]["opacity"]
reddit_id = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"])
allowOnlyTTSFolder: bool = (
settings.config["settings"]["background"]["enable_extra_audio"]
and settings.config["settings"]["background"]["background_audio_volume"] != 0
)
print_step("Creating the final video 🎥")
background_clip = ffmpeg.input(prepare_background(reddit_id, W=W, H=H))
# Gather all audio clips
audio_clips = list()
if number_of_clips == 0 and settings.config["settings"]["storymode"] == "false":
print(
"No audio clips to gather. Please use a different TTS or post."
) # This is to fix the TypeError: unsupported operand type(s) for +: 'int' and 'NoneType'
exit()
if settings.config["settings"]["storymode"]:
if settings.config["settings"]["storymodemethod"] == 0:
audio_clips = [ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3")]
audio_clips.insert(
1, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/postaudio.mp3")
)
elif settings.config["settings"]["storymodemethod"] == 1:
audio_clips = [
ffmpeg.input(f"assets/temp/{reddit_id}/mp3/postaudio-{i}.mp3")
for i in track(
range(number_of_clips + 1), "Collecting the audio files..."
)
]
audio_clips.insert(
0, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3")
)
else:
audio_clips = [
ffmpeg.input(f"assets/temp/{reddit_id}/mp3/{i}.mp3")
for i in range(number_of_clips)
]
audio_clips.insert(0, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3"))
audio_clips_durations = [ def __init__(
float( self,
ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/{i}.mp3")["format"][ tts_module,
"duration" reddit_object: dict,
] path: str = "assets/temp/",
max_length: int = DEFAULT_MAX_LENGTH,
last_clip_length: int = 0,
):
self.tts_module = tts_module()
self.reddit_object = reddit_object
self.redditid = re.sub(r"[^\w\s-]", "", reddit_object["thread_id"])
self.path = path + self.redditid + "/mp3"
self.max_length = max_length
self.length = 0
self.last_clip_length = last_clip_length
def add_periods(
self,
): # adds periods to the end of paragraphs (where people often forget to put them) so tts doesn't blend sentences
for comment in self.reddit_object["comments"]:
# remove links
regex_urls = r"((http|https)\:\/\/)?[a-zA-Z0-9\.\/\?\:@\-_=#]+\.([a-zA-Z]){2,6}([a-zA-Z0-9\.\&\/\?\:@\-_=#])*"
comment["comment_body"] = re.sub(regex_urls, " ", comment["comment_body"])
comment["comment_body"] = comment["comment_body"].replace("\n", ". ")
comment["comment_body"] = re.sub(r"\bAI\b", "A.I", comment["comment_body"])
comment["comment_body"] = re.sub(
r"\bAGI\b", "A.G.I", comment["comment_body"]
) )
for i in range(number_of_clips) if comment["comment_body"][-1] != ".":
] comment["comment_body"] += "."
audio_clips_durations.insert( comment["comment_body"] = comment["comment_body"].replace(". . .", ".")
0, comment["comment_body"] = comment["comment_body"].replace(".. . ", ".")
float( comment["comment_body"] = comment["comment_body"].replace(". . ", ".")
ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/title.mp3")["format"][ comment["comment_body"] = re.sub(r'\."\.', '".', comment["comment_body"])
"duration"
] def run(self) -> Tuple[int, int]:
), Path(self.path).mkdir(parents=True, exist_ok=True)
) print_step("Saving Text to MP3 files...")
audio_concat = ffmpeg.concat(*audio_clips, a=1, v=0)
ffmpeg.output( self.add_periods()
audio_concat, f"assets/temp/{reddit_id}/audio.mp3", **{"b:a": "192k"} self.call_tts("title", process_text(self.reddit_object["thread_title"]))
).overwrite_output().run(quiet=True) # processed_text = ##self.reddit_object["thread_post"] != ""
idx = 0
console.log(f"[bold green] Video Will Be: {length} Seconds Long")
if settings.config["settings"]["storymode"]:
screenshot_width = int((W * 45) // 100) if settings.config["settings"]["storymodemethod"] == 0:
audio = ffmpeg.input(f"assets/temp/{reddit_id}/audio.mp3") if len(self.reddit_object["thread_post"]) > self.tts_module.max_chars:
final_audio = merge_background_audio(audio, reddit_id) self.split_post(self.reddit_object["thread_post"], "postaudio")
else:
image_clips = list() self.call_tts(
"postaudio", process_text(self.reddit_object["thread_post"])
image_clips.insert( )
0, elif settings.config["settings"]["storymodemethod"] == 1:
ffmpeg.input(f"assets/temp/{reddit_id}/png/title.png")["v"].filter( for idx, text in track(enumerate(self.reddit_object["thread_post"])):
"scale", screenshot_width, -1 self.call_tts(f"postaudio-{idx}", process_text(text))
),
)
current_time = 0 else:
if settings.config["settings"]["storymode"]: for idx, comment in track(
audio_clips_durations = [ enumerate(self.reddit_object["comments"]), "Saving..."
float( ):
ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/postaudio-{i}.mp3")[ # ! Stop creating mp3 files if the length is greater than max length.
"format" if self.length > self.max_length and idx > 1:
]["duration"] self.length -= self.last_clip_length
idx -= 1
break
if (
len(comment["comment_body"]) > self.tts_module.max_chars
): # Split the comment if it is too long
self.split_post(comment["comment_body"], idx) # Split the comment
else: # If the comment is not too long, just call the tts engine
self.call_tts(f"{idx}", process_text(comment["comment_body"]))
print_substep("Saved Text to MP3 files successfully.", style="bold green")
return self.length, idx
def split_post(self, text: str, idx):
split_files = []
split_text = [
x.group().strip()
for x in re.finditer(
r" *(((.|\n){0," + str(self.tts_module.max_chars) + "})(\.|.$))", text
) )
for i in range(number_of_clips)
] ]
audio_clips_durations.insert( self.create_silence_mp3()
0,
float( idy = None
ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/title.mp3")["format"][ for idy, text_cut in enumerate(split_text):
"duration" newtext = process_text(text_cut)
] # print(f"{idx}-{idy}: {newtext}\n")
),
) if not newtext or newtext.isspace():
if settings.config["settings"]["storymodemethod"] == 0: print("newtext was blank because sanitized split text resulted in none")
image_clips.insert( continue
1, else:
ffmpeg.input(f"assets/temp/{reddit_id}/png/story_content.png").filter( self.call_tts(f"{idx}-{idy}.part", newtext)
"scale", screenshot_width, -1 with open(f"{self.path}/list.txt", "w") as f:
), for idz in range(0, len(split_text)):
) f.write("file " + f"'{idx}-{idz}.part.mp3'" + "\n")
background_clip = background_clip.overlay( split_files.append(str(f"{self.path}/{idx}-{idy}.part.mp3"))
image_clips[1], f.write("file " + f"'silence.mp3'" + "\n")
enable=f"between(t,{current_time},{current_time + audio_clips_durations[1]})",
x="(main_w-overlay_w)/2", os.system(
y="(main_h-overlay_h)/2", "ffmpeg -f concat -y -hide_banner -loglevel panic -safe 0 "
) + "-i "
current_time += audio_clips_durations[1] + f"{self.path}/list.txt "
elif settings.config["settings"]["storymodemethod"] == 1: + "-c copy "
for i in track( + f"{self.path}/{idx}.mp3"
range(0, number_of_clips + 1), "Collecting the image files..."
):
image_clips.append(
ffmpeg.input(f"assets/temp/{reddit_id}/png/img{i}.png")["v"].filter(
"scale", screenshot_width, -1
)
)
background_clip = background_clip.overlay(
image_clips[i],
enable=f"between(t,{current_time},{current_time + audio_clips_durations[i]})",
x="(main_w-overlay_w)/2",
y="(main_h-overlay_h)/2",
) )
current_time += audio_clips_durations[i] try:
else: for i in range(0, len(split_files)):
for i in range(0, number_of_clips + 1): os.unlink(split_files[i])
image_clips.append( except FileNotFoundError as e:
ffmpeg.input(f"assets/temp/{reddit_id}/png/comment_{i}.png")[ print("File not found: " + e.filename)
"v" except OSError:
].filter("scale", screenshot_width, -1) print("OSError")
)
image_overlay = image_clips[i].filter("colorchannelmixer", aa=opacity) def call_tts(self, filename: str, text: str):
background_clip = background_clip.overlay( self.tts_module.run(
image_overlay, text,
enable=f"between(t,{current_time},{current_time + audio_clips_durations[i]})", filepath=f"{self.path}/{filename}.mp3",
x="(main_w-overlay_w)/2", random_voice=settings.config["settings"]["tts"]["random_voice"],
y="(main_h-overlay_h)/2",
)
current_time += audio_clips_durations[i]
title = re.sub(r"[^\w\s-]", "", reddit_obj["thread_title"])
idx = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"])
title_thumb = reddit_obj["thread_title"]
filename = f"{name_normalize(title)[:251]}"
subreddit = settings.config["reddit"]["thread"]["subreddit"]
if not exists(f"./results/{subreddit}"):
print_substep(
"The 'results' folder could not be found so it was automatically created."
) )
os.makedirs(f"./results/{subreddit}") # try:
# self.length += MP3(f"{self.path}/{filename}.mp3").info.length
if not exists(f"./results/{subreddit}/OnlyTTS") and allowOnlyTTSFolder: # except (MutagenError, HeaderNotFoundError):
print_substep( # self.length += sox.file_info.duration(f"{self.path}/{filename}.mp3")
"The 'OnlyTTS' folder could not be found so it was automatically created." try:
clip = AudioFileClip(f"{self.path}/{filename}.mp3")
self.last_clip_length = clip.duration
self.length += clip.duration
clip.close()
except:
self.length = 0
def create_silence_mp3(self):
silence_duration = settings.config["settings"]["tts"]["silence_duration"]
silence = AudioClip(
make_frame=lambda t: np.sin(440 * 2 * np.pi * t),
duration=silence_duration,
fps=44100,
) )
os.makedirs(f"./results/{subreddit}/OnlyTTS") silence = volumex(silence, 0)
silence.write_audiofile(
# create a thumbnail for the video f"{self.path}/silence.mp3", fps=44100, verbose=False, logger=None
settingsbackground = settings.config["settings"]["background"]
if settingsbackground["background_thumbnail"]:
if not exists(f"./results/{subreddit}/thumbnails"):
print_substep(
"The 'results/thumbnails' folder could not be found so it was automatically created."
)
os.makedirs(f"./results/{subreddit}/thumbnails")
# get the first file with the .png extension from assets/backgrounds and use it as a background for the thumbnail
first_image = next(
(
file
for file in os.listdir("assets/backgrounds")
if file.endswith(".png")
),
None,
) )
if first_image is None:
print_substep("No png files found in assets/backgrounds", "red")
else:
font_family = settingsbackground["background_thumbnail_font_family"]
font_size = settingsbackground["background_thumbnail_font_size"]
font_color = settingsbackground["background_thumbnail_font_color"]
thumbnail = Image.open(f"assets/backgrounds/{first_image}")
width, height = thumbnail.size
thumbnailSave = create_thumbnail(
thumbnail,
font_family,
font_size,
font_color,
width,
height,
title_thumb,
)
thumbnailSave.save(f"./assets/temp/{reddit_id}/thumbnail.png")
print_substep(
f"Thumbnail - Building Thumbnail in assets/temp/{reddit_id}/thumbnail.png"
)
text = f"Background by {background_config['video'][2]}"
background_clip = ffmpeg.drawtext(
background_clip,
text=text,
x=f"(w-text_w)",
y=f"(h-text_h)",
fontsize=5,
fontcolor="White",
fontfile=os.path.join("fonts", "Roboto-Regular.ttf"),
)
background_clip = background_clip.filter("scale", W, H)
print_step("Rendering the video 🎥")
from tqdm import tqdm
pbar = tqdm(total=100, desc="Progress: ", bar_format="{l_bar}{bar}", unit=" %")
def on_update_example(progress) -> None:
status = round(progress * 100, 2)
old_percentage = pbar.n
pbar.update(status - old_percentage)
defaultPath = f"results/{subreddit}"
with ProgressFfmpeg(length, on_update_example) as progress:
path = defaultPath + f"/{filename}"
path = (
path[:251] + ".mp4"
) # Prevent a error by limiting the path length, do not change this.
ffmpeg.output(
background_clip,
final_audio,
path,
f="mp4",
**{
"c:v": "h264",
"b:v": "20M",
"b:a": "192k",
"threads": multiprocessing.cpu_count(),
},
).overwrite_output().global_args("-progress", progress.output_file.name).run(
quiet=True,
overwrite_output=True,
capture_stdout=False,
capture_stderr=False,
)
old_percentage = pbar.n
pbar.update(100 - old_percentage)
if allowOnlyTTSFolder:
path = defaultPath + f"/OnlyTTS/{filename}"
path = (
path[:251] + ".mp4"
) # Prevent a error by limiting the path length, do not change this.
print_step("Rendering the Only TTS Video 🎥")
with ProgressFfmpeg(length, on_update_example) as progress:
try:
ffmpeg.output(
background_clip,
audio,
path,
f="mp4",
**{
"c:v": "h264",
"b:v": "20M",
"b:a": "192k",
"threads": multiprocessing.cpu_count(),
},
).overwrite_output().global_args("-progress", progress.output_file.name).run(
quiet=True,
overwrite_output=True,
capture_stdout=False,
capture_stderr=False,
)
except ffmpeg.Error as e:
print(e.stderr.decode("utf8"))
exit(1)
old_percentage = pbar.n def process_text(text: str, clean: bool = True):
pbar.update(100 - old_percentage) lang = settings.config["reddit"]["thread"]["post_lang"]
pbar.close() new_text = sanitize_text(text) if clean else text
save_data(subreddit, filename + ".mp4", title, idx, background_config["video"][2]) if lang:
print_step("Removing temporary files 🗑") print_substep("Translating Text...")
cleanups = cleanup(reddit_id) #translated_text = translators.google(text, )
print_substep(f"Removed {cleanups} temporary files 🗑") translated_text = translators.translate_text(text, translator='google', to_language='en')
print_step("Done! 🎉 The video is in the results folder 📁") new_text = sanitize_text(translated_text)
return new_text

@ -168,10 +168,11 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
if lang: if lang:
print_substep("Translating post...") print_substep("Translating post...")
texts_in_tl = translators.google( #texts_in_tl = translators.google(
reddit_object["thread_title"], # reddit_object["thread_title"],
to_language=lang, # ,
) #)
texts_in_tl = translators.translate_text(reddit_object["thread_title"], translator='google', to_language='en')
page.evaluate( page.evaluate(
"tl_content => document.querySelector('[data-adclicklocation=\"title\"] > div > div > h1').textContent = tl_content", "tl_content => document.querySelector('[data-adclicklocation=\"title\"] > div > div > h1').textContent = tl_content",
@ -240,10 +241,11 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
# translate code # translate code
if settings.config["reddit"]["thread"]["post_lang"]: if settings.config["reddit"]["thread"]["post_lang"]:
comment_tl = translators.google( #comment_tl = translators.google(
comment["comment_body"], # comment["comment_body"],
to_language=settings.config["reddit"]["thread"]["post_lang"], # to_language=settings.config["reddit"]["thread"]["post_lang"],
) #)
comment_tl = translators.translate_text(comment["comment_body"], translator='google', to_language='en')
page.evaluate( page.evaluate(
'([tl_content, tl_id]) => document.querySelector(`#t1_${tl_id} > div:nth-child(2) > div > div[data-testid="comment"] > div`).textContent = tl_content', '([tl_content, tl_id]) => document.querySelector(`#t1_${tl_id} > div:nth-child(2) > div > div[data-testid="comment"] > div`).textContent = tl_content',
[comment_tl, comment["comment_id"]], [comment_tl, comment["comment_id"]],

Loading…
Cancel
Save