Merge pull request #1137 from elebumm/develop

2.4.1
pull/1141/head 2.4.1
Callum Leslie 2 years ago committed by GitHub
commit 7540dbeae4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,6 +1,7 @@
# Import the server module
import http.server
import webbrowser
# Set the hostname
HOST = "localhost"
# Set the port number
@ -9,15 +10,16 @@ PORT = 4000
# Define class to display the index page of the web server
class PythonServer(http.server.SimpleHTTPRequestHandler):
def do_GET(self):
if self.path == '/GUI':
self.path = 'index.html'
if self.path == "/GUI":
self.path = "index.html"
return http.server.SimpleHTTPRequestHandler.do_GET(self)
# Declare object of the class
webServer = http.server.HTTPServer((HOST, PORT), PythonServer)
# Print the URL of the webserver, new =2 opens in a new tab
print(f"Server started at http://{HOST}:{PORT}/GUI/")
webbrowser.open(f'http://{HOST}:{PORT}/GUI/', new = 2)
webbrowser.open(f"http://{HOST}:{PORT}/GUI/", new=2)
print("Website opened in new tab")
print("Press Ctrl+C to quit")
try:
@ -27,4 +29,4 @@ except KeyboardInterrupt:
# Stop the web server
webServer.server_close()
print("The server is stopped.")
exit()
exit()

@ -34,13 +34,14 @@ class TTSEngine:
self,
tts_module,
reddit_object: dict,
path: str = "assets/temp/mp3",
path: str = "assets/temp/",
max_length: int = DEFAULT_MAX_LENGTH,
last_clip_length: int = 0,
):
self.tts_module = tts_module()
self.reddit_object = reddit_object
self.path = path
self.redditid = re.sub(r"[^\w\s-]", "", reddit_object["thread_id"])
self.path = path + self.redditid + "/mp3"
self.max_length = max_length
self.length = 0
self.last_clip_length = last_clip_length

@ -19,9 +19,7 @@ class pyttsx:
if voice_id == "" or voice_num == "":
voice_id = 2
voice_num = 3
raise ValueError(
"set pyttsx values to a valid value, switching to defaults"
)
raise ValueError("set pyttsx values to a valid value, switching to defaults")
else:
voice_id = int(voice_id)
voice_num = int(voice_num)

@ -1,5 +1,6 @@
#!/usr/bin/env python
import math
import re
from subprocess import Popen
from os import name
@ -7,8 +8,10 @@ from prawcore import ResponseException
from reddit.subreddit import get_subreddit_threads
from utils.cleanup import cleanup
from utils.console import print_markdown, print_step
from utils.console import print_markdown, print_step, print_substep
from utils import settings
from utils.id import id
from utils.version import checkversion
from video_creation.background import (
download_background,
@ -19,8 +22,7 @@ from video_creation.final_video import make_final_video
from video_creation.screenshot_downloader import download_screenshots_of_reddit_posts
from video_creation.voices import save_text_to_mp3
__VERSION__ = "2.4"
__BRANCH__ = "master"
__VERSION__ = "2.4.1"
print(
"""
@ -36,18 +38,19 @@ print(
print_markdown(
"### Thanks for using this tool! [Feel free to contribute to this project on GitHub!](https://lewismenelaws.com) If you have any questions, feel free to reach out to me on Twitter or submit a GitHub issue. You can find solutions to many common problems in the [Documentation](https://luka-hietala.gitbook.io/documentation-for-the-reddit-bot/)"
)
print_step(f"You are using v{__VERSION__} of the bot")
checkversion(__VERSION__)
def main(POST_ID=None):
cleanup()
reddit_object = get_subreddit_threads(POST_ID)
global redditid
redditid = id(reddit_object)
length, number_of_comments = save_text_to_mp3(reddit_object)
length = math.ceil(length)
download_screenshots_of_reddit_posts(reddit_object, number_of_comments)
bg_config = get_background_config()
download_background(bg_config)
chop_background_video(bg_config, length)
chop_background_video(bg_config, length, reddit_object)
make_final_video(number_of_comments, length, reddit_object, bg_config)
@ -62,10 +65,15 @@ def run_many(times):
def shutdown():
print_markdown("## Clearing temp files")
cleanup()
print("Exiting...")
exit()
try:
redditid
except NameError:
print("Exiting...")
exit()
else:
cleanup(redditid)
print("Exiting...")
exit()
if __name__ == "__main__":
config = settings.check_toml("utils/.config.template.toml", "config.toml")

@ -1,5 +1,7 @@
import re
from prawcore.exceptions import ResponseException
from utils import settings
import praw
from praw.models import MoreComments
@ -29,14 +31,21 @@ def get_subreddit_threads(POST_ID: str):
username = settings.config["reddit"]["creds"]["username"]
if str(username).casefold().startswith("u/"):
username = username[2:]
reddit = praw.Reddit(
client_id=settings.config["reddit"]["creds"]["client_id"],
client_secret=settings.config["reddit"]["creds"]["client_secret"],
user_agent="Accessing Reddit threads",
username=username,
passkey=passkey,
check_for_async=False,
)
try:
reddit = praw.Reddit(
client_id=settings.config["reddit"]["creds"]["client_id"],
client_secret=settings.config["reddit"]["creds"]["client_secret"],
user_agent="Accessing Reddit threads",
username=username,
passkey=passkey,
check_for_async=False,
)
except ResponseException as e:
match e.response.status_code:
case 401:
print("Invalid credentials - please check them in config.toml")
except:
print("Something went wrong...")
# Ask user for subreddit input
print_step("Getting subreddit threads...")
@ -57,9 +66,7 @@ def get_subreddit_threads(POST_ID: str):
subreddit_choice = sub
if str(subreddit_choice).casefold().startswith("r/"): # removes the r/ from the input
subreddit_choice = subreddit_choice[2:]
subreddit = reddit.subreddit(
subreddit_choice
) # Allows you to specify in .env. Done for automation purposes.
subreddit = reddit.subreddit(subreddit_choice)
if POST_ID: # would only be called if there are multiple queued posts
submission = reddit.submission(id=POST_ID)

@ -9,5 +9,5 @@ requests==2.28.1
rich==12.5.1
toml==0.10.2
translators==5.3.1
pyttsx3==2.90
Pillow~=9.1.1

@ -8,7 +8,7 @@ password = { optional = false, nmin = 8, explanation = "The password of your red
[reddit.thread]
random = { optional = true, options = [true, false,], default = false, type = "bool", explanation = "If set to no, it will ask you a thread link to extract the thread, if yes it will randomize it. Default: 'False'", example = "True" }
subreddit = { optional = false, regex = "[_0-9a-zA-Z]+$", nmin = 3, explanation = "What subreddit to pull posts from, the name of the sub, not the URL", example = "AskReddit", oob_error = "A subreddit name HAS to be between 3 and 20 characters" }
subreddit = { optional = false, regex = "[_0-9a-zA-Z]+$", nmin = 3, explanation = "What subreddit to pull posts from, the name of the sub, not the URL. You can have multiple subreddits, add an + with no spaces.", example = "AskReddit+Redditdev", oob_error = "A subreddit name HAS to be between 3 and 20 characters" }
post_id = { optional = true, default = "", regex = "^((?!://|://)[+a-zA-Z0-9])*$", explanation = "Used if you want to use a specific post.", example = "urdtfx" }
max_comment_length = { default = 500, optional = false, nmin = 10, nmax = 10000, type = "int", explanation = "max number of characters a comment can have. default is 500", example = 500, oob_error = "the max comment length should be between 10 and 10000" }
post_lang = { default = "", optional = true, explanation = "The language you would like to translate to.", example = "es-cr" }

@ -6,7 +6,7 @@ def _listdir(d): # listdir with full path
return [os.path.join(d, f) for f in os.listdir(d)]
def cleanup() -> int:
def cleanup(id) -> int:
"""Deletes all temporary assets in assets/temp
Returns:
@ -18,7 +18,7 @@ def cleanup() -> int:
count += len(files)
for f in files:
os.remove(f)
REMOVE_DIRS = ["./assets/temp/mp3/", "./assets/temp/png/"]
REMOVE_DIRS = [f"./assets/temp/{id}/mp3/", f"./assets/temp/{id}/png/"]
files_to_remove = list(map(_listdir, REMOVE_DIRS))
for directory in files_to_remove:
for file in directory:

@ -118,4 +118,3 @@ def handle_input(
console.print(
"[red bold]" + err_message + "\nValid options are: " + ", ".join(map(str, options)) + "."
)

@ -0,0 +1,10 @@
import re
from utils import print_substep
def id(reddit_obj: dict):
"""
This function takes a reddit object and returns the post id
"""
id = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"])
print_substep(f"Thread ID is {id}", style="bold blue")
return id

@ -0,0 +1,16 @@
import requests
from utils.console import print_step
def checkversion(__VERSION__):
response = requests.get(
"https://api.github.com/repos/elebumm/RedditVideoMakerBot/releases/latest"
)
latestversion = response.json()["tag_name"]
if __VERSION__ == latestversion:
print_step(f"You are using the newest version ({__VERSION__}) of the bot")
return True
else:
print_step(
f"You are using an older version ({__VERSION__}) of the bot. Download the newest version ({latestversion}) from https://github.com/elebumm/RedditVideoMakerBot/releases/latest"
)

@ -1,4 +1,6 @@
from __future__ import annotations
from ast import Str
import re
from typing import Tuple
@ -14,8 +16,9 @@ class Video:
self.duration = self.video.duration
@staticmethod
def _create_watermark(text, fontsize, opacity=0.5):
path = "./assets/temp/png/watermark.png"
def _create_watermark(text, redditid, fontsize, opacity=0.5):
id = re.sub(r"[^\w\s-]", "", redditid["thread_id"])
path = f"./assets/temp/{id}/png/watermark.png"
width = int(fontsize * len(text))
height = int(fontsize * len(text) / 2)
white = (255, 255, 255)
@ -35,7 +38,7 @@ class Video:
return ImageClip(path)
def add_watermark(
self, text, opacity=0.5, duration: int | float = 5, position: Tuple = (0.7, 0.9), fontsize=15
self, text, redditid, opacity=0.5, duration: int | float = 5, position: Tuple = (0.7, 0.9), fontsize=15
):
compensation = round(
(position[0] / ((len(text) * (fontsize / 5) / 1.5) / 100 + position[0] * position[0])),
@ -44,7 +47,7 @@ class Video:
position = (compensation, position[1])
# print(f'{compensation=}')
# print(f'{position=}')
img_clip = self._create_watermark(text, opacity=opacity, fontsize=fontsize)
img_clip = self._create_watermark(text, redditid, fontsize=fontsize, opacity=opacity)
img_clip = img_clip.set_opacity(opacity).set_duration(duration)
img_clip = img_clip.set_position(
position, relative=True

@ -1,6 +1,7 @@
from pathlib import Path
import random
from random import randrange
import re
from typing import Any, Tuple
@ -62,7 +63,7 @@ def download_background(background_config: Tuple[str, str, str, Any]):
print_substep("Background video downloaded successfully! 🎉", style="bold green")
def chop_background_video(background_config: Tuple[str, str, str, Any], video_length: int):
def chop_background_video(background_config: Tuple[str, str, str, Any], video_length: int, reddit_object: dict):
"""Generates the background footage to be used in the video and writes it to assets/temp/background.mp4
Args:
@ -72,7 +73,7 @@ def chop_background_video(background_config: Tuple[str, str, str, Any], video_le
print_step("Finding a spot in the backgrounds video to chop...✂️")
choice = f"{background_config[2]}-{background_config[1]}"
id = re.sub(r"[^\w\s-]", "", reddit_object["thread_id"])
background = VideoFileClip(f"assets/backgrounds/{choice}")
start_time, end_time = get_start_and_end_times(video_length, background.duration)
@ -81,12 +82,12 @@ def chop_background_video(background_config: Tuple[str, str, str, Any], video_le
f"assets/backgrounds/{choice}",
start_time,
end_time,
targetname="assets/temp/background.mp4",
targetname=f"assets/temp/{id}/background.mp4",
)
except (OSError, IOError): # ffmpeg issue see #348
print_substep("FFMPEG issue. Trying again...")
with VideoFileClip(f"assets/backgrounds/{choice}") as video:
new = video.subclip(start_time, end_time)
new.write_videofile("assets/temp/background.mp4")
new.write_videofile(f"assets/temp/{id}/background.mp4")
print_substep("Background video chopped successfully!", style="bold green")
return background_config[2]

@ -62,21 +62,22 @@ def make_final_video(
# except (TypeError, KeyError):
# print('No background audio volume found in config.toml. Using default value of 1.')
# VOLUME_MULTIPLIER = 1
id = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"])
print_step("Creating the final video 🎥")
VideoFileClip.reW = lambda clip: clip.resize(width=W)
VideoFileClip.reH = lambda clip: clip.resize(width=H)
opacity = settings.config["settings"]["opacity"]
transition = settings.config["settings"]["transition"]
background_clip = (
VideoFileClip("assets/temp/background.mp4")
VideoFileClip(f"assets/temp/{id}/background.mp4")
.without_audio()
.resize(height=H)
.crop(x1=1166.6, y1=0, x2=2246.6, y2=1920)
)
# Gather all audio clips
audio_clips = [AudioFileClip(f"assets/temp/mp3/{i}.mp3") for i in range(number_of_clips)]
audio_clips.insert(0, AudioFileClip("assets/temp/mp3/title.mp3"))
audio_clips = [AudioFileClip(f"assets/temp/{id}/mp3/{i}.mp3") for i in range(number_of_clips)]
audio_clips.insert(0, AudioFileClip(f"assets/temp/{id}/mp3/title.mp3"))
audio_concat = concatenate_audioclips(audio_clips)
audio_composite = CompositeAudioClip([audio_concat])
@ -88,7 +89,7 @@ def make_final_video(
new_transition = 0 if transition is None or float(transition) > 2 else float(transition)
image_clips.insert(
0,
ImageClip("assets/temp/png/title.png")
ImageClip(f"assets/temp/{id}/png/title.png")
.set_duration(audio_clips[0].duration)
.resize(width=W - 100)
.set_opacity(new_opacity)
@ -98,7 +99,7 @@ def make_final_video(
for i in range(0, number_of_clips):
image_clips.append(
ImageClip(f"assets/temp/png/comment_{i}.png")
ImageClip(f"assets/temp/{id}/png/comment_{i}.png")
.set_duration(audio_clips[i + 1].duration)
.resize(width=W - 100)
.set_opacity(new_opacity)
@ -140,10 +141,10 @@ def make_final_video(
# # VOLUME_MULTIPLIER) # lower volume by background_audio_volume, use with fx
# final.set_audio(final_audio)
final = Video(final).add_watermark(
text=f"Background credit: {background_config[2]}", opacity=0.4
)
text=f"Background credit: {background_config[2]}", opacity=0.4, redditid=reddit_obj
)
final.write_videofile(
"assets/temp/temp.mp4",
f"assets/temp/{id}/temp.mp4",
fps=30,
audio_codec="aac",
audio_bitrate="192k",
@ -151,14 +152,14 @@ def make_final_video(
threads=multiprocessing.cpu_count(),
)
ffmpeg_extract_subclip(
"assets/temp/temp.mp4",
f"assets/temp/{id}/temp.mp4",
0,
length,
targetname=f"results/{subreddit}/{filename}",
)
save_data(subreddit, filename, title, idx, background_config[2])
print_step("Removing temporary files 🗑")
cleanups = cleanup()
cleanups = cleanup(id)
print_substep(f"Removed {cleanups} temporary files 🗑")
print_substep("See result in the results folder!")

@ -1,6 +1,7 @@
import json
from pathlib import Path
import re
from typing import Dict
from utils import settings
from playwright.async_api import async_playwright # pylint: disable=unused-import
@ -24,9 +25,9 @@ def download_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: in
screenshot_num (int): Number of screenshots to download
"""
print_step("Downloading screenshots of reddit posts...")
id = re.sub(r"[^\w\s-]", "", reddit_object["thread_id"])
# ! Make sure the reddit screenshots folder exists
Path("assets/temp/png").mkdir(parents=True, exist_ok=True)
Path(f"assets/temp/{id}/png").mkdir(parents=True, exist_ok=True)
with sync_playwright() as p:
print_substep("Launching Headless Browser...")
@ -72,11 +73,12 @@ def download_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: in
else:
print_substep("Skipping translation...")
page.locator('[data-test-id="post-content"]').screenshot(path="assets/temp/png/title.png")
postcontentpath = f"assets/temp/{id}/png/title.png"
page.locator('[data-test-id="post-content"]').screenshot(path= postcontentpath)
if storymode:
page.locator('[data-click-id="text"]').screenshot(
path="assets/temp/png/story_content.png"
path=f"assets/temp/{id}/png/story_content.png"
)
else:
for idx, comment in enumerate(
@ -104,7 +106,7 @@ def download_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: in
)
try:
page.locator(f"#t1_{comment['comment_id']}").screenshot(
path=f"assets/temp/png/comment_{idx}.png"
path=f"assets/temp/{id}/png/comment_{idx}.png"
)
except TimeoutError:
del reddit_object["comments"]

Loading…
Cancel
Save