Bot can run multiple instances at the same time (#1091)

* Bot can run multiple instances at the same time

* Delete TTS/__pycache__ directory

* Delete video_creation/__pycache__ directory

* Delete videos.json

* Delete utils/__pycache__ directory

* Create videos.json

* Moved id to utils

* Added cleanup in shutdown and fixed bug

* Removed watermark todo and fixed it

* Delete final_video.py

* Update final_video.py

* Delete video.py

* Update video.py

* Delete id.py

* feat: meaningful error message for bad credentials

* chore: remove comment reference to .env

Co-authored-by: Callum Leslie <git@cleslie.uk>
pull/1137/head
Simon 2 years ago committed by GitHub
parent 7658bd1071
commit 5a8db46178
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -34,13 +34,14 @@ class TTSEngine:
self, self,
tts_module, tts_module,
reddit_object: dict, reddit_object: dict,
path: str = "assets/temp/mp3", path: str = "assets/temp/",
max_length: int = DEFAULT_MAX_LENGTH, max_length: int = DEFAULT_MAX_LENGTH,
last_clip_length: int = 0, last_clip_length: int = 0,
): ):
self.tts_module = tts_module() self.tts_module = tts_module()
self.reddit_object = reddit_object self.reddit_object = reddit_object
self.path = path self.redditid = re.sub(r"[^\w\s-]", "", reddit_object["thread_id"])
self.path = path + self.redditid + "/mp3"
self.max_length = max_length self.max_length = max_length
self.length = 0 self.length = 0
self.last_clip_length = last_clip_length self.last_clip_length = last_clip_length

@ -1,5 +1,6 @@
#!/usr/bin/env python #!/usr/bin/env python
import math import math
import re
from subprocess import Popen from subprocess import Popen
from os import name from os import name
@ -7,8 +8,9 @@ from prawcore import ResponseException
from reddit.subreddit import get_subreddit_threads from reddit.subreddit import get_subreddit_threads
from utils.cleanup import cleanup from utils.cleanup import cleanup
from utils.console import print_markdown, print_step from utils.console import print_markdown, print_step, print_substep
from utils import settings from utils import settings
from utils.id import id
from utils.version import checkversion from utils.version import checkversion
from video_creation.background import ( from video_creation.background import (
@ -40,14 +42,15 @@ checkversion(__VERSION__)
def main(POST_ID=None): def main(POST_ID=None):
cleanup()
reddit_object = get_subreddit_threads(POST_ID) reddit_object = get_subreddit_threads(POST_ID)
global redditid
redditid = id(reddit_object)
length, number_of_comments = save_text_to_mp3(reddit_object) length, number_of_comments = save_text_to_mp3(reddit_object)
length = math.ceil(length) length = math.ceil(length)
download_screenshots_of_reddit_posts(reddit_object, number_of_comments) download_screenshots_of_reddit_posts(reddit_object, number_of_comments)
bg_config = get_background_config() bg_config = get_background_config()
download_background(bg_config) download_background(bg_config)
chop_background_video(bg_config, length) chop_background_video(bg_config, length, reddit_object)
make_final_video(number_of_comments, length, reddit_object, bg_config) make_final_video(number_of_comments, length, reddit_object, bg_config)
@ -62,10 +65,15 @@ def run_many(times):
def shutdown(): def shutdown():
print_markdown("## Clearing temp files") print_markdown("## Clearing temp files")
cleanup() try:
redditid
except NameError:
print("Exiting...")
exit()
else:
cleanup(redditid)
print("Exiting...") print("Exiting...")
exit() exit()
if __name__ == "__main__": if __name__ == "__main__":
config = settings.check_toml("utils/.config.template.toml", "config.toml") config = settings.check_toml("utils/.config.template.toml", "config.toml")

@ -6,7 +6,7 @@ def _listdir(d): # listdir with full path
return [os.path.join(d, f) for f in os.listdir(d)] return [os.path.join(d, f) for f in os.listdir(d)]
def cleanup() -> int: def cleanup(id) -> int:
"""Deletes all temporary assets in assets/temp """Deletes all temporary assets in assets/temp
Returns: Returns:
@ -18,7 +18,7 @@ def cleanup() -> int:
count += len(files) count += len(files)
for f in files: for f in files:
os.remove(f) os.remove(f)
REMOVE_DIRS = ["./assets/temp/mp3/", "./assets/temp/png/"] REMOVE_DIRS = [f"./assets/temp/{id}/mp3/", f"./assets/temp/{id}/png/"]
files_to_remove = list(map(_listdir, REMOVE_DIRS)) files_to_remove = list(map(_listdir, REMOVE_DIRS))
for directory in files_to_remove: for directory in files_to_remove:
for file in directory: for file in directory:

@ -0,0 +1,10 @@
import re
from utils import print_substep
def id(reddit_obj: dict):
"""
This function takes a reddit object and returns the post id
"""
id = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"])
print_substep(f"Thread ID is {id}", style="bold blue")
return id

@ -1,4 +1,6 @@
from __future__ import annotations from __future__ import annotations
from ast import Str
import re
from typing import Tuple from typing import Tuple
@ -14,8 +16,9 @@ class Video:
self.duration = self.video.duration self.duration = self.video.duration
@staticmethod @staticmethod
def _create_watermark(text, fontsize, opacity=0.5): def _create_watermark(text, redditid, fontsize, opacity=0.5):
path = "./assets/temp/png/watermark.png" id = re.sub(r"[^\w\s-]", "", redditid["thread_id"])
path = f"./assets/temp/{id}/png/watermark.png"
width = int(fontsize * len(text)) width = int(fontsize * len(text))
height = int(fontsize * len(text) / 2) height = int(fontsize * len(text) / 2)
white = (255, 255, 255) white = (255, 255, 255)
@ -35,7 +38,7 @@ class Video:
return ImageClip(path) return ImageClip(path)
def add_watermark( def add_watermark(
self, text, opacity=0.5, duration: int | float = 5, position: Tuple = (0.7, 0.9), fontsize=15 self, text, redditid, opacity=0.5, duration: int | float = 5, position: Tuple = (0.7, 0.9), fontsize=15
): ):
compensation = round( compensation = round(
(position[0] / ((len(text) * (fontsize / 5) / 1.5) / 100 + position[0] * position[0])), (position[0] / ((len(text) * (fontsize / 5) / 1.5) / 100 + position[0] * position[0])),
@ -44,7 +47,7 @@ class Video:
position = (compensation, position[1]) position = (compensation, position[1])
# print(f'{compensation=}') # print(f'{compensation=}')
# print(f'{position=}') # print(f'{position=}')
img_clip = self._create_watermark(text, opacity=opacity, fontsize=fontsize) img_clip = self._create_watermark(text, redditid, fontsize=fontsize, opacity=opacity)
img_clip = img_clip.set_opacity(opacity).set_duration(duration) img_clip = img_clip.set_opacity(opacity).set_duration(duration)
img_clip = img_clip.set_position( img_clip = img_clip.set_position(
position, relative=True position, relative=True

@ -1,6 +1,7 @@
from pathlib import Path from pathlib import Path
import random import random
from random import randrange from random import randrange
import re
from typing import Any, Tuple from typing import Any, Tuple
@ -62,7 +63,7 @@ def download_background(background_config: Tuple[str, str, str, Any]):
print_substep("Background video downloaded successfully! 🎉", style="bold green") print_substep("Background video downloaded successfully! 🎉", style="bold green")
def chop_background_video(background_config: Tuple[str, str, str, Any], video_length: int): def chop_background_video(background_config: Tuple[str, str, str, Any], video_length: int, reddit_object: dict):
"""Generates the background footage to be used in the video and writes it to assets/temp/background.mp4 """Generates the background footage to be used in the video and writes it to assets/temp/background.mp4
Args: Args:
@ -72,7 +73,7 @@ def chop_background_video(background_config: Tuple[str, str, str, Any], video_le
print_step("Finding a spot in the backgrounds video to chop...✂️") print_step("Finding a spot in the backgrounds video to chop...✂️")
choice = f"{background_config[2]}-{background_config[1]}" choice = f"{background_config[2]}-{background_config[1]}"
id = re.sub(r"[^\w\s-]", "", reddit_object["thread_id"])
background = VideoFileClip(f"assets/backgrounds/{choice}") background = VideoFileClip(f"assets/backgrounds/{choice}")
start_time, end_time = get_start_and_end_times(video_length, background.duration) start_time, end_time = get_start_and_end_times(video_length, background.duration)
@ -81,12 +82,12 @@ def chop_background_video(background_config: Tuple[str, str, str, Any], video_le
f"assets/backgrounds/{choice}", f"assets/backgrounds/{choice}",
start_time, start_time,
end_time, end_time,
targetname="assets/temp/background.mp4", targetname=f"assets/temp/{id}/background.mp4",
) )
except (OSError, IOError): # ffmpeg issue see #348 except (OSError, IOError): # ffmpeg issue see #348
print_substep("FFMPEG issue. Trying again...") print_substep("FFMPEG issue. Trying again...")
with VideoFileClip(f"assets/backgrounds/{choice}") as video: with VideoFileClip(f"assets/backgrounds/{choice}") as video:
new = video.subclip(start_time, end_time) new = video.subclip(start_time, end_time)
new.write_videofile("assets/temp/background.mp4") new.write_videofile(f"assets/temp/{id}/background.mp4")
print_substep("Background video chopped successfully!", style="bold green") print_substep("Background video chopped successfully!", style="bold green")
return background_config[2] return background_config[2]

@ -62,21 +62,22 @@ def make_final_video(
# except (TypeError, KeyError): # except (TypeError, KeyError):
# print('No background audio volume found in config.toml. Using default value of 1.') # print('No background audio volume found in config.toml. Using default value of 1.')
# VOLUME_MULTIPLIER = 1 # VOLUME_MULTIPLIER = 1
id = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"])
print_step("Creating the final video 🎥") print_step("Creating the final video 🎥")
VideoFileClip.reW = lambda clip: clip.resize(width=W) VideoFileClip.reW = lambda clip: clip.resize(width=W)
VideoFileClip.reH = lambda clip: clip.resize(width=H) VideoFileClip.reH = lambda clip: clip.resize(width=H)
opacity = settings.config["settings"]["opacity"] opacity = settings.config["settings"]["opacity"]
transition = settings.config["settings"]["transition"] transition = settings.config["settings"]["transition"]
background_clip = ( background_clip = (
VideoFileClip("assets/temp/background.mp4") VideoFileClip(f"assets/temp/{id}/background.mp4")
.without_audio() .without_audio()
.resize(height=H) .resize(height=H)
.crop(x1=1166.6, y1=0, x2=2246.6, y2=1920) .crop(x1=1166.6, y1=0, x2=2246.6, y2=1920)
) )
# Gather all audio clips # Gather all audio clips
audio_clips = [AudioFileClip(f"assets/temp/mp3/{i}.mp3") for i in range(number_of_clips)] audio_clips = [AudioFileClip(f"assets/temp/{id}/mp3/{i}.mp3") for i in range(number_of_clips)]
audio_clips.insert(0, AudioFileClip("assets/temp/mp3/title.mp3")) audio_clips.insert(0, AudioFileClip(f"assets/temp/{id}/mp3/title.mp3"))
audio_concat = concatenate_audioclips(audio_clips) audio_concat = concatenate_audioclips(audio_clips)
audio_composite = CompositeAudioClip([audio_concat]) audio_composite = CompositeAudioClip([audio_concat])
@ -88,7 +89,7 @@ def make_final_video(
new_transition = 0 if transition is None or float(transition) > 2 else float(transition) new_transition = 0 if transition is None or float(transition) > 2 else float(transition)
image_clips.insert( image_clips.insert(
0, 0,
ImageClip("assets/temp/png/title.png") ImageClip(f"assets/temp/{id}/png/title.png")
.set_duration(audio_clips[0].duration) .set_duration(audio_clips[0].duration)
.resize(width=W - 100) .resize(width=W - 100)
.set_opacity(new_opacity) .set_opacity(new_opacity)
@ -98,7 +99,7 @@ def make_final_video(
for i in range(0, number_of_clips): for i in range(0, number_of_clips):
image_clips.append( image_clips.append(
ImageClip(f"assets/temp/png/comment_{i}.png") ImageClip(f"assets/temp/{id}/png/comment_{i}.png")
.set_duration(audio_clips[i + 1].duration) .set_duration(audio_clips[i + 1].duration)
.resize(width=W - 100) .resize(width=W - 100)
.set_opacity(new_opacity) .set_opacity(new_opacity)
@ -140,10 +141,10 @@ def make_final_video(
# # VOLUME_MULTIPLIER) # lower volume by background_audio_volume, use with fx # # VOLUME_MULTIPLIER) # lower volume by background_audio_volume, use with fx
# final.set_audio(final_audio) # final.set_audio(final_audio)
final = Video(final).add_watermark( final = Video(final).add_watermark(
text=f"Background credit: {background_config[2]}", opacity=0.4 text=f"Background credit: {background_config[2]}", opacity=0.4, redditid=reddit_obj
) )
final.write_videofile( final.write_videofile(
"assets/temp/temp.mp4", f"assets/temp/{id}/temp.mp4",
fps=30, fps=30,
audio_codec="aac", audio_codec="aac",
audio_bitrate="192k", audio_bitrate="192k",
@ -151,14 +152,14 @@ def make_final_video(
threads=multiprocessing.cpu_count(), threads=multiprocessing.cpu_count(),
) )
ffmpeg_extract_subclip( ffmpeg_extract_subclip(
"assets/temp/temp.mp4", f"assets/temp/{id}/temp.mp4",
0, 0,
length, length,
targetname=f"results/{subreddit}/{filename}", targetname=f"results/{subreddit}/{filename}",
) )
save_data(subreddit, filename, title, idx, background_config[2]) save_data(subreddit, filename, title, idx, background_config[2])
print_step("Removing temporary files 🗑") print_step("Removing temporary files 🗑")
cleanups = cleanup() cleanups = cleanup(id)
print_substep(f"Removed {cleanups} temporary files 🗑") print_substep(f"Removed {cleanups} temporary files 🗑")
print_substep("See result in the results folder!") print_substep("See result in the results folder!")

@ -1,6 +1,7 @@
import json import json
from pathlib import Path from pathlib import Path
import re
from typing import Dict from typing import Dict
from utils import settings from utils import settings
from playwright.async_api import async_playwright # pylint: disable=unused-import from playwright.async_api import async_playwright # pylint: disable=unused-import
@ -24,9 +25,9 @@ def download_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: in
screenshot_num (int): Number of screenshots to download screenshot_num (int): Number of screenshots to download
""" """
print_step("Downloading screenshots of reddit posts...") print_step("Downloading screenshots of reddit posts...")
id = re.sub(r"[^\w\s-]", "", reddit_object["thread_id"])
# ! Make sure the reddit screenshots folder exists # ! Make sure the reddit screenshots folder exists
Path("assets/temp/png").mkdir(parents=True, exist_ok=True) Path(f"assets/temp/{id}/png").mkdir(parents=True, exist_ok=True)
with sync_playwright() as p: with sync_playwright() as p:
print_substep("Launching Headless Browser...") print_substep("Launching Headless Browser...")
@ -72,11 +73,12 @@ def download_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: in
else: else:
print_substep("Skipping translation...") print_substep("Skipping translation...")
page.locator('[data-test-id="post-content"]').screenshot(path="assets/temp/png/title.png") postcontentpath = f"assets/temp/{id}/png/title.png"
page.locator('[data-test-id="post-content"]').screenshot(path= postcontentpath)
if storymode: if storymode:
page.locator('[data-click-id="text"]').screenshot( page.locator('[data-click-id="text"]').screenshot(
path="assets/temp/png/story_content.png" path=f"assets/temp/{id}/png/story_content.png"
) )
else: else:
for idx, comment in enumerate( for idx, comment in enumerate(
@ -104,7 +106,7 @@ def download_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: in
) )
try: try:
page.locator(f"#t1_{comment['comment_id']}").screenshot( page.locator(f"#t1_{comment['comment_id']}").screenshot(
path=f"assets/temp/png/comment_{idx}.png" path=f"assets/temp/{id}/png/comment_{idx}.png"
) )
except TimeoutError: except TimeoutError:
del reddit_object["comments"] del reddit_object["comments"]

Loading…
Cancel
Save