"""Assemble the final video: background, TTS audio, screenshots and thumbnail."""

import multiprocessing
import os
import re
import shutil
import subprocess
import tempfile
import textwrap
import threading
import time
from os.path import exists  # Needs to be imported specifically
from pathlib import Path
from typing import Dict, Final, Tuple

import ffmpeg
import translators
from PIL import Image, ImageDraw, ImageFont
from rich.console import Console
from rich.progress import track

from utils import settings
from utils.cleanup import cleanup
from utils.console import print_step, print_substep
from utils.fonts import getheight
from utils.id import extract_id
from utils.thumbnail import create_thumbnail
from utils.videos import save_data


def _get_video_codec() -> str:
    """Pick the video encoder: ``h264_nvenc`` when ffmpeg lists it, else ``libx264``.

    Runs ``ffmpeg -encoders`` once and looks for the NVENC encoder name in its
    standard output. Any failure to launch ffmpeg (missing binary, permission
    problem, timeout) falls back to the software encoder.
    """
    try:
        result = subprocess.run(
            ["ffmpeg", "-encoders"], capture_output=True, text=True, timeout=10
        )
    except (subprocess.TimeoutExpired, OSError):
        # OSError covers FileNotFoundError as well as e.g. PermissionError.
        return "libx264"
    if "h264_nvenc" in result.stdout:
        return "h264_nvenc"
    return "libx264"


VIDEO_CODEC = _get_video_codec()

console = Console()


def _encoder_options() -> dict:
    """Return the ffmpeg output options shared by every encode in this module."""
    return {
        "c:v": VIDEO_CODEC,
        "b:v": "20M",
        "b:a": "192k",
        "threads": multiprocessing.cpu_count(),
    }


def _ffmpeg_error_text(error) -> str:
    """Return a printable message for an ``ffmpeg.Error``.

    ``error.stderr`` is only populated when ffmpeg's stderr was captured; when
    it is missing, fall back to the exception's own string form instead of
    crashing on ``None.decode``.
    """
    stderr = getattr(error, "stderr", None)
    if not stderr:
        return str(error)
    if isinstance(stderr, bytes):
        return stderr.decode("utf8", errors="replace")
    return str(stderr)


class ProgressFfmpeg(threading.Thread):
    """Background thread that polls an ffmpeg ``-progress`` file.

    The instance owns a temporary file (``output_file``) whose name is meant to
    be passed to ffmpeg via ``-progress``. While running, the thread reads the
    newly appended lines about once per second and reports the completed
    fraction to ``progress_update_callback``.

    Use it as a context manager: entering starts the thread, leaving stops it,
    waits for it to finish and removes the temporary file.
    """

    def __init__(self, vid_duration_seconds, progress_update_callback):
        """
        Args:
            vid_duration_seconds: Expected duration of the rendered video, in seconds.
            progress_update_callback: Called with the completed fraction (0.0 - 1.0).
        """
        super().__init__(name="ProgressFfmpeg")
        self.stop_event = threading.Event()
        # delete=False because ffmpeg opens the file by name while we hold it open;
        # it is removed explicitly in __exit__.
        self.output_file = tempfile.NamedTemporaryFile(mode="w+", delete=False)
        self.vid_duration_seconds = vid_duration_seconds
        self.progress_update_callback = progress_update_callback

    def run(self):
        """Poll the progress file until :meth:`stop` is called."""
        while not self.stop_event.is_set():
            latest_progress = self.get_latest_ms_progress()
            # Skip the update when the duration is 0/None to avoid dividing by zero.
            if latest_progress is not None and self.vid_duration_seconds:
                # Clamp so a slightly longer render never reports more than 100 %.
                completed_percent = min(latest_progress / self.vid_duration_seconds, 1.0)
                self.progress_update_callback(completed_percent)
            # Event.wait returns as soon as stop() is called, unlike time.sleep.
            self.stop_event.wait(1)

    def get_latest_ms_progress(self):
        """Return the most recent ``out_time_ms`` value read since the last call.

        The raw value is divided by 1,000,000 to obtain seconds. Lines whose
        value is not numeric (for example ``N/A``) are ignored.

        Returns:
            float | None: Elapsed output time in seconds, or ``None`` when no
            numeric ``out_time_ms`` line was found in the new data.
        """
        latest = None
        for line in self.output_file.readlines():
            if "out_time_ms" not in line:
                continue
            _, _, raw_value = line.partition("=")
            raw_value = raw_value.strip()
            if raw_value.isnumeric():
                # Keep scanning: later lines in the batch are more recent.
                latest = float(raw_value) / 1000000.0
        return latest

    def stop(self):
        """Ask the polling loop to finish."""
        self.stop_event.set()

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, *args, **kwargs):
        self.stop()
        # Wait for the thread so it is not reading the file while we close it.
        if self.is_alive():
            self.join()
        self.output_file.close()
        try:
            os.unlink(self.output_file.name)
        except OSError:
            # Best effort: a leftover temp file must not fail the render.
            pass


def name_normalize(name: str) -> str:
    """Turn a thread title into a string usable as a file name.

    Removes characters that are problematic in file names, expands the
    abbreviations ``w/`` and ``w/o``, rewrites ``a/b`` as ``a of b`` (digits)
    or ``a or b`` (words), and drops any remaining slash. When
    ``settings.config["reddit"]["thread"]["post_lang"]`` is set, the result is
    translated to that language with the Google translator.

    Args:
        name: The raw title.

    Returns:
        The normalized (and possibly translated) title.
    """
    name = re.sub(r'[?\\"%*:|<>]', "", name)
    # Character classes must not contain commas, otherwise "," itself matches.
    name = re.sub(r"( [wW]\s?\/\s?[oO0])", r" without", name)
    name = re.sub(r"( [wW]\s?\/)", r" with", name)
    name = re.sub(r"(\d+)\s?\/\s?(\d+)", r"\1 of \2", name)
    name = re.sub(r"(\w+)\s?\/\s?(\w+)", r"\1 or \2", name)
    name = re.sub(r"\/", r"", name)

    lang = settings.config["reddit"]["thread"]["post_lang"]
    if not lang:
        return name

    print_substep("Translating filename...")
    return translators.translate_text(name, translator="google", to_language=lang)


def prepare_background(reddit_id: str, W: int, H: int) -> str:
    """Crop the downloaded background video to the target aspect ratio.

    Reads ``assets/temp/{reddit_id}/background.mp4``, crops it to a width of
    ``ih * W / H`` at full height, strips the audio and writes
    ``assets/temp/{reddit_id}/background_noaudio.mp4``.

    Args:
        reddit_id: Identifier used for the temp directory name.
        W: Target width in pixels.
        H: Target height in pixels.

    Returns:
        Path of the cropped, silent background video.

    Raises:
        SystemExit: With code 1 when ffmpeg fails.
    """
    output_path = f"assets/temp/{reddit_id}/background_noaudio.mp4"
    output = (
        ffmpeg.input(f"assets/temp/{reddit_id}/background.mp4")
        .filter("crop", f"ih*({W}/{H})", "ih")
        .output(
            output_path,
            an=None,
            **_encoder_options(),
        )
        .overwrite_output()
    )
    try:
        output.run(quiet=True)
    except ffmpeg.Error as e:
        print(_ffmpeg_error_text(e))
        raise SystemExit(1)
    return output_path


def get_text_height(draw, text, font, max_width):
    """Return the summed height of ``text`` once wrapped.

    Args:
        draw: An ``ImageDraw`` object used to measure text.
        text: The text to measure.
        font: The font used for measuring.
        max_width: Wrap width in characters (passed to ``textwrap.wrap``).

    Returns:
        Sum, over all wrapped lines, of the bottom coordinate of each line's
        bounding box measured from the origin.
    """
    total_height = 0
    for line in textwrap.wrap(text, width=max_width):
        _, _, _, height = draw.textbbox((0, 0), line, font=font)
        total_height += height
    return total_height


def create_fancy_thumbnail(image, text, text_color, padding, wrap=35):
    """Draw ``text`` onto a vertically stretched copy of the title template.

    A 1px high strip is taken from the middle of the template and resized
    (stretched) vertically to accommodate the extra height needed for the title.

    Args:
        image: The template image.
        text: The title to draw.
        text_color: Fill colour for the title and the channel name.
        padding: Vertical gap, in pixels, added between wrapped lines.
        wrap: Wrap width in characters.

    Returns:
        A new RGBA image containing the template with the title drawn on it.
    """
    print_step(f"Creating fancy thumbnail for: {text}")
    font_title_size = 47

    font = ImageFont.truetype(os.path.join("fonts", "Roboto-Bold.ttf"), font_title_size)
    image_width, image_height = image.size

    # Calculate text height to determine new image height
    draw = ImageDraw.Draw(image)
    text_height = get_text_height(draw, text, font, wrap)
    lines = textwrap.wrap(text, width=wrap)
    # This is -50 to reduce the empty space at the bottom of the image,
    # change it as per your requirement if needed otherwise leave it.
    new_image_height = image_height + text_height + padding * (len(lines) - 1) - 50

    # Separate the image into top, middle (1px), and bottom parts
    top_part_height = image_height // 2
    middle_part_height = 1  # 1px height middle section
    bottom_part_height = image_height - top_part_height - middle_part_height

    top_part = image.crop((0, 0, image_width, top_part_height))
    middle_part = image.crop((0, top_part_height, image_width, top_part_height + middle_part_height))
    bottom_part = image.crop((0, top_part_height + middle_part_height, image_width, image_height))

    # Stretch the middle part
    new_middle_height = new_image_height - top_part_height - bottom_part_height
    middle_part = middle_part.resize((image_width, new_middle_height))

    # Create new image with the calculated height
    new_image = Image.new("RGBA", (image_width, new_image_height))

    # Paste the top, stretched middle, and bottom parts into the new image
    new_image.paste(top_part, (0, 0))
    new_image.paste(middle_part, (0, top_part_height))
    new_image.paste(bottom_part, (0, top_part_height + new_middle_height))

    # Draw the title text on the new image
    draw = ImageDraw.Draw(new_image)
    y = top_part_height + padding
    for line in lines:
        draw.text((120, y), line, font=font, fill=text_color, align="left")
        y += get_text_height(draw, line, font, wrap) + padding

    # Draw the configured channel name at a fixed position
    username_font = ImageFont.truetype(os.path.join("fonts", "Roboto-Bold.ttf"), 30)
    draw.text(
        (205, 825),
        settings.config["settings"]["channel_name"],
        font=username_font,
        fill=text_color,
        align="left",
    )

    return new_image


def merge_background_audio(audio: ffmpeg, reddit_id: str):
    """Mix the TTS audio with ``assets/temp/{reddit_id}/background.mp3``.

    Args:
        audio (ffmpeg): The TTS final audio but without background.
        reddit_id (str): Identifier used for the temp directory name.

    Returns:
        The original ``audio`` stream when the configured background volume is
        0, otherwise an ``amix`` of ``audio`` and the volume-adjusted background
        track lasting as long as the longest input.
    """
    background_audio_volume = settings.config["settings"]["background"]["background_audio_volume"]
    if background_audio_volume == 0:
        return audio  # Return the original audio

    # sets volume to config
    bg_audio = ffmpeg.input(f"assets/temp/{reddit_id}/background.mp3").filter(
        "volume",
        background_audio_volume,
    )
    # Merges audio and background_audio
    return ffmpeg.filter([audio, bg_audio], "amix", duration="longest")


def _render_video(video_stream, audio_stream, path: str, length, progress_callback) -> None:
    """Encode ``video_stream`` + ``audio_stream`` to ``path`` while reporting progress.

    Raises:
        SystemExit: With code 1 when ffmpeg fails.
    """
    with ProgressFfmpeg(length, progress_callback) as progress:
        try:
            ffmpeg.output(
                video_stream,
                audio_stream,
                path,
                f="mp4",
                **_encoder_options(),
            ).overwrite_output().global_args("-progress", progress.output_file.name).run(
                quiet=True,
                overwrite_output=True,
                capture_stdout=False,
                capture_stderr=False,
            )
        except ffmpeg.Error as e:
            print(_ffmpeg_error_text(e))
            raise SystemExit(1)


def make_final_video(
    number_of_clips: int,
    length: int,
    reddit_obj: dict,
    background_config: Dict[str, Tuple],
):
    """Gathers audio clips, gathers all screenshots, stitches them together and saves the final video

    The rendered file is written to ``results/{subreddit}/`` (and, when extra
    audio is enabled with a non-zero background volume, a second copy with only
    the TTS audio to ``results/{subreddit}/OnlyTTS/``). Temporary files are
    removed afterwards.

    Args:
        number_of_clips (int): Index to end at when going through the screenshots
        length (int): Length of the video
        reddit_obj (dict): The reddit object that contains the posts to read.
        background_config (Dict[str, Tuple]): The background config to use; element 2 of
            its ``"video"`` entry is used as the background credit.

    Raises:
        SystemExit: When there are no audio clips outside story mode, or when ffmpeg fails.
    """
    # settings values
    W: Final[int] = int(settings.config["settings"]["resolution_w"])
    H: Final[int] = int(settings.config["settings"]["resolution_h"])

    opacity = settings.config["settings"]["opacity"]
    storymode = settings.config["settings"]["storymode"]

    reddit_id = extract_id(reddit_obj)

    allowOnlyTTSFolder: bool = (
        settings.config["settings"]["background"]["enable_extra_audio"]
        and settings.config["settings"]["background"]["background_audio_volume"] != 0
    )

    print_step("Creating the final video 🎥")

    background_clip = ffmpeg.input(prepare_background(reddit_id, W=W, H=H))

    # Gather all audio clips
    audio_clips = list()
    # storymode is used as a boolean everywhere else in this function, so test
    # its truthiness; comparing against the string "false" could never match.
    if number_of_clips == 0 and not storymode:
        print(
            "No audio clips to gather. Please use a different TTS or post."
        )  # This is to fix the TypeError: unsupported operand type(s) for +: 'int' and 'NoneType'
        raise SystemExit()
    if storymode:
        if settings.config["settings"]["storymodemethod"] == 0:
            audio_clips = [ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3")]
            audio_clips.insert(1, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/postaudio.mp3"))
        elif settings.config["settings"]["storymodemethod"] == 1:
            audio_clips = [
                ffmpeg.input(f"assets/temp/{reddit_id}/mp3/postaudio-{i}.mp3")
                for i in track(range(number_of_clips + 1), "Collecting the audio files...")
            ]
            audio_clips.insert(0, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3"))

    else:
        audio_clips = [
            ffmpeg.input(f"assets/temp/{reddit_id}/mp3/{i}.mp3") for i in range(number_of_clips)
        ]
        audio_clips.insert(0, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3"))

        audio_clips_durations = [
            float(ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/{i}.mp3")["format"]["duration"])
            for i in range(number_of_clips)
        ]
        audio_clips_durations.insert(
            0,
            float(ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/title.mp3")["format"]["duration"]),
        )
    audio_concat = ffmpeg.concat(*audio_clips, a=1, v=0)
    ffmpeg.output(
        audio_concat, f"assets/temp/{reddit_id}/audio.mp3", **{"b:a": "192k"}
    ).overwrite_output().run(quiet=True)

    console.log(f"[bold green] Video Will Be: {length} Seconds Long")

    screenshot_width = int((W * 45) // 100)
    audio = ffmpeg.input(f"assets/temp/{reddit_id}/audio.mp3")
    final_audio = merge_background_audio(audio, reddit_id)

    image_clips = list()

    Path(f"assets/temp/{reddit_id}/png").mkdir(parents=True, exist_ok=True)

    # Credits to tim (beingbored)
    # get the title_template image and draw a text in the middle part of it with the title of the thread
    title = name_normalize(reddit_obj["thread_title"])

    font_color = "#000000"
    padding = 5

    # The template is only needed while the title image is built, so close it afterwards.
    with Image.open("assets/title_template.png") as title_template:
        title_img = create_fancy_thumbnail(title_template, title, font_color, padding)

    title_img.save(f"assets/temp/{reddit_id}/png/title.png")
    image_clips.insert(
        0,
        ffmpeg.input(f"assets/temp/{reddit_id}/png/title.png")["v"].filter(
            "scale", screenshot_width, -1
        ),
    )

    current_time = 0
    if storymode:
        audio_clips_durations = [
            float(
                ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/postaudio-{i}.mp3")["format"]["duration"]
            )
            for i in range(number_of_clips)
        ]
        audio_clips_durations.insert(
            0,
            float(ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/title.mp3")["format"]["duration"]),
        )
        if settings.config["settings"]["storymodemethod"] == 0:
            image_clips.insert(
                1,
                ffmpeg.input(f"assets/temp/{reddit_id}/png/story_content.png").filter(
                    "scale", screenshot_width, -1
                ),
            )
            background_clip = background_clip.overlay(
                image_clips[0],
                enable=f"between(t,{current_time},{current_time + audio_clips_durations[0]})",
                x="(main_w-overlay_w)/2",
                y="(main_h-overlay_h)/2",
            )
            current_time += audio_clips_durations[0]
        elif settings.config["settings"]["storymodemethod"] == 1:
            for i in track(range(0, number_of_clips + 1), "Collecting the image files..."):
                image_clips.append(
                    ffmpeg.input(f"assets/temp/{reddit_id}/png/img{i}.png")["v"].filter(
                        "scale", screenshot_width, -1
                    )
                )
                # image_clips[0] is the title image, so index i lags the appended file by one.
                background_clip = background_clip.overlay(
                    image_clips[i],
                    enable=f"between(t,{current_time},{current_time + audio_clips_durations[i]})",
                    x="(main_w-overlay_w)/2",
                    y="(main_h-overlay_h)/2",
                )
                current_time += audio_clips_durations[i]
    else:
        for i in range(0, number_of_clips + 1):
            image_clips.append(
                ffmpeg.input(f"assets/temp/{reddit_id}/png/comment_{i}.png")["v"].filter(
                    "scale", screenshot_width, -1
                )
            )
            # image_clips[0] is the title image, so index i lags the appended file by one.
            image_overlay = image_clips[i].filter("colorchannelmixer", aa=opacity)
            assert (
                audio_clips_durations is not None
            ), "Please make a GitHub issue if you see this. Ping @JasonLovesDoggo on GitHub."
            background_clip = background_clip.overlay(
                image_overlay,
                enable=f"between(t,{current_time},{current_time + audio_clips_durations[i]})",
                x="(main_w-overlay_w)/2",
                y="(main_h-overlay_h)/2",
            )
            current_time += audio_clips_durations[i]

    title = extract_id(reddit_obj, "thread_title")
    idx = extract_id(reddit_obj)
    title_thumb = reddit_obj["thread_title"]

    filename = f"{name_normalize(title)[:251]}"
    subreddit = settings.config["reddit"]["thread"]["subreddit"]

    if not exists(f"./results/{subreddit}"):
        print_substep("The 'results' folder could not be found so it was automatically created.")
        os.makedirs(f"./results/{subreddit}")

    if not exists(f"./results/{subreddit}/OnlyTTS") and allowOnlyTTSFolder:
        print_substep("The 'OnlyTTS' folder could not be found so it was automatically created.")
        os.makedirs(f"./results/{subreddit}/OnlyTTS")

    # create a thumbnail for the video
    settingsbackground = settings.config["settings"]["background"]

    if settingsbackground["background_thumbnail"]:
        if not exists(f"./results/{subreddit}/thumbnails"):
            print_substep(
                "The 'results/thumbnails' folder could not be found so it was automatically created."
            )
            os.makedirs(f"./results/{subreddit}/thumbnails")
        # get the first file with the .png extension from assets/backgrounds and use it as a background for the thumbnail
        first_image = next(
            (file for file in os.listdir("assets/backgrounds") if file.endswith(".png")),
            None,
        )
        if first_image is None:
            print_substep("No png files found in assets/backgrounds", "red")

        else:
            font_family = settingsbackground["background_thumbnail_font_family"]
            font_size = settingsbackground["background_thumbnail_font_size"]
            font_color = settingsbackground["background_thumbnail_font_color"]
            # Save inside the `with` block: the result may share data with the opened image.
            with Image.open(f"assets/backgrounds/{first_image}") as thumbnail:
                width, height = thumbnail.size
                thumbnailSave = create_thumbnail(
                    thumbnail,
                    font_family,
                    font_size,
                    font_color,
                    width,
                    height,
                    title_thumb,
                )
                thumbnailSave.save(f"./assets/temp/{reddit_id}/thumbnail.png")
            print_substep(f"Thumbnail - Building Thumbnail in assets/temp/{reddit_id}/thumbnail.png")

    text = f"Background by {background_config['video'][2]}"
    background_clip = ffmpeg.drawtext(
        background_clip,
        text=text,
        x="(w-text_w)",
        y="(h-text_h)",
        fontsize=5,
        fontcolor="White",
        fontfile=os.path.join("fonts", "Roboto-Regular.ttf"),
    )
    background_clip = background_clip.filter("scale", W, H)
    print_step("Rendering the video 🎥")
    from tqdm import tqdm

    pbar = tqdm(total=100, desc="Progress: ", bar_format="{l_bar}{bar}", unit=" %")

    def on_update_example(progress) -> None:
        """Move the progress bar to ``progress`` (a 0.0 - 1.0 fraction)."""
        status = round(progress * 100, 2)
        old_percentage = pbar.n
        pbar.update(status - old_percentage)

    defaultPath = f"results/{subreddit}"
    path = defaultPath + f"/{filename}"
    path = (
        path[:251] + ".mp4"
    )  # Prevent an error by limiting the path length, do not change this.
    _render_video(background_clip, final_audio, path, length, on_update_example)

    old_percentage = pbar.n
    pbar.update(100 - old_percentage)
    if allowOnlyTTSFolder:
        path = defaultPath + f"/OnlyTTS/{filename}"
        path = (
            path[:251] + ".mp4"
        )  # Prevent an error by limiting the path length, do not change this.
        print_step("Rendering the Only TTS Video 🎥")
        _render_video(background_clip, audio, path, length, on_update_example)

        old_percentage = pbar.n
        pbar.update(100 - old_percentage)
    pbar.close()
    save_data(subreddit, filename + ".mp4", title, idx, background_config["video"][2])
    print_step("Removing temporary files 🗑")
    cleanups = cleanup(reddit_id)
    print_substep(f"Removed {cleanups} temporary files 🗑")
    print_step("Done! 🎉 The video is in the results folder 📁")