Added a lot of code, needs to be optimized. Watermark almost done!

pull/1409/head
Simon 2 years ago
parent 4eeacc9754
commit 3016ae8e73

@ -1,68 +0,0 @@
from __future__ import annotations
import re
from typing import Tuple
from PIL import ImageFont, Image, ImageDraw, ImageEnhance
from moviepy.video.VideoClip import VideoClip, ImageClip
from moviepy.video.compositing.CompositeVideoClip import CompositeVideoClip
class Video:
def __init__(self, video: VideoClip, *args, **kwargs):
self.video: VideoClip = video
self.fps = self.video.fps
self.duration = self.video.duration
@staticmethod
def _create_watermark(text, redditid, fontsize, opacity=0.5):
id = re.sub(r"[^\w\s-]", "", redditid["thread_id"])
path = f"./assets/temp/{id}/png/watermark.png"
width = int(fontsize * len(text))
height = int(fontsize * len(text) / 2)
white = (255, 255, 255)
transparent = (0, 0, 0, 0)
font = ImageFont.load_default()
wm = Image.new("RGBA", (width, height), transparent)
im = Image.new("RGBA", (width, height), transparent) # Change this line too.
draw = ImageDraw.Draw(wm)
w, h = draw.textsize(text, font)
draw.text(((width - w) / 2, (height - h) / 2), text, white, font)
en = ImageEnhance.Brightness(wm) # todo allow it to use the fontsize
mask = en.enhance(1 - opacity)
im.paste(wm, (25, 25), mask)
im.save(path)
return ImageClip(path)
def add_watermark(
self,
text,
redditid,
opacity=0.5,
duration: int | float = 5,
position: Tuple = (0.7, 0.9),
fontsize=15,
):
compensation = round(
(
position[0]
/ ((len(text) * (fontsize / 5) / 1.5) / 100 + position[0] * position[0])
),
ndigits=2,
)
position = (compensation, position[1])
# print(f'{compensation=}')
# print(f'{position=}')
img_clip = self._create_watermark(
text, redditid, fontsize=fontsize, opacity=opacity
)
img_clip = img_clip.set_opacity(opacity).set_duration(duration)
img_clip = img_clip.set_position(
position, relative=True
) # todo get dara from utils/CONSTANTS.py and adapt position accordingly
# Overlay the img clip on the first video clip
self.video = CompositeVideoClip([self.video, img_clip])
return self.video

@ -1,35 +1,67 @@
import multiprocessing
import os import os
import re import re
import multiprocessing
from os.path import exists
from typing import Tuple, Any, Final
import translators as ts
import shutil import shutil
from os.path import exists
from typing import Final
from typing import Tuple, Any from typing import Tuple, Any
from PIL import Image
from moviepy.audio.AudioClip import concatenate_audioclips, CompositeAudioClip import ffmpeg
from moviepy.audio.io.AudioFileClip import AudioFileClip import translators as ts
from moviepy.video.VideoClip import ImageClip from PIL import Image
from moviepy.video.compositing.CompositeVideoClip import CompositeVideoClip
from moviepy.video.compositing.concatenate import concatenate_videoclips
from moviepy.video.io.VideoFileClip import VideoFileClip
from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip
from rich.console import Console from rich.console import Console
from rich.progress import track from rich.progress import track
import ffmpeg from utils import settings
from utils.cleanup import cleanup from utils.cleanup import cleanup
from utils.console import print_step, print_substep from utils.console import print_step, print_substep
from utils.video import Video
from utils.videos import save_data
from utils.thumbnail import create_thumbnail
from utils import settings
from utils.thumbnail import create_thumbnail from utils.thumbnail import create_thumbnail
from utils.videos import save_data
console = Console() console = Console()
import tempfile
import threading
import time
class ProgressFfmpeg(threading.Thread):
def __init__(self, vid_duration_seconds, progress_update_callback):
threading.Thread.__init__(self, name='ProgressFfmpeg')
self.stop_event = threading.Event()
self.output_file = tempfile.NamedTemporaryFile(mode='w+', delete=False)
self.vid_duration_seconds = vid_duration_seconds
self.progress_update_callback = progress_update_callback
def run(self):
while not self.stop_event.is_set():
latest_progress = self.get_latest_ms_progress()
if latest_progress is not None:
completed_percent = latest_progress / self.vid_duration_seconds
self.progress_update_callback(completed_percent)
time.sleep(1)
def get_latest_ms_progress(self):
lines = self.output_file.readlines()
if lines:
for line in lines:
if 'out_time_ms' in line:
out_time_ms = line.split('=')[1]
return int(out_time_ms) / 1000000.0
return None
def stop(self):
self.stop_event.set()
def __enter__(self):
self.start()
return self
def __exit__(self, *args, **kwargs):
self.stop()
def name_normalize(name: str) -> str: def name_normalize(name: str) -> str:
name = re.sub(r'[?\\"%*:|<>]', "", name) name = re.sub(r'[?\\"%*:|<>]', "", name)
@ -50,10 +82,10 @@ def name_normalize(name: str) -> str:
def prepare_background(reddit_id: str, W: int, H: int) -> str: def prepare_background(reddit_id: str, W: int, H: int) -> str:
output_path = f"assets/temp/{reddit_id}/background_noaudio.mp4" output_path = f"assets/temp/{reddit_id}/background_noaudio.mp4"
output = ffmpeg.input(f"assets/temp/{reddit_id}/background.mp4").filter('crop', "ih*(9/16)", "ih").output( output = ffmpeg.input(f"assets/temp/{reddit_id}/background.mp4").filter('crop', f"ih*({W}/{H})", "ih").output(
output_path, an=None, output_path, an=None,
**{"c:v": "h264", "b:v": "20M", "b:a": "192k", "threads": multiprocessing.cpu_count()}).overwrite_output() **{"c:v": "h264", "b:v": "20M", "b:a": "192k", "threads": multiprocessing.cpu_count()}).overwrite_output()
output.run() output.run(quiet=True)
return output_path return output_path
@ -98,11 +130,14 @@ def make_final_video(
audio_clips = [ffmpeg.input(f"assets/temp/{reddit_id}/mp3/{i}.mp3") for i in range(number_of_clips)] audio_clips = [ffmpeg.input(f"assets/temp/{reddit_id}/mp3/{i}.mp3") for i in range(number_of_clips)]
audio_clips.insert(0, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3")) audio_clips.insert(0, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3"))
audio_clips_durations = [float(ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/{i}.mp3")['format']['duration']) for i in audio_clips_durations = [float(ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/{i}.mp3")['format']['duration']) for i
in
range(number_of_clips)] range(number_of_clips)]
audio_clips_durations.insert(0, float(ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/title.mp3")['format']['duration'])) audio_clips_durations.insert(0, float(
ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/title.mp3")['format']['duration']))
audio_concat = ffmpeg.concat(*audio_clips, a=1, v=0) audio_concat = ffmpeg.concat(*audio_clips, a=1, v=0)
ffmpeg.output(audio_concat, f"assets/temp/{reddit_id}/audio.mp3", **{"b:a": "192k"}).overwrite_output().run() ffmpeg.output(audio_concat, f"assets/temp/{reddit_id}/audio.mp3", **{"b:a": "192k"}).overwrite_output().run(
quiet=True)
console.log(f"[bold green] Video Will Be: {length} Seconds Long") console.log(f"[bold green] Video Will Be: {length} Seconds Long")
# Create a screenshot_width variable to scale the screenshots to the correct size, the calculation is int((W * 90) // 100) # Create a screenshot_width variable to scale the screenshots to the correct size, the calculation is int((W * 90) // 100)
@ -120,9 +155,11 @@ def make_final_video(
current_time = 0 current_time = 0
if settings.config["settings"]["storymode"]: if settings.config["settings"]["storymode"]:
audio_clips_durations = [float(ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/postaudio-{i}.mp3")['format']['duration']) for i in audio_clips_durations = [
float(ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/postaudio-{i}.mp3")['format']['duration']) for i in
range(number_of_clips)] range(number_of_clips)]
audio_clips_durations.insert(0, float(ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/title.mp3")['format']['duration'])) audio_clips_durations.insert(0, float(
ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/title.mp3")['format']['duration']))
if settings.config["settings"]["storymodemethod"] == 0: if settings.config["settings"]["storymodemethod"] == 0:
image_clips.insert( image_clips.insert(
1, 1,
@ -130,7 +167,7 @@ def make_final_video(
.filter('scale', screenshot_width, -1) .filter('scale', screenshot_width, -1)
) )
background_clip = background_clip.overlay(image_clips[1], background_clip = background_clip.overlay(image_clips[1],
enable=f'between(t,{current_time},{current_time + audio_clips_durations[i]})', enable=f'between(t,{current_time},{current_time + audio_clips_durations[1]})',
x='(main_w-overlay_w)/2', y='(main_h-overlay_h)/2') x='(main_w-overlay_w)/2', y='(main_h-overlay_h)/2')
current_time += audio_clips_durations[1] current_time += audio_clips_durations[1]
elif settings.config["settings"]["storymodemethod"] == 1: elif settings.config["settings"]["storymodemethod"] == 1:
@ -163,18 +200,11 @@ def make_final_video(
filename = f"{name_normalize(title)[:251]}" filename = f"{name_normalize(title)[:251]}"
subreddit = settings.config["reddit"]["thread"]["subreddit"] subreddit = settings.config["reddit"]["thread"]["subreddit"]
final = ffmpeg.output(background_clip, audio, f"results/{subreddit}/{filename}.mp4", f='mp4',
**{"c:v": "h264", "b:v": "20M", "b:a": "192k",
"threads": multiprocessing.cpu_count()}).overwrite_output()
if not exists(f"./results/{subreddit}"): if not exists(f"./results/{subreddit}"):
print_substep("The results folder didn't exist so I made it") print_substep("The results folder didn't exist so I made it")
os.makedirs(f"./results/{subreddit}") os.makedirs(f"./results/{subreddit}")
# create a tumbnail for the video # create a thumbnail for the video
settingsbackground = settings.config["settings"]["background"] settingsbackground = settings.config["settings"]["background"]
if settingsbackground["background_thumbnail"]: if settingsbackground["background_thumbnail"]:
@ -194,7 +224,7 @@ def make_final_video(
if first_image is None: if first_image is None:
print_substep("No png files found in assets/backgrounds", "red") print_substep("No png files found in assets/backgrounds", "red")
if settingsbackground["background_thumbnail"] and first_image: else:
font_family = settingsbackground["background_thumbnail_font_family"] font_family = settingsbackground["background_thumbnail_font_family"]
font_size = settingsbackground["background_thumbnail_font_size"] font_size = settingsbackground["background_thumbnail_font_size"]
font_color = settingsbackground["background_thumbnail_font_color"] font_color = settingsbackground["background_thumbnail_font_color"]
@ -204,17 +234,32 @@ def make_final_video(
thumbnailSave.save(f"./assets/temp/{reddit_id}/thumbnail.png") thumbnailSave.save(f"./assets/temp/{reddit_id}/thumbnail.png")
print_substep(f"Thumbnail - Building Thumbnail in assets/temp/{reddit_id}/thumbnail.png") print_substep(f"Thumbnail - Building Thumbnail in assets/temp/{reddit_id}/thumbnail.png")
# create a tumbnail for the video text = f"Background by {background_config[2]}"
settingsbackground = settings.config["settings"]["background"]
print_step("Rendering the video 🎥")
from tqdm import tqdm
pbar = tqdm(total=100, desc="Progress: ", bar_format="{l_bar}{bar}", unit=" %")
def on_update_example(progress):
status = round(progress * 100, 2)
old_percentage = pbar.n
pbar.update(status - old_percentage)
position = W // 2, H - 20
ffmpeg.filter(filter_name='drawtext', stream_spec=background_clip, text=text, fontfile='fonts/Roboto-Regular.ttf', fontsize=12,
fontcolor='white', x=position[0], y=position[1], box=1)
with ProgressFfmpeg(length, on_update_example) as progress:
ffmpeg.output(background_clip, audio, f"results/{subreddit}/{filename}.mp4", f='mp4',
**{"c:v": "h264", "b:v": "20M", "b:a": "192k",
"threads": multiprocessing.cpu_count()}).overwrite_output().global_args('-progress',
progress.output_file.name).run(
quiet=True, overwrite_output=True, capture_stdout=False, capture_stderr=False)
old_percentage = pbar.n
pbar.update(100 - old_percentage)
pbar.close()
# final = Video(final).add_watermark(
# text=f"Background credit: {background_config[2]}",
# opacity=0.4,
# redditid=reddit_obj,
# )
#
# from utils.video import Video.Video.add_watermark()
if settingsbackground["background_thumbnail"]: if settingsbackground["background_thumbnail"]:
if not exists(f"./results/{subreddit}/thumbnails"): if not exists(f"./results/{subreddit}/thumbnails"):
@ -243,7 +288,6 @@ def make_final_video(
thumbnailSave.save(f"./assets/temp/{reddit_id}/thumbnail.png") thumbnailSave.save(f"./assets/temp/{reddit_id}/thumbnail.png")
print_substep(f"Thumbnail - Building Thumbnail in assets/temp/{reddit_id}/thumbnail.png") print_substep(f"Thumbnail - Building Thumbnail in assets/temp/{reddit_id}/thumbnail.png")
final.run()
# get the thumbnail image from assets/temp/id/thumbnail.png and save it in results/subreddit/thumbnails # get the thumbnail image from assets/temp/id/thumbnail.png and save it in results/subreddit/thumbnails
if settingsbackground["background_thumbnail"] and exists(f"assets/temp/{reddit_id}/thumbnail.png"): if settingsbackground["background_thumbnail"] and exists(f"assets/temp/{reddit_id}/thumbnail.png"):
shutil.move(f"assets/temp/{reddit_id}/thumbnail.png", f"./results/{subreddit}/thumbnails/{filename}.png") shutil.move(f"assets/temp/{reddit_id}/thumbnail.png", f"./results/{subreddit}/thumbnails/{filename}.png")

Loading…
Cancel
Save