Merge branch 'feat/RedditPostPosition' into feat/background-configs

pull/693/head
Jason 2 years ago committed by GitHub
commit 51831ddbd9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,38 @@
[reddit.creds]
client_id = { optional = false, nmin = 12, nmax = 30, explanation = "the ID of your Reddit app of SCRIPT type", example = "fFAGRNJru1FTz70BzhT3Zg", regex = "^[-a-zA-Z0-9._~+/]+=*$", input_error = "The client ID can only contain printable characters.", oob_error = "The ID should be over 12 and under 30 characters, double check your input." }
client_secret = { optional = false, nmin = 20, nmax = 40, explanation = "the SECRET of your Reddit app of SCRIPT type", example = "fFAGRNJru1FTz70BzhT3Zg", regex = "^[-a-zA-Z0-9._~+/]+=*$", input_error = "The client ID can only contain printable characters.", oob_error = "The secret should be over 20 and under 40 characters, double check your input." }
username = { optional = false, nmin = 3, nmax = 20, explanation = "the username of your reddit account", example = "JasonLovesDoggo", regex = "^[-_0-9a-zA-Z]+$", oob_error = "A username HAS to be between 3 and 20 characters" }
password = { optional = false, nmin = 8, explanation = "the password of your reddit account", example = "fFAGRNJru1FTz70BzhT3Zg", oob_error = "Password too short" }
2fa = { optional = true, type = "bool", options = [true,
false,
], default = false, explanation = "Whether you have Reddit 2FA enabled, Valid options are True and False", example = true }
[reddit.thread]
random = { optional = true, options = [true,
false,
], default = false, type = "bool", explanation = "If set to no, it will ask you a thread link to extract the thread, if yes it will randomize it. Default: 'False'", example = "True" }
subreddit = { optional = false, regex = "[_0-9a-zA-Z]+$", nmin = 3, nmax = 21, explanation = "what subreddit to pull posts from, the name of the sub, not the URL", example = "AskReddit", oob_error = "A subreddit name HAS to be between 3 and 20 characters" }
post_id = { optional = true, default = "", regex = "^((?!://|://)[+a-zA-Z])*$", explanation = "Used if you want to use a specific post.", example = "urdtfx" }
max_comment_length = { default = 500, optional = false, nmin = 10, nmax = 10000, type = "int", explanation = "max number of characters a comment can have. default is 500", example = 500, oob_error = "the max comment length should be between 10 and 10000" }
post_lang = { default = "", optional = true, explanation = "The language you would like to translate to.", example = "es-cr" }
[settings]
allow_nsfw = { optional = false, type = "bool", default = false, example = false, options = [true,
false,
], explanation = "Whether to allow NSFW content, True or False" }
theme = { optional = false, default = "dark", example = "light", options = ["dark",
"light",
], explanation = "sets the Reddit theme, either LIGHT or DARK" }
times_to_run = { optional = false, default = 1, example = 2, explanation = "used if you want to run multiple times. set to an int e.g. 4 or 29 or 1", type = "int", nmin = 1, oob_error = "It's very hard to run something less than once." }
opacity = { optional = false, default = 0.9, example = 0.8, explanation = "Sets the opacity of the comments when overlayed over the background", type = "float", nmin = 0, nmax = 1, oob_error = "The opacity HAS to be between 0 and 1", input_error = "The opacity HAS to be a decimal number between 0 and 1" }
storymode = { optional = true, type = "bool", default = false, example = false, options = [true,
false,
] }
background_choice = { optional = true, default = "minecraft", example = "minecraft", options = ["minecraft", "gta", "rocket-league", "motor-gta"], explanation = "Sets the background for the video" }
[settings.tts]
choice = { optional = false, default = "", options = ["streamlabspolly", "tiktok", "googletranslate", "awspolly", ], example = "streamlabspolly", explanation = "The backend used for TTS generation. This can be left blank and you will be prompted to choose at runtime." }
aws_polly_voice = { optional = false, default = "Matthew", example = "Matthew", explanation = "The voice used for AWS Polly" }
streamlabs_polly_voice = { optional = false, default = "Matthew", example = "Matthew", explanation = "The voice used for Streamlabs Polly" }
tiktok_voice = { optional = false, default = "en_us_006", example = "en_us_006", explanation = "The voice used for TikTok TTS" }

@ -14,10 +14,10 @@ name: "CodeQL"
on: on:
push: push:
branches: [ "master" ] branches: [ "master", "develop" ]
pull_request: pull_request:
# The branches below must be a subset of the branches above # The branches below must be a subset of the branches above
branches: [ "master" ] branches: [ "master", "develop" ]
schedule: schedule:
- cron: '16 14 * * 3' - cron: '16 14 * * 3'

@ -0,0 +1,10 @@
name: Lint
on: [push, pull_request]
jobs:
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: psf/black@stable

@ -22,7 +22,7 @@ jobs:
exempt-all-issue-milestones: true exempt-all-issue-milestones: true
operations-per-run: 300 operations-per-run: 300
remove-stale-when-updated: true remove-stale-when-updated: true
- uses: actions/stale@main - uses: actions/stale@main
id: stale-pr id: stale-pr
name: stale-pr name: stale-pr
@ -37,4 +37,3 @@ jobs:
exempt-all-pr-milestones: true exempt-all-pr-milestones: true
operations-per-run: 300 operations-per-run: 300
remove-stale-when-updated: true remove-stale-when-updated: true

2
.gitignore vendored

@ -241,3 +241,5 @@ reddit-bot-351418-5560ebc49cac.json
*.pyc *.pyc
video_creation/data/videos.json video_creation/data/videos.json
video_creation/data/envvars.txt video_creation/data/envvars.txt
config.toml

@ -40,11 +40,13 @@ The only original thing being done is the editing and gathering of all materials
1. Clone this repository 1. Clone this repository
2. Run `pip install -r requirements.txt` 2. Run `pip install -r requirements.txt`
3. Run `playwright install` and `playwright install-deps`. (if this fails try adding python -m to the front of the command) 3. Run `python -m playwright install` and `python -m playwright install-deps`
4. Run `python main.py` 4. Run `python main.py`
required\*\*), visit [the Reddit Apps page.](https://www.reddit.com/prefs/apps) TL;DR set up an app that is a "script". 5. Visit [the Reddit Apps page.](https://www.reddit.com/prefs/apps), and set up an app that is a "script".
5. Enjoy 😎 6. The bot will ask you to fill in your details to connect to the Reddit API, and configure the bot to your liking
7. Enjoy 😎
8. If you need to reconfigure the bot, simply open the `config.toml` file and delete the lines that need to be changed. On the next run of the bot, it will help you reconfigure those options.
(Note if you got an error installing or running the bot try first rerunning the command with a three after the name e.g. python3 or pip3) (Note if you got an error installing or running the bot try first rerunning the command with a three after the name e.g. python3 or pip3)

@ -1,6 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import random import random
import os from utils import settings
from gtts import gTTS from gtts import gTTS
max_chars = 0 max_chars = 0
@ -12,7 +12,11 @@ class GTTS:
self.voices = [] self.voices = []
def run(self, text, filepath): def run(self, text, filepath):
tts = gTTS(text=text, lang=os.getenv("POSTLANG") or "en", slow=False) tts = gTTS(
text=text,
lang=settings.config["reddit"]["thread"]["post_lang"] or "en",
slow=False,
)
tts.save(filepath) tts.save(filepath)
def randomvoice(self): def randomvoice(self):

@ -1,5 +1,5 @@
import base64 import base64
import os from utils import settings
import random import random
import requests import requests
from requests.adapters import HTTPAdapter, Retry from requests.adapters import HTTPAdapter, Retry
@ -62,9 +62,7 @@ noneng = [
class TikTok: # TikTok Text-to-Speech Wrapper class TikTok: # TikTok Text-to-Speech Wrapper
def __init__(self): def __init__(self):
self.URI_BASE = ( self.URI_BASE = "https://api16-normal-useast5.us.tiktokv.com/media/api/text/speech/invoke/?text_speaker="
"https://api16-normal-useast5.us.tiktokv.com/media/api/text/speech/invoke/?text_speaker="
)
self.max_chars = 300 self.max_chars = 300
self.voices = {"human": human, "nonhuman": nonhuman, "noneng": noneng} self.voices = {"human": human, "nonhuman": nonhuman, "noneng": noneng}
@ -75,10 +73,15 @@ class TikTok: # TikTok Text-to-Speech Wrapper
voice = ( voice = (
self.randomvoice() self.randomvoice()
if random_voice if random_voice
else (os.getenv("TIKTOK_VOICE") or random.choice(self.voices["human"])) else (
settings.config["settings"]["tts"]["tiktok_voice"]
or random.choice(self.voices["human"])
)
) )
try: try:
r = requests.post(f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0") r = requests.post(
f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0"
)
except requests.exceptions.SSLError: except requests.exceptions.SSLError:
# https://stackoverflow.com/a/47475019/18516611 # https://stackoverflow.com/a/47475019/18516611
session = requests.Session() session = requests.Session()
@ -86,7 +89,9 @@ class TikTok: # TikTok Text-to-Speech Wrapper
adapter = HTTPAdapter(max_retries=retry) adapter = HTTPAdapter(max_retries=retry)
session.mount("http://", adapter) session.mount("http://", adapter)
session.mount("https://", adapter) session.mount("https://", adapter)
r = session.post(f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0") r = session.post(
f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0"
)
# print(r.text) # print(r.text)
vstr = [r.json()["data"]["v_str"]][0] vstr = [r.json()["data"]["v_str"]][0]
b64d = base64.b64decode(vstr) b64d = base64.b64decode(vstr)

@ -2,7 +2,7 @@
from boto3 import Session from boto3 import Session
from botocore.exceptions import BotoCoreError, ClientError from botocore.exceptions import BotoCoreError, ClientError
import sys import sys
import os from utils import settings
import random import random
voices = [ voices = [
@ -35,11 +35,13 @@ class AWSPolly:
if random_voice: if random_voice:
voice = self.randomvoice() voice = self.randomvoice()
else: else:
if not os.getenv("AWS_VOICE"): if not settings.config["settings"]["tts"]["aws_polly_voice"]:
return ValueError( return ValueError(
f"Please set the environment variable AWS_VOICE to a valid voice. options are: {voices}" f"Please set the environment variable AWS_VOICE to a valid voice. options are: {voices}"
) )
voice = str(os.getenv("AWS_VOICE")).capitalize() voice = str(
settings.config["settings"]["tts"]["aws_polly_voice"]
).capitalize()
try: try:
# Request speech synthesis # Request speech synthesis
response = polly.synthesize_speech( response = polly.synthesize_speech(

@ -2,7 +2,6 @@
from pathlib import Path from pathlib import Path
from typing import Tuple from typing import Tuple
import re import re
from os import getenv
# import sox # import sox
# from mutagen import MutagenError # from mutagen import MutagenError
@ -12,6 +11,7 @@ from rich.progress import track
from moviepy.editor import AudioFileClip, CompositeAudioClip, concatenate_audioclips from moviepy.editor import AudioFileClip, CompositeAudioClip, concatenate_audioclips
from utils.console import print_step, print_substep from utils.console import print_step, print_substep
from utils.voice import sanitize_text from utils.voice import sanitize_text
from utils import settings
DEFUALT_MAX_LENGTH: int = 50 # video length variable DEFUALT_MAX_LENGTH: int = 50 # video length variable
@ -56,11 +56,16 @@ class TTSEngine:
print_step("Saving Text to MP3 files...") print_step("Saving Text to MP3 files...")
self.call_tts("title", self.reddit_object["thread_title"]) self.call_tts("title", self.reddit_object["thread_title"])
if self.reddit_object["thread_post"] != "" and getenv("STORYMODE", "").casefold() == "true": if (
self.reddit_object["thread_post"] != ""
and settings.config["settings"]["storymode"] == True
):
self.call_tts("posttext", self.reddit_object["thread_post"]) self.call_tts("posttext", self.reddit_object["thread_post"])
idx = None idx = None
for idx, comment in track(enumerate(self.reddit_object["comments"]), "Saving..."): for idx, comment in track(
enumerate(self.reddit_object["comments"]), "Saving..."
):
# ! Stop creating mp3 files if the length is greater than max length. # ! Stop creating mp3 files if the length is greater than max length.
if self.length > self.max_length: if self.length > self.max_length:
break break
@ -76,7 +81,9 @@ class TTSEngine:
split_files = [] split_files = []
split_text = [ split_text = [
x.group().strip() x.group().strip()
for x in re.finditer(rf" *((.{{0,{self.tts_module.max_chars}}})(\.|.$))", text) for x in re.finditer(
rf" *((.{{0,{self.tts_module.max_chars}}})(\.|.$))", text
)
] ]
idy = None idy = None
@ -94,12 +101,14 @@ class TTSEngine:
Path(name).unlink() Path(name).unlink()
# for i in range(0, idy + 1): # for i in range(0, idy + 1):
# print(f"Cleaning up {self.path}/{idx}-{i}.part.mp3") # print(f"Cleaning up {self.path}/{idx}-{i}.part.mp3")
# Path(f"{self.path}/{idx}-{i}.part.mp3").unlink() # Path(f"{self.path}/{idx}-{i}.part.mp3").unlink()
def call_tts(self, filename: str, text: str): def call_tts(self, filename: str, text: str):
self.tts_module.run(text=process_text(text), filepath=f"{self.path}/{filename}.mp3") self.tts_module.run(
text=process_text(text), filepath=f"{self.path}/{filename}.mp3"
)
# try: # try:
# self.length += MP3(f"{self.path}/{filename}.mp3").info.length # self.length += MP3(f"{self.path}/{filename}.mp3").info.length
# except (MutagenError, HeaderNotFoundError): # except (MutagenError, HeaderNotFoundError):
@ -108,8 +117,9 @@ class TTSEngine:
self.length += clip.duration self.length += clip.duration
clip.close() clip.close()
def process_text(text: str): def process_text(text: str):
lang = getenv("POSTLANG", "") lang = settings.config["reddit"]["thread"]["post_lang"]
new_text = sanitize_text(text) new_text = sanitize_text(text)
if lang: if lang:
print_substep("Translating Text...") print_substep("Translating Text...")

@ -1,7 +1,7 @@
import random import random
import os
import requests import requests
from requests.exceptions import JSONDecodeError from requests.exceptions import JSONDecodeError
from utils import settings
voices = [ voices = [
"Brian", "Brian",
@ -35,11 +35,13 @@ class StreamlabsPolly:
if random_voice: if random_voice:
voice = self.randomvoice() voice = self.randomvoice()
else: else:
if not os.getenv("STREAMLABS_VOICE"): if not settings.config["settings"]["tts"]["streamlabs_polly_voice"]:
return ValueError( return ValueError(
f"Please set the environment variable STREAMLABS_VOICE to a valid voice. options are: {voices}" f"Please set the environment variable STREAMLABS_VOICE to a valid voice. options are: {voices}"
) )
voice = str(os.getenv("STREAMLABS_VOICE")).capitalize() voice = str(
settings.config["settings"]["tts"]["streamlabs_polly_voice"]
).capitalize()
body = {"voice": voice, "text": text, "service": "polly"} body = {"voice": voice, "text": text, "service": "polly"}
response = requests.post(self.url, data=body) response = requests.post(self.url, data=body)
try: try:
@ -55,6 +57,3 @@ class StreamlabsPolly:
def randomvoice(self): def randomvoice(self):
return random.choice(self.voices) return random.choice(self.voices)
# StreamlabsPolly().run(text=str('hi hi ' * 92)[1:], filepath='hello.mp3', random_voice=True)

@ -1,12 +1,11 @@
#!/usr/bin/env python #!/usr/bin/env python
import math import math
from subprocess import Popen from subprocess import Popen
from os import getenv, name from os import name
from dotenv import load_dotenv
from reddit.subreddit import get_subreddit_threads from reddit.subreddit import get_subreddit_threads
from utils.cleanup import cleanup from utils.cleanup import cleanup
from utils.console import print_markdown, print_step from utils.console import print_markdown, print_step
from utils.checker import check_env from utils import settings
# from utils.checker import envUpdate # from utils.checker import envUpdate
from video_creation.background import download_background, chop_background_video, get_background_config from video_creation.background import download_background, chop_background_video, get_background_config
@ -14,7 +13,7 @@ from video_creation.final_video import make_final_video
from video_creation.screenshot_downloader import download_screenshots_of_reddit_posts from video_creation.screenshot_downloader import download_screenshots_of_reddit_posts
from video_creation.voices import save_text_to_mp3 from video_creation.voices import save_text_to_mp3
VERSION = "2.2.2" VERSION = "2.2.8"
print( print(
""" """
@ -31,11 +30,13 @@ print_markdown(
) )
print_step(f"You are using V{VERSION} of the bot") print_step(f"You are using V{VERSION} of the bot")
def main(POST_ID=None): def main(POST_ID=None):
cleanup() cleanup()
reddit_object = get_subreddit_threads(POST_ID) reddit_object = get_subreddit_threads(POST_ID)
length, number_of_comments = save_text_to_mp3(reddit_object) length, number_of_comments = save_text_to_mp3(reddit_object)
length = math.ceil(length) length = math.ceil(length)
bg_config = get_background_config()
download_screenshots_of_reddit_posts(reddit_object, number_of_comments) download_screenshots_of_reddit_posts(reddit_object, number_of_comments)
bg_config = get_background_config() bg_config = get_background_config()
download_background(bg_config) download_background(bg_config)
@ -46,25 +47,26 @@ def main(POST_ID=None):
def run_many(times): def run_many(times):
for x in range(1, times + 1): for x in range(1, times + 1):
print_step( print_step(
f'on the {x}{("st" if x == 1 else ("nd" if x == 2 else ("rd" if x == 3 else "th")))} iteration of {times}' f'on the {x}{("th", "st", "nd", "rd", "th", "th", "th", "th","th", "th")[x%10]} iteration of {times}'
) # correct 1st 2nd 3rd 4th 5th.... ) # correct 1st 2nd 3rd 4th 5th....
main() main()
Popen("cls" if name == "nt" else "clear", shell=True).wait() Popen("cls" if name == "nt" else "clear", shell=True).wait()
if __name__ == "__main__": if __name__ == "__main__":
if check_env() is not True: config = settings.check_toml(".config.template.toml", "config.toml")
exit() config is False and exit()
load_dotenv()
try: try:
if getenv("TIMES_TO_RUN") and isinstance(int(getenv("TIMES_TO_RUN")), int): if config["settings"]["times_to_run"]:
run_many(int(getenv("TIMES_TO_RUN"))) run_many(config["settings"]["times_to_run"])
elif len(getenv("POST_ID", "").split("+")) > 1: elif len(config["reddit"]["thread"]["post_id"].split("+")) > 1:
for index, post_id in enumerate(getenv("POST_ID", "").split("+")): for index, post_id in enumerate(
config["reddit"]["thread"]["post_id"].split("+")
):
index += 1 index += 1
print_step( print_step(
f'on the {index}{("st" if index == 1 else ("nd" if index == 2 else ("rd" if index == 3 else "th")))} post of {len(getenv("POST_ID", "").split("+"))}' f'on the {index}{("st" if index%10 == 1 else ("nd" if index%10 == 2 else ("rd" if index%10 == 3 else "th")))} post of {len(config["reddit"]["thread"]["post_id"].split("+"))}'
) )
main(post_id) main(post_id)
Popen("cls" if name == "nt" else "clear", shell=True).wait() Popen("cls" if name == "nt" else "clear", shell=True).wait()

@ -1,6 +1,6 @@
import re import re
from os import getenv
from utils import settings
import praw import praw
from praw.models import MoreComments from praw.models import MoreComments
@ -17,20 +17,22 @@ def get_subreddit_threads(POST_ID: str):
print_substep("Logging into Reddit.") print_substep("Logging into Reddit.")
content = {} content = {}
if str(getenv("REDDIT_2FA")).casefold() == "yes": if settings.config["reddit"]["creds"]["2fa"] == True:
print("\nEnter your two-factor authentication code from your authenticator app.\n") print(
"\nEnter your two-factor authentication code from your authenticator app.\n"
)
code = input("> ") code = input("> ")
print() print()
pw = getenv("REDDIT_PASSWORD") pw = settings.config["reddit"]["creds"]["password"]
passkey = f"{pw}:{code}" passkey = f"{pw}:{code}"
else: else:
passkey = getenv("REDDIT_PASSWORD") passkey = settings.config["reddit"]["creds"]["password"]
username = getenv("REDDIT_USERNAME") username = settings.config["reddit"]["creds"]["username"]
if username.casefold().startswith("u/"): if username.casefold().startswith("u/"):
username = username[2:] username = username[2:]
reddit = praw.Reddit( reddit = praw.Reddit(
client_id=getenv("REDDIT_CLIENT_ID"), client_id=settings.config["reddit"]["creds"]["client_id"],
client_secret=getenv("REDDIT_CLIENT_SECRET"), client_secret=settings.config["reddit"]["creds"]["client_secret"],
user_agent="Accessing Reddit threads", user_agent="Accessing Reddit threads",
username=username, username=username,
passkey=passkey, passkey=passkey,
@ -39,21 +41,26 @@ def get_subreddit_threads(POST_ID: str):
# Ask user for subreddit input # Ask user for subreddit input
print_step("Getting subreddit threads...") print_step("Getting subreddit threads...")
if not getenv( if not settings.config["reddit"]["thread"][
"SUBREDDIT" "subreddit"
): # note to user. you can have multiple subreddits via reddit.subreddit("redditdev+learnpython") ]: # note to user. you can have multiple subreddits via reddit.subreddit("redditdev+learnpython")
try: try:
subreddit = reddit.subreddit( subreddit = reddit.subreddit(
re.sub(r"r\/", "", input("What subreddit would you like to pull from? ")) re.sub(
r"r\/", "", input("What subreddit would you like to pull from? ")
)
# removes the r/ from the input # removes the r/ from the input
) )
except ValueError: except ValueError:
subreddit = reddit.subreddit("askreddit") subreddit = reddit.subreddit("askreddit")
print_substep("Subreddit not defined. Using AskReddit.") print_substep("Subreddit not defined. Using AskReddit.")
else: else:
print_substep(f"Using subreddit: r/{getenv('SUBREDDIT')} from environment variable config") sub = settings.config["reddit"]["thread"]["subreddit"]
subreddit_choice = getenv("SUBREDDIT") print_substep(f"Using subreddit: r/{sub} from TOML config")
if subreddit_choice.casefold().startswith("r/"): # removes the r/ from the input subreddit_choice = sub
if subreddit_choice.casefold().startswith(
"r/"
): # removes the r/ from the input
subreddit_choice = subreddit_choice[2:] subreddit_choice = subreddit_choice[2:]
subreddit = reddit.subreddit( subreddit = reddit.subreddit(
subreddit_choice subreddit_choice
@ -61,8 +68,13 @@ def get_subreddit_threads(POST_ID: str):
if POST_ID: # would only be called if there are multiple queued posts if POST_ID: # would only be called if there are multiple queued posts
submission = reddit.submission(id=POST_ID) submission = reddit.submission(id=POST_ID)
elif getenv("POST_ID") and len(getenv("POST_ID").split("+")) == 1: elif (
submission = reddit.submission(id=getenv("POST_ID")) settings.config["reddit"]["thread"]["post_id"]
and len(settings.config["reddit"]["thread"]["post_id"].split("+")) == 1
):
submission = reddit.submission(
id=settings.config["reddit"]["thread"]["post_id"]
)
else: else:
threads = subreddit.hot(limit=25) threads = subreddit.hot(limit=25)
@ -91,7 +103,9 @@ def get_subreddit_threads(POST_ID: str):
if top_level_comment.body in ["[removed]", "[deleted]"]: if top_level_comment.body in ["[removed]", "[deleted]"]:
continue # # see https://github.com/JasonLovesDoggo/RedditVideoMakerBot/issues/78 continue # # see https://github.com/JasonLovesDoggo/RedditVideoMakerBot/issues/78
if not top_level_comment.stickied: if not top_level_comment.stickied:
if len(top_level_comment.body) <= int(getenv("MAX_COMMENT_LENGTH", "500")): if len(top_level_comment.body) <= int(
settings.config["reddit"]["thread"]["max_comment_length"]
):
if ( if (
top_level_comment.author is not None top_level_comment.author is not None
): # if errors occur with this change to if not. ): # if errors occur with this change to if not.

@ -2,11 +2,10 @@ boto3==1.24.12
botocore==1.27.22 botocore==1.27.22
gTTS==2.2.4 gTTS==2.2.4
moviepy==1.0.3 moviepy==1.0.3
mutagen==1.45.1
playwright==1.23.0 playwright==1.23.0
praw==7.6.0 praw==7.6.0
python-dotenv==0.20.0
pytube==12.1.0 pytube==12.1.0
requests==2.28.1 requests==2.28.1
rich==12.4.4 rich==12.4.4
toml==0.10.2
translators==5.3.1 translators==5.3.1

@ -1,193 +0,0 @@
#!/usr/bin/env python
import os
from rich.console import Console
from rich.table import Table
from rich import box
import re
import dotenv
from utils.console import handle_input
console = Console()
def check_env() -> bool:
"""Checks to see what's been put in .env
Returns:
bool: Whether or not everything was put in properly
"""
if not os.path.exists(".env.template"):
console.print("[red]Couldn't find .env.template. Unable to check variables.")
return True
if not os.path.exists(".env"):
console.print("[red]Couldn't find the .env file, creating one now.")
with open(".env", "x", encoding="utf-8") as file:
file.write("")
success = True
with open(".env.template", "r", encoding="utf-8") as template:
# req_envs = [env.split("=")[0] for env in template.readlines() if "=" in env]
matching = {}
explanations = {}
bounds = {}
types = {}
oob_errors = {}
examples = {}
req_envs = []
var_optional = False
for line in template.readlines():
if line.startswith("#") is not True and "=" in line and var_optional is not True:
req_envs.append(line.split("=")[0])
if "#" in line:
examples[line.split("=")[0]] = "#".join(line.split("#")[1:]).strip()
elif "#OPTIONAL" in line:
var_optional = True
elif line.startswith("#MATCH_REGEX "):
matching[req_envs[-1]] = line.removeprefix("#MATCH_REGEX ")[:-1]
var_optional = False
elif line.startswith("#OOB_ERROR "):
oob_errors[req_envs[-1]] = line.removeprefix("#OOB_ERROR ")[:-1]
var_optional = False
elif line.startswith("#RANGE "):
bounds[req_envs[-1]] = tuple(
map(
lambda x: float(x) if x != "None" else None,
line.removeprefix("#RANGE ")[:-1].split(":"),
)
)
var_optional = False
elif line.startswith("#MATCH_TYPE "):
types[req_envs[-1]] = eval(line.removeprefix("#MATCH_TYPE ")[:-1].split()[0])
var_optional = False
elif line.startswith("#EXPLANATION "):
explanations[req_envs[-1]] = line.removeprefix("#EXPLANATION ")[:-1]
var_optional = False
else:
var_optional = False
missing = set()
incorrect = set()
dotenv.load_dotenv()
for env in req_envs:
value = os.getenv(env)
if value is None:
missing.add(env)
continue
if env in matching.keys():
re.match(matching[env], value) is None and incorrect.add(env)
if env in bounds.keys() and env not in types.keys():
len(value) >= bounds[env][0] or (
len(bounds[env]) > 1 and bounds[env][1] >= len(value)
) or incorrect.add(env)
continue
if env in types.keys():
try:
temp = types[env](value)
if env in bounds.keys():
(bounds[env][0] <= temp or incorrect.add(env)) and len(bounds[env]) > 1 and (
bounds[env][1] >= temp or incorrect.add(env)
)
except ValueError:
incorrect.add(env)
if len(missing):
table = Table(
title="Missing variables",
highlight=True,
show_lines=True,
box=box.ROUNDED,
border_style="#414868",
header_style="#C0CAF5 bold",
title_justify="left",
title_style="#C0CAF5 bold",
)
table.add_column("Variable", justify="left", style="#7AA2F7 bold", no_wrap=True)
table.add_column("Explanation", justify="left", style="#BB9AF7", no_wrap=False)
table.add_column("Example", justify="center", style="#F7768E", no_wrap=True)
table.add_column("Min", justify="right", style="#F7768E", no_wrap=True)
table.add_column("Max", justify="left", style="#F7768E", no_wrap=True)
for env in missing:
table.add_row(
env,
explanations[env] if env in explanations.keys() else "No explanation given",
examples[env] if env in examples.keys() else "",
str(bounds[env][0]) if env in bounds.keys() and bounds[env][1] is not None else "",
str(bounds[env][1])
if env in bounds.keys() and len(bounds[env]) > 1 and bounds[env][1] is not None
else "",
)
console.print(table)
success = False
if len(incorrect):
table = Table(
title="Incorrect variables",
highlight=True,
show_lines=True,
box=box.ROUNDED,
border_style="#414868",
header_style="#C0CAF5 bold",
title_justify="left",
title_style="#C0CAF5 bold",
)
table.add_column("Variable", justify="left", style="#7AA2F7 bold", no_wrap=True)
table.add_column("Current value", justify="left", style="#F7768E", no_wrap=False)
table.add_column("Explanation", justify="left", style="#BB9AF7", no_wrap=False)
table.add_column("Example", justify="center", style="#F7768E", no_wrap=True)
table.add_column("Min", justify="right", style="#F7768E", no_wrap=True)
table.add_column("Max", justify="left", style="#F7768E", no_wrap=True)
for env in incorrect:
table.add_row(
env,
os.getenv(env),
explanations[env] if env in explanations.keys() else "No explanation given",
str(types[env].__name__) if env in types.keys() else "str",
str(bounds[env][0]) if env in bounds.keys() else "None",
str(bounds[env][1]) if env in bounds.keys() and len(bounds[env]) > 1 else "None",
)
missing.add(env)
console.print(table)
success = False
if success is True:
return True
console.print(
"[green]Do you want to automatically overwrite incorrect variables and add the missing variables? (y/n)"
)
if not input().casefold().startswith("y"):
console.print("[red]Aborting: Unresolved missing variables")
return False
if len(incorrect):
with open(".env", "r+", encoding="utf-8") as env_file:
lines = []
for line in env_file.readlines():
line.split("=")[0].strip() not in incorrect and lines.append(line)
env_file.seek(0)
env_file.write("\n".join(lines))
env_file.truncate()
console.print("[green]Successfully removed incorrectly set variables from .env")
with open(".env", "a", encoding="utf-8") as env_file:
for env in missing:
env_file.write(
env
+ "="
+ ('"' if env not in types.keys() else "")
+ str(
handle_input(
"[#F7768E bold]" + env + "[#C0CAF5 bold]=",
types[env] if env in types.keys() else False,
matching[env] if env in matching.keys() else ".*",
explanations[env]
if env in explanations.keys()
else "Incorrect input. Try again.",
bounds[env][0] if env in bounds.keys() else None,
bounds[env][1] if env in bounds.keys() and len(bounds[env]) > 1 else None,
oob_errors[env] if env in oob_errors.keys() else "Input too long/short.",
extra_info="[#C0CAF5 bold]⮶ "
+ (explanations[env] if env in explanations.keys() else "No info available"),
)
)
+ ('"' if env not in types.keys() else "")
+ "\n"
)
return True
if __name__ == "__main__":
check_env()

@ -10,7 +10,9 @@ def cleanup() -> int:
""" """
if exists("./assets/temp"): if exists("./assets/temp"):
count = 0 count = 0
files = [f for f in os.listdir(".") if f.endswith(".mp4") and "temp" in f.lower()] files = [
f for f in os.listdir(".") if f.endswith(".mp4") and "temp" in f.lower()
]
count += len(files) count += len(files)
for f in files: for f in files:
os.remove(f) os.remove(f)

@ -1,46 +0,0 @@
# write a class that takes .env file and parses it into a dictionary
from dotenv import dotenv_values
DEFAULTS = {
"SUBREDDIT": "AskReddit",
"ALLOW_NSFW": "False",
"POST_ID": "",
"THEME": "DARK",
"REDDIT_2FA": "no",
"TIMES_TO_RUN": "",
"MAX_COMMENT_LENGTH": "500",
"OPACITY": "1",
"VOICE": "en_us_001",
"STORYMODE": "False",
}
class Config:
def __init__(self):
self.raw = dotenv_values("../.env")
self.load_attrs()
def __getattr__(self, attr): # code completion for attributes fix.
return getattr(self, attr)
def load_attrs(self):
for key, value in self.raw.items():
self.add_attr(key, value)
def add_attr(self, key, value):
if value is None or value == "":
setattr(self, key, DEFAULTS[key])
else:
setattr(self, key, str(value))
config = Config()
print(config.SUBREDDIT)
# def temp():
# root = ''
# if isinstance(root, praw.models.Submission):
# root_type = 'submission'
# elif isinstance(root, praw.models.Comment):
# root_type = 'comment'
#

@ -24,17 +24,17 @@ def print_step(text):
console.print(panel) console.print(panel)
def print_substep(text, style=""):
"""Prints a rich info message without the panelling."""
console.print(text, style=style)
def print_table(items): def print_table(items):
"""Prints items in a table.""" """Prints items in a table."""
console.print(Columns([Panel(f"[yellow]{item}", expand=True) for item in items])) console.print(Columns([Panel(f"[yellow]{item}", expand=True) for item in items]))
def print_substep(text, style=""):
"""Prints a rich info message without the panelling."""
console.print(text, style=style)
def handle_input( def handle_input(
message: str = "", message: str = "",
check_type=False, check_type=False,
@ -44,33 +44,88 @@ def handle_input(
nmax=None, nmax=None,
oob_error="", oob_error="",
extra_info="", extra_info="",
options: list = None,
default=NotImplemented,
optional=False,
): ):
match = re.compile(match + "$") if optional:
console.print(extra_info, no_wrap=True) console.print(
while True: message
console.print(message, end="") + "\n[green]This is an optional value. Do you want to skip it? (y/n)"
user_input = input("").strip() )
if re.match(match, user_input) is not None: if input().casefold().startswith("y"):
return default if default is not NotImplemented else ""
if default is not NotImplemented:
console.print(
"[green]"
+ message
+ '\n[blue bold]The default value is "'
+ str(default)
+ '"\nDo you want to use it?(y/n)'
)
if input().casefold().startswith("y"):
return default
if options is None:
match = re.compile(match)
console.print("[green bold]" + extra_info, no_wrap=True)
while True:
console.print(message, end="")
user_input = input("").strip()
if check_type is not False: if check_type is not False:
try: try:
user_input = check_type(user_input) # this line is fine user_input = check_type(user_input)
if nmin is not None and user_input < nmin: if (nmin is not None and user_input < nmin) or (
console.print("[red]" + oob_error) # Input too low failstate nmax is not None and user_input > nmax
continue ):
if nmax is not None and user_input > nmax: # FAILSTATE Input out of bounds
console.print("[red]" + oob_error) # Input too high console.print("[red]" + oob_error)
continue continue
break # Successful type conversion and number in bounds break # Successful type conversion and number in bounds
except ValueError: except ValueError:
console.print("[red]" + err_message) # Type conversion failed # Type conversion failed
console.print("[red]" + err_message)
continue continue
if nmin is not None and len(user_input) < nmin: # Check if string is long enough elif match != "" and re.match(match, user_input) is None:
console.print("[red]" + oob_error) console.print(
"[red]"
+ err_message
+ "\nAre you absolutely sure it's correct?(y/n)"
)
if input().casefold().startswith("y"):
break
continue continue
if nmax is not None and len(user_input) > nmax: # Check if string is not too long else:
console.print("[red]" + oob_error) # FAILSTATE Input STRING out of bounds
if (nmin is not None and len(user_input) < nmin) or (
nmax is not None and len(user_input) > nmax
):
console.print("[red bold]" + oob_error)
continue
break # SUCCESS Input STRING in bounds
return user_input
console.print(extra_info, no_wrap=True)
while True:
console.print(message, end="")
user_input = input("").strip()
if check_type is not False:
try:
isinstance(eval(user_input), check_type)
return check_type(user_input)
except:
console.print(
"[red bold]"
+ err_message
+ "\nValid options are: "
+ ", ".join(map(str, options))
+ "."
)
continue continue
break if user_input in options:
console.print("[red]" + err_message) return user_input
console.print(
return user_input "[red bold]"
+ err_message
+ "\nValid options are: "
+ ", ".join(map(str, options))
+ "."
)

@ -0,0 +1,189 @@
#!/usr/bin/env python
import toml
from rich.console import Console
import re
from typing import Tuple, Dict
from utils.console import handle_input
console = Console()
config = dict # autocomplete
def crawl(obj: dict, func=lambda x, y: print(x, y, end="\n"), path=None):
if path is None: # path Default argument value is mutable
path = []
for key in obj.keys():
if type(obj[key]) is dict:
crawl(obj[key], func, path + [key])
continue
func(path + [key], obj[key])
def check(value, checks, name):
def get_check_value(key, default_result):
return checks[key] if key in checks else default_result
incorrect = False
if value == {}:
incorrect = True
if not incorrect and "type" in checks:
try:
value = eval(checks["type"])(value)
except:
incorrect = True
if (
not incorrect and "options" in checks and value not in checks["options"]
): # FAILSTATE Value is not one of the options
incorrect = True
if (
not incorrect
and "regex" in checks
and (
(isinstance(value, str) and re.match(checks["regex"], value) is None)
or not isinstance(value, str)
)
): # FAILSTATE Value doesn't match regex, or has regex but is not a string.
incorrect = True
if (
not incorrect
and not hasattr(value, "__iter__")
and (
("nmin" in checks and checks["nmin"] is not None and value < checks["nmin"])
or (
"nmax" in checks
and checks["nmax"] is not None
and value > checks["nmax"]
)
)
):
incorrect = True
if (
not incorrect
and hasattr(value, "__iter__")
and (
(
"nmin" in checks
and checks["nmin"] is not None
and len(value) < checks["nmin"]
)
or (
"nmax" in checks
and checks["nmax"] is not None
and len(value) > checks["nmax"]
)
)
):
incorrect = True
if incorrect:
value = handle_input(
message=(
(
("[blue]Example: " + str(checks["example"]) + "\n")
if "example" in checks
else ""
)
+ "[red]"
+ ("Non-optional ", "Optional ")[
"optional" in checks and checks["optional"] is True
]
)
+ "[#C0CAF5 bold]"
+ str(name)
+ "[#F7768E bold]=",
extra_info=get_check_value("explanation", ""),
check_type=eval(get_check_value("type", "False")),
default=get_check_value("default", NotImplemented),
match=get_check_value("regex", ""),
err_message=get_check_value("input_error", "Incorrect input"),
nmin=get_check_value("nmin", None),
nmax=get_check_value("nmax", None),
oob_error=get_check_value(
"oob_error", "Input out of bounds(Value too high/low/long/short)"
),
options=get_check_value("options", None),
optional=get_check_value("optional", False),
)
return value
def crawl_and_check(obj: dict, path: list, checks: dict = {}, name=""):
if len(path) == 0:
return check(obj, checks, name)
if path[0] not in obj.keys():
obj[path[0]] = {}
obj[path[0]] = crawl_and_check(obj[path[0]], path[1:], checks, path[0])
return obj
def check_vars(path, checks):
global config
crawl_and_check(config, path, checks)
def check_toml(template_file, config_file) -> Tuple[bool, Dict]:
global config
config = None
try:
template = toml.load(template_file)
except Exception as error:
console.print(
f"[red bold]Encountered error when trying to to load {template_file}: {error}"
)
return False
try:
config = toml.load(config_file)
except toml.TomlDecodeError:
console.print(
f"""[blue]Couldn't read {config_file}.
Overwrite it?(y/n)"""
)
if not input().startswith("y"):
print("Unable to read config, and not allowed to overwrite it. Giving up.")
return False
else:
try:
with open(config_file, "w") as f:
f.write("")
except:
console.print(
f"[red bold]Failed to overwrite {config_file}. Giving up.\nSuggestion: check {config_file} permissions for the user."
)
return False
except FileNotFoundError:
console.print(
f"""[blue]Couldn't find {config_file}
Creating it now."""
)
try:
with open(config_file, "x") as f:
f.write("")
config = {}
except:
console.print(
f"[red bold]Failed to write to {config_file}. Giving up.\nSuggestion: check the folder's permissions for the user."
)
return False
console.print(
"""\
[blue bold]###############################
# #
# Checking TOML configuration #
# #
###############################
If you see any prompts, that means that you have unset/incorrectly set variables, please input the correct values.\
"""
)
crawl(template, check_vars)
with open(config_file, "w") as f:
toml.dump(config, f)
return config
if __name__ == "__main__":
check_toml(".config.template.toml", "config.toml")

@ -1,5 +1,5 @@
import json import json
from os import getenv from utils import settings
from utils.console import print_substep from utils.console import print_substep
@ -15,14 +15,16 @@ def get_subreddit_undone(submissions: list, subreddit):
""" """
# recursively checks if the top submission in the list was already done. # recursively checks if the top submission in the list was already done.
with open("./video_creation/data/videos.json", "r", encoding="utf-8") as done_vids_raw: with open(
"./video_creation/data/videos.json", "r", encoding="utf-8"
) as done_vids_raw:
done_videos = json.load(done_vids_raw) done_videos = json.load(done_vids_raw)
for submission in submissions: for submission in submissions:
if already_done(done_videos, submission): if already_done(done_videos, submission):
continue continue
if submission.over_18: if submission.over_18:
try: try:
if getenv("ALLOW_NSFW").casefold() == "false": if settings.config["settings"]["allow_nsfw"] == False:
print_substep("NSFW Post Detected. Skipping...") print_substep("NSFW Post Detected. Skipping...")
continue continue
except AttributeError: except AttributeError:

@ -1,11 +1,10 @@
import json import json
import os
import time import time
from os import getenv
from typing import Dict from typing import Dict
from praw.models import Submission from praw.models import Submission
from utils import settings
from utils.console import print_step from utils.console import print_step
@ -16,16 +15,18 @@ def check_done(
"""Checks if the chosen post has already been generated """Checks if the chosen post has already been generated
Args: Args:
redditobj (Dict[str]): Reddit object gotten from reddit/subreddit.py redditobj (Submission): Reddit object gotten from reddit/subreddit.py
Returns: Returns:
Dict[str]|None: Reddit object in args Submission|None: Reddit object in args
""" """
with open("./video_creation/data/videos.json", "r", encoding="utf-8") as done_vids_raw: with open(
"./video_creation/data/videos.json", "r", encoding="utf-8"
) as done_vids_raw:
done_videos = json.load(done_vids_raw) done_videos = json.load(done_vids_raw)
for video in done_videos: for video in done_videos:
if video["id"] == str(redditobj): if video["id"] == str(redditobj):
if getenv("POST_ID"): if settings.config["reddit"]["thread"]["post_id"]:
print_step( print_step(
"You already have done this video but since it was declared specifically in the .env file the program will continue" "You already have done this video but since it was declared specifically in the .env file the program will continue"
) )
@ -35,7 +36,7 @@ def check_done(
return redditobj return redditobj
def save_data(filename: str, reddit_title: str, reddit_id: str): def save_data(filename: str, reddit_title: str, reddit_id: str, credit: str):
"""Saves the videos that have already been generated to a JSON file in video_creation/data/videos.json """Saves the videos that have already been generated to a JSON file in video_creation/data/videos.json
Args: Args:
@ -51,7 +52,7 @@ def save_data(filename: str, reddit_title: str, reddit_id: str):
payload = { payload = {
"id": reddit_id, "id": reddit_id,
"time": str(int(time.time())), "time": str(int(time.time())),
"background_credit": str(os.getenv("background_credit")), "background_credit": credit,
"reddit_title": reddit_title, "reddit_title": reddit_title,
"filename": filename, "filename": filename,
} }

@ -1,5 +1,5 @@
import random import random
from os import listdir, environ, getenv from os import listdir
from pathlib import Path from pathlib import Path
import random import random
from random import randrange from random import randrange
@ -12,13 +12,14 @@ from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip
from pytube import YouTube from pytube import YouTube
from pytube.cli import on_progress from pytube.cli import on_progress
from utils import settings
from utils.console import print_step, print_substep from utils.console import print_step, print_substep
# Supported Background. Can add/remove background video here.... # Supported Background. Can add/remove background video here....
# <key>-<value> : key -> used as keyword for .env file. value -> background configuration # <key>-<value> : key -> used as keyword for TOML file. value -> background configuration
# Format (value): # Format (value):
# 1. Youtube URI # 1. Youtube URI
# 2. filename # 2. filename
# 3. Citation (owner of the video) # 3. Citation (owner of the video)
# 4. Position of image clips in the background. See moviepy reference for more information. (https://zulko.github.io/moviepy/ref/VideoClip/VideoClip.html#moviepy.video.VideoClip.VideoClip.set_position) # 4. Position of image clips in the background. See moviepy reference for more information. (https://zulko.github.io/moviepy/ref/VideoClip/VideoClip.html#moviepy.video.VideoClip.VideoClip.set_position)
background_options = { background_options = {
@ -47,8 +48,21 @@ background_options = {
lambda t: ('center', 480 + t) lambda t: ('center', 480 + t)
) )
} }
def get_background_config():
"""Fetch the background/s configuration"""
try:
choice = str(settings.config['settings']['background_choice']).casefold()
except AttributeError:
print_substep("No background selected. Picking random background'")
choice = None
# Handle default / not supported background using default option.
# Default : pick random from supported background.
if not choice or choice not in background_options:
choice = random.choice(list(background_options.keys()))
return background_options[choice]
def get_start_and_end_times(video_length: int, length_of_clip: int) -> Tuple[int, int]: def get_start_and_end_times(video_length: int, length_of_clip: int) -> Tuple[int, int]:
"""Generates a random interval of time to be used as the background of the video. """Generates a random interval of time to be used as the background of the video.
@ -111,6 +125,7 @@ def chop_background_video(background_config: Tuple[str, str, str, Any], video_le
choice = f"{background_config[2]}-{background_config[1]}" choice = f"{background_config[2]}-{background_config[1]}"
environ["background_credit"] = choice.split("-")[0] environ["background_credit"] = choice.split("-")[0]
background = VideoFileClip(f"assets/backgrounds/{choice}") background = VideoFileClip(f"assets/backgrounds/{choice}")
start_time, end_time = get_start_and_end_times( start_time, end_time = get_start_and_end_times(
@ -128,3 +143,4 @@ def chop_background_video(background_config: Tuple[str, str, str, Any], video_le
new = video.subclip(start_time, end_time) new = video.subclip(start_time, end_time)
new.write_videofile("assets/temp/background.mp4") new.write_videofile("assets/temp/background.mp4")
print_substep("Background video chopped successfully!", style="bold green") print_substep("Background video chopped successfully!", style="bold green")
return credit

@ -3,7 +3,8 @@ import multiprocessing
import os import os
import re import re
from os.path import exists from os.path import exists
from typing import Tuple, Any from typing import Dict, Tuple, Any
import translators as ts
from moviepy.editor import ( from moviepy.editor import (
VideoFileClip, VideoFileClip,
@ -20,6 +21,7 @@ from rich.console import Console
from utils.cleanup import cleanup from utils.cleanup import cleanup
from utils.console import print_step, print_substep from utils.console import print_step, print_substep
from utils.videos import save_data from utils.videos import save_data
from utils import settings
console = Console() console = Console()
@ -27,31 +29,35 @@ console = Console()
W, H = 1080, 1920 W, H = 1080, 1920
def name_normalize(name: str) -> str:
name = re.sub(r'[?\\"%*:|<>]', "", name)
name = re.sub(r"( [w,W]\s?\/\s?[o,O,0])", r" without", name)
name = re.sub(r"( [w,W]\s?\/)", r" with", name)
name = re.sub(r"(\d+)\s?\/\s?(\d+)", r"\1 of \2", name)
name = re.sub(r"(\w+)\s?\/\s?(\w+)", r"\1 or \2", name)
name = re.sub(r"\/", r"", name)
def name_normalize( lang = settings.config["reddit"]["thread"]["post_lang"]
name: str if lang:
) -> str: print_substep("Translating filename...")
name = re.sub(r'[?\\"%*:|<>]', '', name) translated_name = ts.google(name, to_language=lang)
name = re.sub(r'( [w,W]\s?\/\s?[o,O,0])', r' without', name) return translated_name
name = re.sub(r'( [w,W]\s?\/)', r' with', name)
name = re.sub(r'([0-9]+)\s?\/\s?([0-9]+)', r'\1 of \2', name)
name = re.sub(r'(\w+)\s?\/\s?(\w+)', r'\1 or \2', name)
name = re.sub(r'\/', r'', name)
return name
else:
return name
def make_final_video(number_of_clips: int, length: int, reddit_obj: dict, background_config: Tuple[str, str, str, Any]): def make_final_video(number_of_clips: int, length: int, reddit_obj: dict, background_config: Tuple[str, str, str, Any]):
"""Gathers audio clips, gathers all screenshots, stitches them together and saves the final video to assets/temp """Gathers audio clips, gathers all screenshots, stitches them together and saves the final video to assets/temp
Args: Args:
number_of_clips (int): Index to end at when going through the screenshots number_of_clips (int): Index to end at when going through the screenshots
length (int): Length of the video length (int): Length of the video
reddit_obj (dict): The reddit object that contains the posts to read. reddit_obj (dict): The reddit object that contains the posts to read.
background_config Tuple[str, str, str, Any]: The background config to use.
""" """
print_step("Creating the final video 🎥") print_step("Creating the final video 🎥")
VideoFileClip.reW = lambda clip: clip.resize(width=W) VideoFileClip.reW = lambda clip: clip.resize(width=W)
VideoFileClip.reH = lambda clip: clip.resize(width=H) VideoFileClip.reH = lambda clip: clip.resize(width=H)
opacity = os.getenv("OPACITY") opacity = settings.config["settings"]["opacity"]
background_clip = ( background_clip = (
VideoFileClip("assets/temp/background.mp4") VideoFileClip("assets/temp/background.mp4")
.without_audio() .without_audio()
@ -60,7 +66,9 @@ def make_final_video(number_of_clips: int, length: int, reddit_obj: dict, backgr
) )
# Gather all audio clips # Gather all audio clips
audio_clips = [AudioFileClip(f"assets/temp/mp3/{i}.mp3") for i in range(number_of_clips)] audio_clips = [
AudioFileClip(f"assets/temp/mp3/{i}.mp3") for i in range(number_of_clips)
]
audio_clips.insert(0, AudioFileClip("assets/temp/mp3/title.mp3")) audio_clips.insert(0, AudioFileClip("assets/temp/mp3/title.mp3"))
audio_concat = concatenate_audioclips(audio_clips) audio_concat = concatenate_audioclips(audio_clips)
audio_composite = CompositeAudioClip([audio_concat]) audio_composite = CompositeAudioClip([audio_concat])
@ -75,7 +83,7 @@ def make_final_video(number_of_clips: int, length: int, reddit_obj: dict, backgr
ImageClip("assets/temp/png/title.png") ImageClip("assets/temp/png/title.png")
.set_duration(audio_clips[0].duration) .set_duration(audio_clips[0].duration)
.resize(width=W - 100) .resize(width=W - 100)
.set_opacity(new_opacity) .set_opacity(new_opacity),
) )
for i in range(0, number_of_clips): for i in range(0, number_of_clips):
@ -103,10 +111,11 @@ def make_final_video(number_of_clips: int, length: int, reddit_obj: dict, backgr
final = CompositeVideoClip([background_clip, image_concat]) final = CompositeVideoClip([background_clip, image_concat])
title = re.sub(r"[^\w\s-]", "", reddit_obj["thread_title"]) title = re.sub(r"[^\w\s-]", "", reddit_obj["thread_title"])
idx = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"]) idx = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"])
filename = f"{name_normalize(title)}.mp4" filename = f"{name_normalize(title)}.mp4"
subreddit = os.getenv("SUBREDDIT") subreddit = settings.config["reddit"]["thread"]["subreddit"]
save_data(filename, title, idx) save_data(filename, title, idx, background_config[2])
if not exists(f"./results/{subreddit}"): if not exists(f"./results/{subreddit}"):
print_substep("The results folder didn't exist so I made it") print_substep("The results folder didn't exist so I made it")
@ -121,7 +130,10 @@ def make_final_video(number_of_clips: int, length: int, reddit_obj: dict, backgr
threads=multiprocessing.cpu_count(), threads=multiprocessing.cpu_count(),
) )
ffmpeg_tools.ffmpeg_extract_subclip( ffmpeg_tools.ffmpeg_extract_subclip(
"assets/temp/temp.mp4", 0, final.duration, targetname=f"results/{subreddit}/{filename}" "assets/temp/temp.mp4",
0,
final.duration,
targetname=f"results/{subreddit}/{filename}",
) )
# os.remove("assets/temp/temp.mp4") # os.remove("assets/temp/temp.mp4")
@ -131,5 +143,5 @@ def make_final_video(number_of_clips: int, length: int, reddit_obj: dict, backgr
print_substep("See result in the results folder!") print_substep("See result in the results folder!")
print_step( print_step(
f'Reddit title: {reddit_obj["thread_title"]} \n Background Credit: {os.getenv("background_credit")}' f'Reddit title: {reddit_obj["thread_title"]} \n Background Credit: {background_config[2]}'
) )

@ -1,9 +1,8 @@
import json import json
import os
from os import getenv
from pathlib import Path from pathlib import Path
from typing import Dict from typing import Dict
from utils import settings
from playwright.async_api import async_playwright # pylint: disable=unused-import from playwright.async_api import async_playwright # pylint: disable=unused-import
# do not remove the above line # do not remove the above line
@ -21,8 +20,8 @@ def download_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: in
"""Downloads screenshots of reddit posts as seen on the web. Downloads to assets/temp/png """Downloads screenshots of reddit posts as seen on the web. Downloads to assets/temp/png
Args: Args:
reddit_object (Dict[str]): Reddit object received from reddit/subreddit.py reddit_object (Dict): Reddit object received from reddit/subreddit.py
screenshot_num (int): Number of screenshots to downlaod screenshot_num (int): Number of screenshots to download
""" """
print_step("Downloading screenshots of reddit posts...") print_step("Downloading screenshots of reddit posts...")
@ -35,10 +34,14 @@ def download_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: in
browser = p.chromium.launch() browser = p.chromium.launch()
context = browser.new_context() context = browser.new_context()
if getenv("THEME").upper() == "DARK": if settings.config["settings"]["theme"] == "dark":
cookie_file = open("./video_creation/data/cookie-dark-mode.json", encoding="utf-8") cookie_file = open(
"./video_creation/data/cookie-dark-mode.json", encoding="utf-8"
)
else: else:
cookie_file = open("./video_creation/data/cookie-light-mode.json", encoding="utf-8") cookie_file = open(
"./video_creation/data/cookie-light-mode.json", encoding="utf-8"
)
cookies = json.load(cookie_file) cookies = json.load(cookie_file)
context.add_cookies(cookies) # load preference cookies context.add_cookies(cookies) # load preference cookies
# Get the thread screenshot # Get the thread screenshot
@ -56,9 +59,12 @@ def download_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: in
# translate code # translate code
if getenv("POSTLANG"): if settings.config["reddit"]["thread"]["post_lang"]:
print_substep("Translating post...") print_substep("Translating post...")
texts_in_tl = ts.google(reddit_object["thread_title"], to_language=os.getenv("POSTLANG")) texts_in_tl = ts.google(
reddit_object["thread_title"],
to_language=settings.config["reddit"]["thread"]["post_lang"],
)
page.evaluate( page.evaluate(
"tl_content => document.querySelector('[data-test-id=\"post-content\"] > div:nth-child(3) > div > div').textContent = tl_content", "tl_content => document.querySelector('[data-test-id=\"post-content\"] > div:nth-child(3) > div > div').textContent = tl_content",
@ -67,7 +73,9 @@ def download_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: in
else: else:
print_substep("Skipping translation...") print_substep("Skipping translation...")
page.locator('[data-test-id="post-content"]').screenshot(path="assets/temp/png/title.png") page.locator('[data-test-id="post-content"]').screenshot(
path="assets/temp/png/title.png"
)
if storymode: if storymode:
page.locator('[data-click-id="text"]').screenshot( page.locator('[data-click-id="text"]').screenshot(
@ -88,9 +96,10 @@ def download_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: in
# translate code # translate code
if getenv("POSTLANG"): if settings.config["reddit"]["thread"]["post_lang"]:
comment_tl = ts.google( comment_tl = ts.google(
comment["comment_body"], to_language=os.getenv("POSTLANG") comment["comment_body"],
to_language=settings.config["reddit"]["thread"]["post_lang"],
) )
page.evaluate( page.evaluate(
'([tl_content, tl_id]) => document.querySelector(`#t1_${tl_id} > div:nth-child(2) > div > div[data-testid="comment"] > div`).textContent = tl_content', '([tl_content, tl_id]) => document.querySelector(`#t1_${tl_id} > div:nth-child(2) > div > div[data-testid="comment"] > div`).textContent = tl_content',

@ -1,6 +1,5 @@
#!/usr/bin/env python #!/usr/bin/env python
import os
from typing import Dict, Tuple from typing import Dict, Tuple
from rich.console import Console from rich.console import Console
@ -10,7 +9,7 @@ from TTS.GTTS import GTTS
from TTS.streamlabs_polly import StreamlabsPolly from TTS.streamlabs_polly import StreamlabsPolly
from TTS.aws_polly import AWSPolly from TTS.aws_polly import AWSPolly
from TTS.TikTok import TikTok from TTS.TikTok import TikTok
from utils import settings
from utils.console import print_table, print_step from utils.console import print_table, print_step
@ -28,15 +27,17 @@ def save_text_to_mp3(reddit_obj) -> Tuple[int, int]:
"""Saves text to MP3 files. """Saves text to MP3 files.
Args: Args:
reddit_obj (dict[str]): Reddit object received from reddit API in reddit/subreddit.py reddit_obj (): Reddit object received from reddit API in reddit/subreddit.py
Returns: Returns:
tuple[int,int]: (total length of the audio, the number of comments audio was generated for) tuple[int,int]: (total length of the audio, the number of comments audio was generated for)
""" """
env = os.getenv("TTSCHOICE", "") voice = settings.config["settings"]["tts"]["choice"]
if env.casefold() in map(lambda _: _.casefold(), TTSProviders): if voice.casefold() in map(lambda _: _.casefold(), TTSProviders):
text_to_mp3 = TTSEngine(get_case_insensitive_key_value(TTSProviders, env), reddit_obj) text_to_mp3 = TTSEngine(
get_case_insensitive_key_value(TTSProviders, voice), reddit_obj
)
else: else:
while True: while True:
print_step("Please choose one of the following TTS providers: ") print_step("Please choose one of the following TTS providers: ")
@ -45,13 +46,19 @@ def save_text_to_mp3(reddit_obj) -> Tuple[int, int]:
if choice.casefold() in map(lambda _: _.casefold(), TTSProviders): if choice.casefold() in map(lambda _: _.casefold(), TTSProviders):
break break
print("Unknown Choice") print("Unknown Choice")
text_to_mp3 = TTSEngine(get_case_insensitive_key_value(TTSProviders, choice), reddit_obj) text_to_mp3 = TTSEngine(
get_case_insensitive_key_value(TTSProviders, choice), reddit_obj
)
return text_to_mp3.run() return text_to_mp3.run()
def get_case_insensitive_key_value(input_dict, key): def get_case_insensitive_key_value(input_dict, key):
return next( return next(
(value for dict_key, value in input_dict.items() if dict_key.lower() == key.lower()), (
value
for dict_key, value in input_dict.items()
if dict_key.lower() == key.lower()
),
None, None,
) )

Loading…
Cancel
Save