Merge pull request #729 from elebumm/feat/reformat-refactor-files

Reformat and Refactor Files
pull/731/head
Callum Leslie 2 years ago committed by GitHub
commit ec5826e153
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -60,7 +60,7 @@ ignored-modules=
# Python code to execute, usually for sys.path manipulation such as
# pygtk.require().
#init-hook=
init-hook='import sys; sys.path.append("/")'
# Use multiple processes to speed up Pylint. Specifying 0 will auto-detect the
# number of processors available to use.

@ -28,7 +28,7 @@ voices = [
class StreamlabsPolly:
def __init__(self):
self.url = "https://streamlabs.com/polly/speak"
self.max_chars = 349
self.max_chars = 550
self.voices = voices
def run(self, text, filepath, random_voice: bool = False):

@ -1,18 +1,18 @@
#!/usr/bin/env python
from subprocess import Popen
from dotenv import load_dotenv
from os import getenv, name
from dotenv import load_dotenv
from reddit.subreddit import get_subreddit_threads
from utils.cleanup import cleanup
from utils.console import print_markdown, print_step
from utils.checker import check_env
# from utils.checker import envUpdate
from video_creation.background import download_background, chop_background_video
from video_creation.final_video import make_final_video
from video_creation.screenshot_downloader import download_screenshots_of_reddit_posts
from video_creation.voices import save_text_to_mp3
from utils.checker import check_env
VERSION = 2.1
print(
@ -37,14 +37,12 @@ def main():
load_dotenv()
cleanup()
reddit_object = get_subreddit_threads()
length, number_of_comments = save_text_to_mp3(reddit_object)
download_screenshots_of_reddit_posts(reddit_object, number_of_comments)
download_background()
chop_background_video(length)
make_final_video(number_of_comments, length)
make_final_video(number_of_comments, length, reddit_object)
def run_many(times):

@ -1,32 +1,21 @@
import re
from os import getenv, environ
from os import getenv
import praw
from praw.models import MoreComments
from utils.console import print_step, print_substep
from utils.subreddit import get_subreddit_undone
from utils.videos import check_done
from praw.models import MoreComments
# Characters allowed to survive sanitisation: ASCII letters, digits, and spaces.
TEXT_WHITELIST = set("abcdefghijklmnopqrstuvwxyz ABCDEFGHIJKLMNOPQRSTUVWXYZ 1234567890")


def textify(text):
    """Return *text* with every character outside TEXT_WHITELIST removed."""
    return "".join(char for char in text if char in TEXT_WHITELIST)
def try_env(param, backup):
    """Look up an environment variable with a fallback.

    Args:
        param (str): Name of the environment variable to read.
        backup: Value to return when the variable is not set.

    Returns:
        The environment variable's value, or *backup* if it is unset.
    """
    # environ.get is the idiomatic single-lookup form of the
    # try/except-KeyError pattern and behaves identically for str keys.
    return environ.get(param, backup)
def get_subreddit_threads():
"""
Returns a list of threads from the AskReddit subreddit.
"""
global submission
submission = None
print_substep("Logging into Reddit.")
content = {}
@ -48,9 +37,8 @@ def get_subreddit_threads():
passkey=passkey,
check_for_async=False,
)
"""
Ask user for subreddit input
"""
# Ask user for subreddit input
print_step("Getting subreddit threads...")
if not getenv(
"SUBREDDIT"
@ -89,29 +77,27 @@ def get_subreddit_threads():
print_substep(f"Thread has {upvotes} upvotes", style="bold blue")
print_substep(f"Thread has a upvote ratio of {ratio}%", style="bold blue")
print_substep(f"Thread has {num_comments} comments", style="bold blue")
environ["VIDEO_TITLE"] = str(
textify(submission.title)
) # todo use global instead of env vars
environ["VIDEO_ID"] = str(textify(submission.id))
content["thread_url"] = f"https://reddit.com{submission.permalink}"
content["thread_title"] = submission.title
content["thread_post"] = submission.selftext
content["thread_id"] = submission.id
content["comments"] = []
for top_level_comment in submission.comments:
if isinstance(top_level_comment, MoreComments):
continue
if top_level_comment.body in ["[removed]", "[deleted]"]:
continue # # see https://github.com/JasonLovesDoggo/RedditVideoMakerBot/issues/78
if not top_level_comment.stickied:
if len(top_level_comment.body) <= int(try_env("MAX_COMMENT_LENGTH", 500)):
if not top_level_comment.author == None:
content["comments"].append(
{
"comment_body": top_level_comment.body,
"comment_url": top_level_comment.permalink,
"comment_id": top_level_comment.id,
}
)
if len(top_level_comment.body) <= int(getenv("MAX_COMMENT_LENGTH", "500")):
if not top_level_comment.author is None:
content["comments"].append(
{
"comment_body": top_level_comment.body,
"comment_url": top_level_comment.permalink,
"comment_id": top_level_comment.id,
}
)
print_substep("Received subreddit threads Successfully.", style="bold green")
return content

@ -5,11 +5,10 @@
# Imports
import os
import subprocess
import re
from utils.console import print_markdown
from utils.console import print_step
from rich.console import Console
from utils.loader import Loader
from utils.console import print_markdown
from utils.console import print_step
from utils.console import handle_input
console = Console()
@ -142,7 +141,7 @@ theme = handle_input(
loader = Loader("Attempting to save your credentials...", "Done!").start()
# you can also put a while loop here, e.g. while VideoIsBeingMade == True: ...
console.print("Writing to the .env file...")
with open(".env", "w") as f:
with open(".env", "w", encoding="utf-8") as f:
f.write(
f"""REDDIT_CLIENT_ID="{client_id}"
REDDIT_CLIENT_SECRET="{client_sec}"
@ -155,7 +154,7 @@ OPACITY={opacity}
"""
)
with open(".setup-done-before", "w") as f:
with open(".setup-done-before", "w", encoding="utf-8") as f:
f.write(
"This file blocks the setup assistant from running again. Delete this file to run setup again."
)

@ -15,16 +15,16 @@ def check_env() -> bool:
Returns:
bool: Whether or not everything was put in properly
"""
"""
if not os.path.exists(".env.template"):
console.print("[red]Couldn't find .env.template. Unable to check variables.")
return True
if not os.path.exists(".env"):
console.print("[red]Couldn't find the .env file, creating one now.")
with open(".env", "x") as file:
with open(".env", "x", encoding="utf-8") as file:
file.write("")
success = True
with open(".env.template", "r") as template:
with open(".env.template", "r", encoding="utf-8") as template:
# req_envs = [env.split("=")[0] for env in template.readlines() if "=" in env]
matching = {}
explanations = {}
@ -35,7 +35,11 @@ def check_env() -> bool:
req_envs = []
var_optional = False
for line in template.readlines():
if line.startswith("#") is not True and "=" in line and var_optional is not True:
if (
line.startswith("#") is not True
and "=" in line
and var_optional is not True
):
req_envs.append(line.split("=")[0])
if "#" in line:
examples[line.split("=")[0]] = "#".join(line.split("#")[1:]).strip()
@ -56,7 +60,9 @@ def check_env() -> bool:
)
var_optional = False
elif line.startswith("#MATCH_TYPE "):
types[req_envs[-1]] = eval(line.removeprefix("#MATCH_TYPE ")[:-1].split()[0])
types[req_envs[-1]] = eval(
line.removeprefix("#MATCH_TYPE ")[:-1].split()[0]
)
var_optional = False
elif line.startswith("#EXPLANATION "):
explanations[req_envs[-1]] = line.removeprefix("#EXPLANATION ")[:-1]
@ -82,9 +88,9 @@ def check_env() -> bool:
try:
temp = types[env](value)
if env in bounds.keys():
(bounds[env][0] <= temp or incorrect.add(env)) and len(bounds[env]) > 1 and (
bounds[env][1] >= temp or incorrect.add(env)
)
(bounds[env][0] <= temp or incorrect.add(env)) and len(
bounds[env]
) > 1 and (bounds[env][1] >= temp or incorrect.add(env))
except ValueError:
incorrect.add(env)
@ -107,11 +113,17 @@ def check_env() -> bool:
for env in missing:
table.add_row(
env,
explanations[env] if env in explanations.keys() else "No explanation given",
explanations[env]
if env in explanations.keys()
else "No explanation given",
examples[env] if env in examples.keys() else "",
str(bounds[env][0]) if env in bounds.keys() and bounds[env][1] is not None else "",
str(bounds[env][0])
if env in bounds.keys() and bounds[env][1] is not None
else "",
str(bounds[env][1])
if env in bounds.keys() and len(bounds[env]) > 1 and bounds[env][1] is not None
if env in bounds.keys()
and len(bounds[env]) > 1
and bounds[env][1] is not None
else "",
)
console.print(table)
@ -128,7 +140,9 @@ def check_env() -> bool:
title_style="#C0CAF5 bold",
)
table.add_column("Variable", justify="left", style="#7AA2F7 bold", no_wrap=True)
table.add_column("Current value", justify="left", style="#F7768E", no_wrap=False)
table.add_column(
"Current value", justify="left", style="#F7768E", no_wrap=False
)
table.add_column("Explanation", justify="left", style="#BB9AF7", no_wrap=False)
table.add_column("Example", justify="center", style="#F7768E", no_wrap=True)
table.add_column("Min", justify="right", style="#F7768E", no_wrap=True)
@ -137,10 +151,14 @@ def check_env() -> bool:
table.add_row(
env,
os.getenv(env),
explanations[env] if env in explanations.keys() else "No explanation given",
explanations[env]
if env in explanations.keys()
else "No explanation given",
str(types[env].__name__) if env in types.keys() else "str",
str(bounds[env][0]) if env in bounds.keys() else "None",
str(bounds[env][1]) if env in bounds.keys() and len(bounds[env]) > 1 else "None",
str(bounds[env][1])
if env in bounds.keys() and len(bounds[env]) > 1
else "None",
)
missing.add(env)
console.print(table)
@ -154,7 +172,7 @@ def check_env() -> bool:
console.print("[red]Aborting: Unresolved missing variables")
return False
if len(incorrect):
with open(".env", "r+") as env_file:
with open(".env", "r+", encoding="utf-8") as env_file:
lines = []
for line in env_file.readlines():
line.split("=")[0].strip() not in incorrect and lines.append(line)
@ -162,7 +180,7 @@ def check_env() -> bool:
env_file.write("\n".join(lines))
env_file.truncate()
console.print("[green]Successfully removed incorrectly set variables from .env")
with open(".env", "a") as env_file:
with open(".env", "a", encoding="utf-8") as env_file:
for env in missing:
env_file.write(
env
@ -177,11 +195,17 @@ def check_env() -> bool:
if env in explanations.keys()
else "Incorrect input. Try again.",
bounds[env][0] if env in bounds.keys() else None,
bounds[env][1] if env in bounds.keys() and len(bounds[env]) > 1 else None,
oob_errors[env] if env in oob_errors.keys() else "Input too long/short.",
bounds[env][1]
if env in bounds.keys() and len(bounds[env]) > 1
else None,
oob_errors[env]
if env in oob_errors.keys()
else "Input too long/short.",
extra_info="[#C0CAF5 bold]⮶ "
+ (
explanations[env] if env in explanations.keys() else "No info available"
explanations[env]
if env in explanations.keys()
else "No info available"
),
)
)

@ -7,10 +7,12 @@ def cleanup() -> int:
Returns:
int: How many files were deleted
"""
"""
if exists("./assets/temp"):
count = 0
files = [f for f in os.listdir(".") if f.endswith(".mp4") and "temp" in f.lower()]
files = [
f for f in os.listdir(".") if f.endswith(".mp4") and "temp" in f.lower()
]
count += len(files)
for f in files:
os.remove(f)

@ -64,10 +64,14 @@ def handle_input(
except ValueError:
console.print("[red]" + err_message) # Type conversion failed
continue
if nmin is not None and len(user_input) < nmin: # Check if string is long enough
if (
nmin is not None and len(user_input) < nmin
): # Check if string is long enough
console.print("[red]" + oob_error)
continue
if nmax is not None and len(user_input) > nmax: # Check if string is not too long
if (
nmax is not None and len(user_input) > nmax
): # Check if string is not too long
console.print("[red]" + oob_error)
continue
break

@ -1,4 +1,3 @@
from typing import List
import json
from os import getenv
from utils.console import print_substep
@ -9,15 +8,16 @@ def get_subreddit_undone(submissions: list, subreddit):
Args:
submissions (list): List of posts that are going to potentially be generated into a video
subreddit (praw.Reddit.SubredditHelper): Chosen subreddit
subreddit (praw.Reddit.SubredditHelper): Chosen subreddit
Returns:
Any: The submission that has not been done
"""
"""
recursively checks if the top submission in the list was already done.
"""
with open("./video_creation/data/videos.json", "r") as done_vids_raw:
# recursively checks if the top submission in the list was already done.
with open(
"./video_creation/data/videos.json", "r", encoding="utf-8"
) as done_vids_raw:
done_videos = json.load(done_vids_raw)
for submission in submissions:
if already_done(done_videos, submission):
@ -36,8 +36,8 @@ def get_subreddit_undone(submissions: list, subreddit):
) # all of the videos in hot have already been done
def already_done(done_videos: list, submission)->bool:
"""Checks to see if the given submission is in the list of videos
def already_done(done_videos: list, submission) -> bool:
"""Checks to see if the given submission is in the list of videos
Args:
done_videos (list): Finished videos
@ -45,7 +45,7 @@ def already_done(done_videos: list, submission)->bool:
Returns:
Boolean: Whether the video was found in the list
"""
"""
for video in done_videos:
if video["id"] == str(submission):

@ -1,12 +1,14 @@
import json
from typing import Union
from os import getenv
from utils.console import print_step
def check_done(
redditobj:dict[str],
)->dict[str]|None: # don't set this to be run anyplace that isn't subreddit.py bc of inspect stack
redditobj: dict[str],
) -> Union[dict[str], None]:
# don't set this to be run anyplace that isn't subreddit.py bc of inspect stack
"""Checks if the chosen post has already been generated
Args:
@ -16,7 +18,9 @@ def check_done(
dict[str]|None: Reddit object in args
"""
with open("./video_creation/data/videos.json", "r") as done_vids_raw:
with open(
"./video_creation/data/videos.json", "r", encoding="utf-8"
) as done_vids_raw:
done_videos = json.load(done_vids_raw)
for video in done_videos:
if video["id"] == str(redditobj):

@ -2,7 +2,7 @@ import re
def sanitize_text(text: str) -> str:
"""Sanitizes the text for tts.
r"""Sanitizes the text for tts.
What gets removed:
- following characters`^_~@!&;#:-%“”‘"%*/{}[]()\|<>?=+`
- any http or https links

@ -10,7 +10,7 @@ from pytube import YouTube
from utils.console import print_step, print_substep
def get_start_and_end_times(video_length:int, length_of_clip:int)->tuple[int,int]:
def get_start_and_end_times(video_length: int, length_of_clip: int) -> tuple[int, int]:
"""Generates a random interval of time to be used as the beckground of the video.
Args:
@ -19,7 +19,7 @@ def get_start_and_end_times(video_length:int, length_of_clip:int)->tuple[int,int
Returns:
tuple[int,int]: Start and end time of the randomized interval
"""
"""
random_time = randrange(180, int(length_of_clip) - int(video_length))
return random_time, random_time + video_length
@ -37,7 +37,7 @@ def download_background():
]
# note: make sure the file name doesn't include an - in it
if not len(listdir("./assets/backgrounds")) >= len(
background_options
background_options
): # if there are any background videos not installed
print_step(
"We need to download the backgrounds videos. they are fairly large but it's only done once. 😎"
@ -56,12 +56,12 @@ def download_background():
)
def chop_background_video(video_length:int):
def chop_background_video(video_length: int):
"""Generates the background footage to be used in the video and writes it to assets/temp/background.mp4
Args:
video_length (int): Length of the clip where the background footage is to be taken out of
"""
"""
print_step("Finding a spot in the backgrounds video to chop...✂️")
choice = random.choice(listdir("assets/backgrounds"))
environ["background_credit"] = choice.split("-")[0]

@ -1,8 +1,9 @@
#!/usr/bin/env python3
import json
import os
import time
import multiprocessing
import re
import os
from os.path import exists
from moviepy.editor import (
@ -17,7 +18,6 @@ from moviepy.editor import (
from moviepy.video.io import ffmpeg_tools
from rich.console import Console
from reddit import subreddit
from utils.cleanup import cleanup
from utils.console import print_step, print_substep
@ -26,13 +26,13 @@ console = Console()
W, H = 1080, 1920
def make_final_video(number_of_clips:int, length:int):
def make_final_video(number_of_clips: int, length: int, reddit_obj: dict[str]):
"""Gathers audio clips, gathers all screenshots, stitches them together and saves the final video to assets/temp
Args:
number_of_clips (int): Index to end at when going through the screenshots
length (int): Length of the video
"""
"""
print_step("Creating the final video 🎥")
VideoFileClip.reW = lambda clip: clip.resize(width=W)
VideoFileClip.reH = lambda clip: clip.resize(width=H)
@ -116,15 +116,14 @@ def make_final_video(number_of_clips:int, length:int):
)
image_concat.audio = audio_composite
final = CompositeVideoClip([background_clip, image_concat])
title = re.sub(r"[^\w\s-]", "", reddit_obj["thread_title"])
idx = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"])
filename = f"{title}.mp4"
filename = f"{get_video_title()}.mp4"
save_data(filename)
save_data(filename, title, idx)
if not exists("./results"):
print_substep("the results folder didn't exist so I made it")
print_substep("The results folder didn't exist so I made it")
os.mkdir("./results")
final.write_videofile(
@ -146,38 +145,27 @@ def make_final_video(number_of_clips:int, length:int):
print_substep("See result in the results folder!")
print_step(
f"Reddit title: {os.getenv('VIDEO_TITLE')} \n Background Credit: {os.getenv('background_credit')}"
f'Reddit title: { reddit_obj["thread_title"] } \n Background Credit: {os.getenv("background_credit")}'
)
def save_data(filename:str):
def save_data(filename: str, reddit_title: str, reddit_id: str):
"""Saves the videos that have already been generated to a JSON file in video_creation/data/videos.json
Args:
filename (str): The finished video title name
"""
with open("./video_creation/data/videos.json", "r+") as raw_vids:
"""
with open("./video_creation/data/videos.json", "r+", encoding="utf-8") as raw_vids:
done_vids = json.load(raw_vids)
if str(subreddit.submission.id) in [video["id"] for video in done_vids]:
if reddit_id in [video["id"] for video in done_vids]:
return # video already done but was specified to continue anyway in the .env file
payload = {
"id": str(os.getenv("VIDEO_ID")),
"id": reddit_id,
"time": str(int(time.time())),
"background_credit": str(os.getenv("background_credit")),
"reddit_title": str(os.getenv("VIDEO_TITLE")),
"reddit_title": reddit_title,
"filename": filename,
}
done_vids.append(payload)
raw_vids.seek(0)
json.dump(done_vids, raw_vids, ensure_ascii=False, indent=4)
def get_video_title() -> str:
    """Gets video title from env variable or gives it the name "final_video"

    Returns:
        str: Video title, truncated to 30 characters plus "..." when it
        exceeds 35 characters.
    """
    # An unset or empty VIDEO_TITLE falls through to the default name.
    title = os.getenv("VIDEO_TITLE") or "final_video"
    return title if len(title) <= 35 else title[:30] + "..."

@ -1,31 +1,29 @@
import json
from os import getenv
import os
from os import getenv
from pathlib import Path
from playwright.async_api import async_playwright # do not remove this line
from playwright.sync_api import sync_playwright, ViewportSize
from rich.progress import track
from playwright.async_api import async_playwright # pylint: disable=unused-import
from utils.console import print_step, print_substep
import json
from rich.console import Console
# do not remove the above line
from playwright.sync_api import sync_playwright, ViewportSize
from rich.progress import track
import translators as ts
console = Console()
from utils.console import print_step, print_substep
storymode = False
def download_screenshots_of_reddit_posts(reddit_object:dict[str], screenshot_num:int):
def download_screenshots_of_reddit_posts(reddit_object: dict[str], screenshot_num: int):
"""Downloads screenshots of reddit posts as seen on the web. Downloads to assets/temp/png
Args:
reddit_object (dict[str]): Reddit object received from reddit/subreddit.py
screenshot_num (int): Number of screenshots to download
"""
"""
print_step("Downloading screenshots of reddit posts...")
# ! Make sure the reddit screenshots folder exists
@ -38,9 +36,13 @@ def download_screenshots_of_reddit_posts(reddit_object:dict[str], screenshot_num
context = browser.new_context()
if getenv("THEME").upper() == "DARK":
cookie_file = open("./video_creation/data/cookie-dark-mode.json")
cookie_file = open(
"./video_creation/data/cookie-dark-mode.json", encoding="utf-8"
)
else:
cookie_file = open("./video_creation/data/cookie-light-mode.json")
cookie_file = open(
"./video_creation/data/cookie-light-mode.json", encoding="utf-8"
)
cookies = json.load(cookie_file)
context.add_cookies(cookies) # load preference cookies
# Get the thread screenshot
@ -60,10 +62,13 @@ def download_screenshots_of_reddit_posts(reddit_object:dict[str], screenshot_num
if getenv("POSTLANG"):
print_substep("Translating post...")
texts_in_tl = ts.google(reddit_object["thread_title"], to_language=os.getenv("POSTLANG"))
texts_in_tl = ts.google(
reddit_object["thread_title"], to_language=os.getenv("POSTLANG")
)
page.evaluate(
'tl_content => document.querySelector(\'[data-test-id="post-content"] > div:nth-child(3) > div > div\').textContent = tl_content', texts_in_tl
"tl_content => document.querySelector('[data-test-id=\"post-content\"] > div:nth-child(3) > div > div').textContent = tl_content",
texts_in_tl,
)
else:
print_substep("Skipping translation...")
@ -92,9 +97,12 @@ def download_screenshots_of_reddit_posts(reddit_object:dict[str], screenshot_num
# translate code
if getenv("POSTLANG"):
comment_tl = ts.google(comment["comment_body"], to_language=os.getenv("POSTLANG"))
comment_tl = ts.google(
comment["comment_body"], to_language=os.getenv("POSTLANG")
)
page.evaluate(
'([tl_content, tl_id]) => document.querySelector(`#t1_${tl_id} > div:nth-child(2) > div > div[data-testid="comment"] > div`).textContent = tl_content', [comment_tl, comment['comment_id']]
'([tl_content, tl_id]) => document.querySelector(`#t1_${tl_id} > div:nth-child(2) > div > div[data-testid="comment"] > div`).textContent = tl_content',
[comment_tl, comment["comment_id"]],
)
page.locator(f"#t1_{comment['comment_id']}").screenshot(

@ -25,7 +25,7 @@ TTSProviders = {
VIDEO_LENGTH: int = 40 # secs
def save_text_to_mp3(reddit_obj:dict[str])->tuple[int,int]:
def save_text_to_mp3(reddit_obj: dict[str]) -> tuple[int, int]:
"""Saves text to MP3 files. Goes through the reddit_obj and generates the title MP3 file and a certain number of comments until the total amount of time exceeds VIDEO_LENGTH seconds.
Args:
@ -34,7 +34,7 @@ def save_text_to_mp3(reddit_obj:dict[str])->tuple[int,int]:
Returns:
tuple[int,int]: (total length of the audio, the number of comments audio was generated for)
"""
env = os.getenv("TTSCHOICE", "")
if env.casefold() in map(lambda _: _.casefold(), TTSProviders):
text_to_mp3 = TTSEngine(

Loading…
Cancel
Save