pull/1671/head
electro199 2 years ago
commit 7486e04b26

@ -9,3 +9,4 @@ updates:
directory: "/" # Location of package manifests
schedule:
interval: "daily"
target-branch: "develop"

@ -29,4 +29,4 @@ jobs:
git remote set-url origin https://x-access-token:${{ secrets.GITHUB_TOKEN }}@github.com/$GITHUB_REPOSITORY
git checkout $GITHUB_HEAD_REF
git commit -am "fixup: Format Python code with Black"
git push
git push origin HEAD:master

@ -0,0 +1,23 @@
# This workflow was added by CodeSee. Learn more at https://codesee.io/
# This is v2.0 of this workflow file
on:
push:
branches:
- master
pull_request_target:
types: [opened, synchronize, reopened]
name: CodeSee
permissions: read-all
jobs:
codesee:
runs-on: ubuntu-latest
continue-on-error: true
name: Analyze the repo with CodeSee
steps:
- uses: Codesee-io/codesee-action@v2
with:
codesee-token: ${{ secrets.CODESEE_ARCH_DIAG_API_TOKEN }}
codesee-url: https://app.codesee.io

1
.gitignore vendored

@ -232,6 +232,7 @@ fabric.properties
.idea/caches/build_file_checksums.ser
assets/
/.vscode
out
.DS_Store
.setup-done-before

@ -1,4 +1,4 @@
FROM mcr.microsoft.com/playwright
FROM python:3.10.9-slim
RUN apt update
RUN apt install python3-pip -y

@ -56,6 +56,13 @@
.tooltip-inner {
max-width: 500px !important;
}
#hard-reload {
cursor: pointer;
color: darkblue;
}
#hard-reload:hover {
color: blue;
}
</style>
</head>
@ -132,11 +139,17 @@
Theme by &copy; Bootstrap. <a
href="https://github.com/elebumm/RedditVideoMakerBot/blob/master/README.md#developers-and-maintainers"
target="_blank">Developers and Maintainers</a></p>
<p class="mb-0">If your data is not refreshing, try to hard reload(Ctrl + F5) and visit your local
<p class="mb-0">If your data is not refreshing, try to hard reload(Ctrl + F5) or click <a id="hard-reload">this</a> and visit your local
<strong>{{ file }}</strong> file.
</p>
</div>
</footer>
<script>
document.getElementById("hard-reload").addEventListener("click", function () {
window.location.reload(true);
});
</script>
</body>
</html>

@ -213,6 +213,38 @@
backgrounds</a></span>
</div>
</div>
<div class="row mb-2">
<label for="background_thumbnail" class="col-4">Background Thumbnail</label>
<div class="col-8">
<div class="form-check form-switch">
<input name="background_thumbnail" class="form-check-input" type="checkbox" value="True"
data-toggle="tooltip"
data-original-title='If checked a thumbnail will be added to the video (put a thumbnail.png file in the assets/backgrounds directory for it to be used.)'>
</div>
</div>
</div>
<div class="row mb-2">
<label for="background_thumbnail_font_family" class="col-4">Background Thumbnail Font Family (.ttf)</label>
<div class="col-8">
<input name="background_thumbnail_font_family" type="text" class="form-control"
placeholder="arial" value="{{ data.background_thumbnail_font_family }}">
</div>
</div>
<div class="row mb-2">
<label for="background_thumbnail_font_size" class="col-4">Background Thumbnail Font Size (px)</label>
<div class="col-8">
<input name="background_thumbnail_font_size" type="number" class="form-control"
placeholder="96" value="{{ data.background_thumbnail_font_size }}">
</div>
</div>
<!-- need to create a color picker -->
<div class="row mb-2">
<label for="background_thumbnail_font_color" class="col-4">Background Thumbnail Font Color (rgb)</label>
<div class="col-8">
<input name="background_thumbnail_font_color" type="text" class="form-control"
placeholder="255,255,255" value="{{ data.background_thumbnail_font_color }}">
</div>
</div>
<!-- TTS Settings -->
<p class="h4">TTS Settings</p>
@ -337,6 +369,19 @@
</div>
</div>
</div>
<div class="row mb-2">
<label for="tiktok_sessionid" class="col-4">TikTok SessionId</label>
<div class="col-8">
<div class="input-group">
<div class="input-group-text">
<i class="bi bi-mic-fill"></i>
</div>
<input value="{{ data.tiktok_sessionid }}" name="tiktok_sessionid" type="text" class="form-control"
data-toggle="tooltip"
data-original-title="TikTok sessionid needed for the TTS API request. Check documentation if you don't know how to obtain it.">
</div>
</div>
</div>
<div class="row mb-2">
<label for="python_voice" class="col-4">Python Voice</label>
<div class="col-8">

@ -32,7 +32,7 @@ The only original thing being done is the editing and gathering of all materials
## Requirements
- Python 3.9+
- Python 3.10
- Playwright (this should install automatically in installation)
## Installation 👩‍💻
@ -70,6 +70,7 @@ In its current state, this bot does exactly what it needs to do. However, improv
I have tried to simplify the code so anyone can read it and start contributing at any skill level. Don't be shy :) contribute!
- [ ] Creating better documentation and adding a command line interface.
- [x] Allowing the user to choose background music for their videos.
- [x] Allowing users to choose a reddit thread instead of being randomized.
- [x] Allowing users to choose a background that is picked instead of the Minecraft one.
- [x] Allowing users to choose between any subreddit.
@ -80,12 +81,16 @@ I have tried to simplify the code so anyone can read it and start contributing a
Please read our [contributing guidelines](CONTRIBUTING.md) for more detailed information.
### For any questions or support join the [Discord](https://discord.gg/Vkanmh6C8V) server
## Developers and maintainers.
Elebumm (Lewis#6305) - https://github.com/elebumm (Founder)
Jason (JasonLovesDoggo#1904) - https://github.com/JasonLovesDoggo (Maintainer)
Simon (OpenSourceSimon) - https://github.com/OpenSourceSimon
CallumIO (c.#6837) - https://github.com/CallumIO
Verq (Verq#2338) - https://github.com/CordlessCoder
@ -93,3 +98,9 @@ Verq (Verq#2338) - https://github.com/CordlessCoder
LukaHietala (Pix.#0001) - https://github.com/LukaHietala
Freebiell (Freebie#3263) - https://github.com/FreebieII
Aman Raza (electro199#8130) - https://github.com/electro199
## LICENSE
[Roboto Fonts](https://fonts.google.com/specimen/Roboto/about) are licensed under [Apache License V2](https://www.apache.org/licenses/LICENSE-2.0)

@ -1,4 +1,3 @@
#!/usr/bin/env python3
import random
from gtts import gTTS

@ -1,26 +1,28 @@
# documentation for tiktok api: https://github.com/oscie57/tiktok-voice/wiki
import base64
import random
import time
from typing import Optional, Final
import requests
from requests.adapters import HTTPAdapter, Retry
from utils import settings
# from profanity_filter import ProfanityFilter
# pf = ProfanityFilter()
# Code by @JasonLovesDoggo
# https://twitter.com/scanlime/status/1512598559769702406
__all__ = ["TikTok", "TikTokTTSException"]
nonhuman = [ # DISNEY VOICES
disney_voices: Final[tuple] = (
"en_us_ghostface", # Ghost Face
"en_us_chewbacca", # Chewbacca
"en_us_c3po", # C3PO
"en_us_stitch", # Stitch
"en_us_stormtrooper", # Stormtrooper
"en_us_rocket", # Rocket
# ENGLISH VOICES
]
human = [
"en_female_madam_leota", # Madame Leota
"en_male_ghosthost", # Ghost Host
"en_male_pirate", # pirate
)
eng_voices: Final[tuple] = (
"en_au_001", # English AU - Female
"en_au_002", # English AU - Male
"en_uk_001", # English UK - Male 1
@ -30,23 +32,28 @@ human = [
"en_us_006", # English US - Male 1
"en_us_007", # English US - Male 2
"en_us_009", # English US - Male 3
"en_us_010",
]
voices = nonhuman + human
"en_us_010", # English US - Male 4
"en_male_narration", # Narrator
"en_male_funny", # Funny
"en_female_emotional", # Peaceful
"en_male_cody", # Serious
)
noneng = [
non_eng_voices: Final[tuple] = (
# Western European voices
"fr_001", # French - Male 1
"fr_002", # French - Male 2
"de_001", # German - Female
"de_002", # German - Male
"es_002", # Spanish - Male
# AMERICA VOICES
"it_male_m18", # Italian - Male
# South american voices
"es_mx_002", # Spanish MX - Male
"br_001", # Portuguese BR - Female 1
"br_003", # Portuguese BR - Female 2
"br_004", # Portuguese BR - Female 3
"br_005", # Portuguese BR - Male
# ASIA VOICES
# asian voices
"id_001", # Indonesian - Female
"jp_001", # Japanese - Female 1
"jp_003", # Japanese - Female 2
@ -55,51 +62,106 @@ noneng = [
"kr_002", # Korean - Male 1
"kr_003", # Korean - Female
"kr_004", # Korean - Male 2
]
)
vocals: Final[tuple] = (
"en_female_f08_salut_damour", # Alto
"en_male_m03_lobby", # Tenor
"en_male_m03_sunshine_soon", # Sunshine Soon
"en_female_f08_warmy_breeze", # Warmy Breeze
"en_female_ht_f08_glorious", # Glorious
"en_male_sing_funny_it_goes_up", # It Goes Up
"en_male_m2_xhxs_m03_silly", # Chipmunk
"en_female_ht_f08_wonderful_world", # Dramatic
)
# good_voices = {'good': ['en_us_002', 'en_us_006'],
# 'ok': ['en_au_002', 'en_uk_001']} # less en_us_stormtrooper more less en_us_rocket en_us_ghostface
class TikTok:
"""TikTok Text-to-Speech Wrapper"""
class TikTok: # TikTok Text-to-Speech Wrapper
def __init__(self):
self.URI_BASE = "https://api16-normal-useast5.us.tiktokv.com/media/api/text/speech/invoke/?text_speaker="
self.max_chars = 300
self.voices = {"human": human, "nonhuman": nonhuman, "noneng": noneng}
def run(self, text, filepath, random_voice: bool = False):
# if censor:
# req_text = pf.censor(req_text)
# pass
voice = (
self.randomvoice()
if random_voice
else (
settings.config["settings"]["tts"]["tiktok_voice"]
or random.choice(self.voices["human"])
)
headers = {
"User-Agent": "com.zhiliaoapp.musically/2022600030 (Linux; U; Android 7.1.2; es_ES; SM-G988N; "
"Build/NRD90M;tt-ok/3.12.13.1)",
"Cookie": f"sessionid={settings.config['settings']['tts']['tiktok_sessionid']}",
}
self.URI_BASE = (
"https://api16-normal-c-useast1a.tiktokv.com/media/api/text/speech/invoke/"
)
self.max_chars = 200
self._session = requests.Session()
# set the headers to the session, so we don't have to do it for every request
self._session.headers = headers
def run(self, text: str, filepath: str, random_voice: bool = False):
if random_voice:
voice = self.random_voice()
else:
# if tiktok_voice is not set in the config file, then use a random voice
voice = settings.config["settings"]["tts"].get("tiktok_voice", None)
# get the audio from the TikTok API
data = self.get_voices(voice=voice, text=text)
# check if there was an error in the request
status_code = data["status_code"]
if status_code != 0:
raise TikTokTTSException(status_code, data["message"])
# decode data from base64 to binary
try:
r = requests.post(
f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0"
)
except requests.exceptions.SSLError:
# https://stackoverflow.com/a/47475019/18516611
session = requests.Session()
retry = Retry(connect=3, backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retry)
session.mount("http://", adapter)
session.mount("https://", adapter)
r = session.post(
f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0"
raw_voices = data["data"]["v_str"]
except:
print(
"The TikTok TTS returned an invalid response. Please try again later, and report this bug."
)
# print(r.text)
vstr = [r.json()["data"]["v_str"]][0]
b64d = base64.b64decode(vstr)
raise TikTokTTSException(0, "Invalid response")
decoded_voices = base64.b64decode(raw_voices)
# write voices to specified filepath
with open(filepath, "wb") as out:
out.write(b64d)
out.write(decoded_voices)
def get_voices(self, text: str, voice: Optional[str] = None) -> dict:
"""If voice is not passed, the API will try to use the most fitting voice"""
# sanitize text
text = text.replace("+", "plus").replace("&", "and").replace("r/", "")
# prepare url request
params = {"req_text": text, "speaker_map_type": 0, "aid": 1233}
if voice is not None:
params["text_speaker"] = voice
# send request
try:
response = self._session.post(self.URI_BASE, params=params)
except ConnectionError:
time.sleep(random.randrange(1, 7))
response = self._session.post(self.URI_BASE, params=params)
return response.json()
@staticmethod
def random_voice() -> str:
return random.choice(eng_voices)
class TikTokTTSException(Exception):
def __init__(self, code: int, message: str):
self._code = code
self._message = message
def __str__(self) -> str:
if self._code == 1:
return f"Code: {self._code}, reason: probably the aid value isn't correct, message: {self._message}"
if self._code == 2:
return f"Code: {self._code}, reason: the text is too long, message: {self._message}"
if self._code == 4:
return f"Code: {self._code}, reason: the speaker doesn't exist, message: {self._message}"
def randomvoice(self):
return random.choice(self.voices["human"])
return f"Code: {self._message}, reason: unknown, message: {self._message}"

@ -1,4 +1,3 @@
#!/usr/bin/env python3
import random
import sys

@ -1,4 +1,3 @@
#!/usr/bin/env python3
import os
import re
from pathlib import Path
@ -52,11 +51,30 @@ class TTSEngine:
self.length = 0
self.last_clip_length = last_clip_length
def run(self) -> Tuple[int, int]:
def add_periods(
self,
): # adds periods to the end of paragraphs (where people often forget to put them) so tts doesn't blend sentences
for comment in self.reddit_object["comments"]:
# remove links
regex_urls = r"((http|https)\:\/\/)?[a-zA-Z0-9\.\/\?\:@\-_=#]+\.([a-zA-Z]){2,6}([a-zA-Z0-9\.\&\/\?\:@\-_=#])*"
comment["comment_body"] = re.sub(regex_urls, " ", comment["comment_body"])
comment["comment_body"] = comment["comment_body"].replace("\n", ". ")
comment["comment_body"] = re.sub(r'\bAI\b', 'A.I', comment["comment_body"])
comment["comment_body"] = re.sub(r'\bAGI\b', 'A.G.I', comment["comment_body"])
if comment["comment_body"][-1] != ".":
comment["comment_body"] += "."
comment["comment_body"] = comment["comment_body"].replace(". . .", ".")
comment["comment_body"] = comment["comment_body"].replace(".. . ", ".")
comment["comment_body"] = comment["comment_body"].replace(". . ", ".")
comment["comment_body"] = re.sub(r'\."\.', '".', comment["comment_body"])
print(comment["comment_body"])
def run(self) -> Tuple[int, int]:
Path(self.path).mkdir(parents=True, exist_ok=True)
print_step("Saving Text to MP3 files...")
self.add_periods()
self.call_tts("title", process_text(self.reddit_object["thread_title"]))
# processed_text = ##self.reddit_object["thread_post"] != ""
idx = None
@ -70,12 +88,10 @@ class TTSEngine:
"postaudio", process_text(self.reddit_object["thread_post"])
)
elif settings.config["settings"]["storymodemethod"] == 1:
for idx, text in track(enumerate(self.reddit_object["thread_post"])):
self.call_tts(f"postaudio-{idx}", process_text(text))
else:
for idx, comment in track(
enumerate(self.reddit_object["comments"]), "Saving..."
):
@ -162,7 +178,7 @@ class TTSEngine:
)
def process_text(text: str , clean : bool = True):
def process_text(text: str, clean: bool = True):
lang = settings.config["reddit"]["thread"]["post_lang"]
new_text = sanitize_text(text) if clean else text
if lang:

@ -6,8 +6,9 @@ from os import name
from pathlib import Path
from subprocess import Popen
import ffmpeg
from prawcore import ResponseException
from utils.console import print_substep
from reddit.subreddit import get_subreddit_threads
from utils import settings
from utils.cleanup import cleanup
@ -15,15 +16,17 @@ from utils.console import print_markdown, print_step
from utils.id import id
from utils.version import checkversion
from video_creation.background import (
download_background,
chop_background_video,
download_background_video,
download_background_audio,
chop_background,
get_background_config,
)
from video_creation.final_video import make_final_video
from video_creation.screenshot_downloader import get_screenshots_of_reddit_posts
from video_creation.voices import save_text_to_mp3
from utils.ffmpeg_install import ffmpeg_install
__VERSION__ = "2.5.3"
__VERSION__ = "3.1"
print(
"""
@ -42,20 +45,28 @@ print_markdown(
checkversion(__VERSION__)
def main(POST_ID=None):
global redditid ,reddit_object
def main(POST_ID=None) -> None:
global redditid, reddit_object
reddit_object = get_subreddit_threads(POST_ID)
redditid = id(reddit_object)
length, number_of_comments = save_text_to_mp3(reddit_object)
length = math.ceil(length)
get_screenshots_of_reddit_posts(reddit_object, number_of_comments)
bg_config = get_background_config()
download_background(bg_config)
chop_background_video(bg_config, length, reddit_object)
bg_config = {
"video": get_background_config("video"),
"audio": get_background_config("audio"),
}
download_background_video(bg_config["video"])
download_background_audio(bg_config["audio"])
chop_background(bg_config, length, reddit_object)
try:
make_final_video(number_of_comments, length, reddit_object, bg_config)
except ffmpeg.Error as e:
print(e.stderr.decode("utf8"))
exit(1)
def run_many(times):
def run_many(times) -> None:
for x in range(1, times + 1):
print_step(
f'on the {x}{("th", "st", "nd", "rd", "th", "th", "th", "th", "th", "th")[x % 10]} iteration of {times}'
@ -65,27 +76,39 @@ def run_many(times):
def shutdown():
print_markdown("## Clearing temp files")
try:
redditid
except NameError:
print("Exiting...")
exit()
else:
print_markdown("## Clearing temp files")
cleanup(redditid)
print("Exiting...")
exit()
if __name__ == "__main__":
assert sys.version_info >= (3, 9), "Python 3.10 or higher is required"
if sys.version_info.major != 3 or sys.version_info.minor != 10:
print("Hey! Congratulations, you've made it so far (which is pretty rare with no Python 3.10). Unfortunately, this program only works on Python 3.10. Please install Python 3.10 and try again.")
exit()
ffmpeg_install() # install ffmpeg if not installed
directory = Path().absolute()
config = settings.check_toml(
f"{directory}/utils/.config.template.toml", "config.toml"
)
config is False and exit()
if (
not settings.config["settings"]["tts"]["tiktok_sessionid"]
or settings.config["settings"]["tts"]["tiktok_sessionid"] == ""
) and config["settings"]["tts"]["voice_choice"] == "tiktok":
print_substep(
"TikTok voice requires a sessionid! Check our documentation on how to obtain one.",
"bold red",
)
exit()
try:
if config["reddit"]["thread"]["post_id"] :
if config["reddit"]["thread"]["post_id"]:
for index, post_id in enumerate(
config["reddit"]["thread"]["post_id"].split("+")
):
@ -108,9 +131,11 @@ if __name__ == "__main__":
shutdown()
except Exception as err:
config["settings"]["tts"]["tiktok_sessionid"] = "REDACTED"
print_step(
"Sorry, something went wrong with this test version! Try again, and feel free to report this issue at GitHub or the Discord community." +
'stm'+ config["settings"]["storymode"] + 'stm m'+ str(config["settings"]["storymodemethod"]) + 'ptc' + len(reddit_object["thread_post"])
f"Sorry, something went wrong with this version! Try again, and feel free to report this issue at GitHub or the Discord community.\n"
f"Version: {__VERSION__} \n"
f"Error: {err} \n"
f'Config: {config["settings"]}'
)
raise err
# todo error

@ -1,15 +1,18 @@
import re
from prawcore.exceptions import ResponseException
from utils import settings
import praw
from praw.models import MoreComments
from prawcore.exceptions import ResponseException
from utils import settings
from utils.console import print_step, print_substep
from utils.subreddit import get_subreddit_undone
from utils.videos import check_done
from utils.voice import sanitize_text
from utils.posttextparser import posttextparser
from utils.ai_methods import sort_by_similarity
def get_subreddit_threads(POST_ID: str):
@ -50,6 +53,7 @@ def get_subreddit_threads(POST_ID: str):
# Ask user for subreddit input
print_step("Getting subreddit threads...")
similarity_score = 0
if not settings.config["reddit"]["thread"][
"subreddit"
]: # note to user. you can have multiple subreddits via reddit.subreddit("redditdev+learnpython")
@ -76,6 +80,26 @@ def get_subreddit_threads(POST_ID: str):
if POST_ID: # would only be called if there are multiple queued posts
submission = reddit.submission(id=POST_ID)
elif (
settings.config["reddit"]["thread"]["post_id"]
and len(str(settings.config["reddit"]["thread"]["post_id"]).split("+")) == 1
):
submission = reddit.submission(
id=settings.config["reddit"]["thread"]["post_id"]
)
elif settings.config["ai"][
"ai_similarity_enabled"
]: # ai sorting based on comparison
threads = subreddit.hot(limit=50)
keywords = settings.config["ai"]["ai_similarity_keywords"].split(",")
keywords = [keyword.strip() for keyword in keywords]
# Reformat the keywords for printing
keywords_print = ", ".join(keywords)
print(f"Sorting threads by similarity to the given keywords: {keywords_print}")
threads, similarity_scores = sort_by_similarity(threads, keywords)
submission, similarity_score = get_subreddit_undone(
threads, subreddit, similarity_scores=similarity_scores
)
else:
threads = subreddit.hot(limit=25)
submission = get_subreddit_undone(threads, subreddit)
@ -84,21 +108,21 @@ def get_subreddit_threads(POST_ID: str):
return get_subreddit_threads(POST_ID) # submission already done. rerun
if settings.config["settings"]["storymode"]:
if not submission.selftext and settings.config["reddit"]["thread"]["post_id"] != "":
if not submission.selftext:
print_substep("You are trying to use story mode on post with no post text")
exit()
elif not submission.selftext:
print_substep("You are trying to use story mode on post with no post text") # not allow postid post with no self text it story == true
return get_subreddit_threads(POST_ID)
else:
# Check for the length of the post text
if len(submission.selftext) > (settings.config["settings"]["storymode_max_length"] or 2000):
if len(submission.selftext) > (
settings.config["settings"]["storymode_max_length"] or 2000
):
print_substep(
f"Post is too long ({len(submission.selftext)}), retrying with a different post. ({settings.config['settings']['storymode_max_length']} character limit)"
f"Post is too long ({len(submission.selftext)}), try with a different post. ({settings.config['settings']['storymode_max_length']} character limit)"
)
return get_subreddit_threads(POST_ID)
exit()
elif not submission.num_comments:
return get_subreddit_threads(POST_ID)
print_substep("No comments found. Skipping.")
exit()
submission = check_done(submission) # double-checking
@ -108,14 +132,20 @@ def get_subreddit_threads(POST_ID: str):
threadurl = f"https://reddit.com{submission.permalink}"
print_substep(f"Video will be: {submission.title} :thumbsup:", style="bold green")
print_substep(f"Thread url is : {threadurl } :thumbsup:", style="bold green")
print_substep(f"Thread url is: {threadurl} :thumbsup:", style="bold green")
print_substep(f"Thread has {upvotes} upvotes", style="bold blue")
print_substep(f"Thread has a upvote ratio of {ratio}%", style="bold blue")
print_substep(f"Thread has {num_comments} comments", style="bold blue")
if similarity_score:
print_substep(
f"Thread has a similarity score up to {round(similarity_score * 100)}%",
style="bold blue",
)
content["thread_url"] = threadurl
content["thread_title"] = submission.title
content["thread_id"] = submission.id
content["is_nsfw"] = submission.over_18
content["comments"] = []
if settings.config["settings"]["storymode"]:
if settings.config["settings"]["storymodemethod"] == 1:
@ -139,7 +169,6 @@ def get_subreddit_threads(POST_ID: str):
if len(top_level_comment.body) >= int(
settings.config["reddit"]["thread"]["min_comment_length"]
):
if (
top_level_comment.author is not None
and sanitize_text(top_level_comment.body) is not None

@ -5,13 +5,18 @@ moviepy==1.0.3
playwright==1.23.0
praw==7.6.1
prawcore~=2.3.0
pytube==12.1.0
requests==2.28.1
rich==12.5.1
rich==13.3.5
toml==0.10.2
translators==5.3.1
pyttsx3==2.90
Pillow~=9.1.1
tomlkit==0.11.4
Flask==2.2.2
Pillow~=9.4.0
tomlkit==0.11.8
Flask==2.3.2
clean-text==0.6.0
unidecode==1.3.2
spacy==3.4.1
torch==1.12.1
transformers==4.25.1
ffmpeg-python==0.2.0
yt-dlp==2023.3.4

@ -16,30 +16,39 @@ post_lang = { default = "", optional = true, explanation = "The language you wou
min_comments = { default = 20, optional = false, nmin = 10, type = "int", explanation = "The minimum number of comments a post should have to be included. default is 20", example = 29, oob_error = "the minimum number of comments should be between 15 and 999999" }
#post_url = { optional = true, default = "", regex = "^https:\\/\\/www\\.reddit\\.com\\/r\\/[a-zA-Z0-9]+\\/comments\\/[a-zA-Z0-9]+\\/[a-zA-Z0-9_]+\\/$", explanation = "Not working currently Use if you want to use a specific post.", example = "https://www.reddit.com/r/buildapc/comments/yzh07p/have_you_switched_to_windows_11/" }
[ai]
ai_similarity_enabled = {optional = true, option = [true, false], default = false, type = "bool", explanation = "Threads read from Reddit are sorted based on their similarity to the keywords given below"}
ai_similarity_keywords = {optional = true, type="str", example= 'Elon Musk, Twitter, Stocks', explanation = "Every keyword or even sentence, seperated with comma, is used to sort the reddit threads based on similarity"}
[settings]
allow_nsfw = { optional = false, type = "bool", default = false, example = false, options = [true, false, ], explanation = "Whether to allow NSFW content, True or False" }
theme = { optional = false, default = "dark", example = "light", options = ["dark", "light", ], explanation = "Sets the Reddit theme, either LIGHT or DARK" }
theme = { optional = false, default = "dark", example = "light", options = ["dark", "light", "transparent", ], explanation = "Sets the Reddit theme, either LIGHT or DARK. For story mode you can also use a transparent background." }
times_to_run = { optional = false, default = 1, example = 2, explanation = "Used if you want to run multiple times. Set to an int e.g. 4 or 29 or 1", type = "int", nmin = 1, oob_error = "It's very hard to run something less than once." }
opacity = { optional = false, default = 0.9, example = 0.8, explanation = "Sets the opacity of the comments when overlayed over the background", type = "float", nmin = 0, nmax = 1, oob_error = "The opacity HAS to be between 0 and 1", input_error = "The opacity HAS to be a decimal number between 0 and 1" }
transition = { optional = true, default = 0.2, example = 0.2, explanation = "Sets the transition time (in seconds) between the comments. Set to 0 if you want to disable it.", type = "float", nmin = 0, nmax = 2, oob_error = "The transition HAS to be between 0 and 2", input_error = "The opacity HAS to be a decimal number between 0 and 2" }
storymode = { optional = true, type = "bool", default = false, example = false, options = [true, false,], explanation = "Only read out title and post content, great for subreddits with stories" }
storymodemethod= { optional = true, default = 1, example = 1, explanation = "Style that's used for the storymode. Set to 0 for single picture display in whole video, set to 1 for fancy looking video ", type = "int", nmin = 0, oob_error = "It's very hard to run something less than once.", options = [0, 1] }
storymode_max_length = { optional = true, default = 1000, example = 1000, explanation = "Max length of the storymode video in characters. 200 characters are approximately 50 seconds.", type = "int", nmin = 1, oob_error = "It's very hard to make a video under a second." }
fps = { optional = true, default = 30, example = 30, explanation = "Sets the FPS of the video, 30 is default for best performance. 60 FPS is smoother.", type = "int", nmin = 1, nmax = 60, oob_error = "The FPS HAS to be between 1 and 60" }
resolution_w = { optional = false, default = 1080, example = 1440, explantation = "Sets the width in pixels of the final video" }
resolution_h = { optional = false, default = 1920, example = 2560, explantation = "Sets the height in pixels of the final video" }
[settings.background]
background_choice = { optional = true, default = "minecraft", example = "rocket-league", options = ["", "minecraft", "gta", "rocket-league", "motor-gta", "csgo-surf", "cluster-truck"], explanation = "Sets the background for the video based on game name" }
#background_audio = { optional = true, type = "bool", default = false, example = false, options = [true, false,], explanation = "Sets a audio to play in the background (put a background.mp3 file in the assets/backgrounds directory for it to be used.)" }
#background_audio_volume = { optional = true, type = "float", default = 0.3, example = 0.1, explanation="Sets the volume of the background audio. only used if the background_audio is also set to true" }
background_video = { optional = true, default = "minecraft", example = "rocket-league", options = ["minecraft", "gta", "rocket-league", "motor-gta", "csgo-surf", "cluster-truck", "minecraft-2","multiversus","fall-guys","steep", ""], explanation = "Sets the background for the video based on game name" }
background_audio = { optional = true, default = "lofi", example = "chill-summer", options = ["lofi","lofi-2","chill-summer",""], explanation = "Sets the background audio for the video" }
background_audio_volume = { optional = true, type = "float", nmin = 0, nmax = 1, default = 0.15, example = 0.05, explanation="Sets the volume of the background audio. If you don't want background audio, set it to 0.", oob_error = "The volume HAS to be between 0 and 1", input_error = "The volume HAS to be a float number between 0 and 1"}
enable_extra_audio = { optional = true, type = "bool", default = false, example = false, explanation="Used if you want to render another video without background audio in a separate folder", input_error = "The value HAS to be true or false"}
background_thumbnail = { optional = true, type = "bool", default = false, example = false, options = [true, false,], explanation = "Generate a thumbnail for the video (put a thumbnail.png file in the assets/backgrounds directory.)" }
background_thumbnail_font_family = { optional = true, default = "arial", example = "arial", explanation = "Font family for the thumbnail text" }
background_thumbnail_font_size = { optional = true, type = "int", default = 96, example = 96, explanation = "Font size in pixels for the thumbnail text" }
background_thumbnail_font_color = { optional = true, default = "255,255,255", example = "255,255,255", explanation = "Font color in RGB format for the thumbnail text" }
[settings.tts]
voice_choice = { optional = false, default = "googletranslate", options = ["streamlabspolly", "tiktok", "googletranslate", "awspolly", "pyttsx", ], example = "tiktok", explanation = "The voice platform used for TTS generation. This can be left blank and you will be prompted to choose at runtime." }
aws_polly_voice = { optional = true, default = "Matthew", example = "Matthew", explanation = "The voice used for AWS Polly" }
streamlabs_polly_voice = { optional = true, default = "Matthew", example = "Matthew", explanation = "The voice used for Streamlabs Polly" }
tiktok_voice = { optional = true, default = "en_us_006", example = "en_us_006", explanation = "The voice used for TikTok TTS" }
python_voice = { optional = true, default = "1", example = "1", explanation = "The index of the system tts voices (can be downloaded externally, run ptt.py to find value, start from zero)" }
py_voice_num = { optional = true, default = "2", example = "2", explanation = "The number of system voices (2 are pre-installed in Windows)" }
voice_choice = { optional = false, default = "streamlabspolly", options = ["streamlabspolly", "tiktok", "googletranslate", "awspolly", "pyttsx", ], example = "tiktok", explanation = "The voice platform used for TTS generation. This can be left blank and you will be prompted to choose at runtime." }
aws_polly_voice = { optional = false, default = "Matthew", example = "Matthew", explanation = "The voice used for AWS Polly" }
streamlabs_polly_voice = { optional = false, default = "Matthew", example = "Matthew", explanation = "The voice used for Streamlabs Polly" }
tiktok_voice = { optional = true, default = "en_us_001", example = "en_us_006", explanation = "The voice used for TikTok TTS" }
tiktok_sessionid = { optional = true, example = "c76bcc3a7625abcc27b508c7db457ff1", explanation = "TikTok sessionid needed if you're using the TikTok TTS. Check documentation if you don't know how to obtain it." }
python_voice = { optional = false, default = "1", example = "1", explanation = "The index of the system tts voices (can be downloaded externally, run ptt.py to find value, start from zero)" }
py_voice_num = { optional = false, default = "2", example = "2", explanation = "The number of system voices (2 are pre-installed in Windows)" }
silence_duration = { optional = true, example = "0.1", explanation = "Time in seconds between TTS comments", default = 0.3, type = "float" }
no_emojis = { optional = false, type = "bool", default = false, example = false, options = [true, false,], explanation = "Whether to remove emojis from the comments" }

@ -0,0 +1,74 @@
import numpy as np
from transformers import AutoTokenizer, AutoModel
import torch
# Mean Pooling - Take attention mask into account for correct averaging
def mean_pooling(model_output, attention_mask):
token_embeddings = model_output[
0
] # First element of model_output contains all token embeddings
input_mask_expanded = (
attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
)
return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(
input_mask_expanded.sum(1), min=1e-9
)
# This function sort the given threads based on their total similarity with the given keywords
def sort_by_similarity(thread_objects, keywords):
# Initialize tokenizer + model.
tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
model = AutoModel.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
# Transform the generator to a list of Submission Objects, so we can sort later based on context similarity to
# keywords
thread_objects = list(thread_objects)
threads_sentences = []
for i, thread in enumerate(thread_objects):
threads_sentences.append(" ".join([thread.title, thread.selftext]))
# Threads inference
encoded_threads = tokenizer(
threads_sentences, padding=True, truncation=True, return_tensors="pt"
)
with torch.no_grad():
threads_embeddings = model(**encoded_threads)
threads_embeddings = mean_pooling(
threads_embeddings, encoded_threads["attention_mask"]
)
# Keywords inference
encoded_keywords = tokenizer(
keywords, padding=True, truncation=True, return_tensors="pt"
)
with torch.no_grad():
keywords_embeddings = model(**encoded_keywords)
keywords_embeddings = mean_pooling(
keywords_embeddings, encoded_keywords["attention_mask"]
)
# Compare every keyword w/ every thread embedding
threads_embeddings_tensor = torch.tensor(threads_embeddings)
total_scores = torch.zeros(threads_embeddings_tensor.shape[0])
cosine_similarity = torch.nn.CosineSimilarity()
for keyword_embedding in keywords_embeddings:
keyword_embedding = torch.tensor(keyword_embedding).repeat(
threads_embeddings_tensor.shape[0], 1
)
similarity = cosine_similarity(keyword_embedding, threads_embeddings_tensor)
total_scores += similarity
similarity_scores, indices = torch.sort(total_scores, descending=True)
threads_sentences = np.array(threads_sentences)[indices.numpy()]
thread_objects = np.array(thread_objects)[indices.numpy()].tolist()
# print('Similarity Thread Ranking')
# for i, thread in enumerate(thread_objects):
# print(f'{i}) {threads_sentences[i]} score {similarity_scores[i]}')
return thread_objects, similarity_scores

@ -0,0 +1,18 @@
{
"__comment": "Supported Backgrounds Audio. Can add/remove background audio here...",
"lofi": [
"https://www.youtube.com/watch?v=LTphVIore3A",
"lofi.mp3",
"Super Lofi World"
],
"lofi-2":[
"https://www.youtube.com/watch?v=BEXL80LS0-I",
"lofi-2.mp3",
"stompsPlaylist"
],
"chill-summer":[
"https://www.youtube.com/watch?v=EZE8JagnBI8",
"chill-summer.mp3",
"Mellow Vibes Radio"
]
}

@ -4,13 +4,13 @@
"https://www.youtube.com/watch?v=vw5L4xCPy9Q",
"bike-parkour-gta.mp4",
"Achy Gaming",
480
"center"
],
"rocket-league": [
"https://www.youtube.com/watch?v=2X9QGY__0II",
"rocket_league.mp4",
"Orbital Gameplay",
200
"center"
],
"minecraft": [
"https://www.youtube.com/watch?v=n_Dv4JMiwK8",
@ -22,7 +22,7 @@
"https://www.youtube.com/watch?v=qGa9kWREOnE",
"gta-stunt-race.mp4",
"Achy Gaming",
480
"center"
],
"csgo-surf": [
"https://www.youtube.com/watch?v=E-8JlyO59Io",
@ -34,6 +34,30 @@
"https://www.youtube.com/watch?v=uVKxtdMgJVU",
"cluster_truck.mp4",
"No Copyright Gameplay",
480
"center"
],
"minecraft-2": [
"https://www.youtube.com/watch?v=Pt5_GSKIWQM",
"minecraft-2.mp4",
"Itslpsn",
"center"
],
"multiversus": [
"https://www.youtube.com/watch?v=66oK1Mktz6g",
"multiversus.mp4",
"MKIceAndFire",
"center"
],
"fall-guys": [
"https://www.youtube.com/watch?v=oGSsgACIc6Q",
"fall-guys.mp4",
"Throneful",
"center"
],
"steep": [
"https://www.youtube.com/watch?v=EnGiQrWBrko",
"steep.mp4",
"joel",
"center"
]
}

@ -12,20 +12,20 @@ def cleanup(id) -> int:
Returns:
int: How many files were deleted
"""
if exists("./assets/temp"):
if exists(f"../assets/temp/{id}/"):
count = 0
files = [
f for f in os.listdir(".") if f.endswith(".mp4") and "temp" in f.lower()
]
count += len(files)
for f in files:
os.remove(f"../assets/temp/{id}/{f}")
REMOVE_DIRS = [f"../assets/temp/{id}/mp3/", f"../assets/temp/{id}/png/"]
for d in REMOVE_DIRS:
if exists(d):
count += len(_listdir(d))
for f in _listdir(d):
os.remove(f)
REMOVE_DIRS = [f"./assets/temp/{id}/mp3/", f"./assets/temp/{id}/png/"]
files_to_remove = list(map(_listdir, REMOVE_DIRS))
for directory in files_to_remove:
for file in directory:
count += 1
os.remove(file)
os.rmdir(d)
os.rmdir(f"../assets/temp/{id}/")
return count
return 0

@ -1,4 +1,3 @@
#!/usr/bin/env python3
import re
from rich.columns import Columns
@ -11,28 +10,28 @@ from rich.text import Text
console = Console()
def print_markdown(text):
def print_markdown(text) -> None:
"""Prints a rich info message. Support Markdown syntax."""
md = Padding(Markdown(text), 2)
console.print(md)
def print_step(text):
def print_step(text) -> None:
"""Prints a rich info message."""
panel = Panel(Text(text, justify="left"))
console.print(panel)
def print_table(items):
def print_table(items) -> None:
"""Prints items in a table."""
console.print(Columns([Panel(f"[yellow]{item}", expand=True) for item in items]))
def print_substep(text, style=""):
"""Prints a rich info message without the panelling."""
def print_substep(text, style="") -> None:
"""Prints a rich colored info message without the panelling."""
console.print(text, style=style)

@ -0,0 +1,84 @@
import requests
import os
import subprocess
def ffmpeg_install_windows():
try:
zip = "https://github.com/BtbN/FFmpeg-Builds/releases/download/latest/ffmpeg-master-latest-win64-gpl.zip"
r = requests.get(zip)
with open("ffmpeg.zip", "wb") as f:
f.write(r.content)
import zipfile
with zipfile.ZipFile("ffmpeg.zip", "r") as zip_ref:
zip_ref.extractall()
os.remove("ffmpeg.zip")
os.rename("ffmpeg-master-latest-win64-gpl", "ffmpeg")
# Move the files inside bin to the root
for file in os.listdir("ffmpeg/bin"):
os.rename(f"ffmpeg/bin/{file}", f"ffmpeg/{file}")
os.rmdir("ffmpeg/bin")
for file in os.listdir("ffmpeg/doc"):
os.remove(f"ffmpeg/doc/{file}")
os.rmdir("ffmpeg/doc")
# Add to the path
subprocess.run("setx /M PATH \"%PATH%;%CD%\\ffmpeg\"", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print("FFmpeg installed successfully! Please restart your computer and then re-run the program.")
exit()
except Exception as e:
print(
"An error occurred while trying to install FFmpeg. Please try again. Otherwise, please install FFmpeg manually and try again.")
print(e)
exit()
def ffmpeg_install_linux():
try:
subprocess.run("sudo apt install ffmpeg", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except Exception as e:
print(
"An error occurred while trying to install FFmpeg. Please try again. Otherwise, please install FFmpeg manually and try again.")
print(e)
exit()
print("FFmpeg installed successfully! Please re-run the program.")
exit()
def ffmpeg_install_mac():
try:
subprocess.run("brew install ffmpeg", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except FileNotFoundError:
print(
"Homebrew is not installed. Please install it and try again. Otherwise, please install FFmpeg manually and try again.")
exit()
print("FFmpeg installed successfully! Please re-run the program.")
exit()
def ffmpeg_install():
try:
# Try to run the FFmpeg command
subprocess.run(['ffmpeg', '-version'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print('FFmpeg is installed on this system! If you are seeing this error for the second time, restart your computer.')
except FileNotFoundError as e:
print('FFmpeg is not installed on this system.')
resp = input("We can try to automatically install it for you. Would you like to do that? (y/n): ")
if resp.lower() == "y":
print("Installing FFmpeg...")
if os.name == "nt":
ffmpeg_install_windows()
elif os.name == "posix":
ffmpeg_install_linux()
elif os.name == "mac":
ffmpeg_install_mac()
else:
print("Your OS is not supported. Please install FFmpeg manually and try again.")
exit()
else:
print("Please install FFmpeg manually and try again.")
exit()
except Exception as e:
print("Welcome fellow traveler! You're one of the few who have made it this far. We have no idea how you got at this error, but we're glad you're here. Please report this error to the developer, and we'll try to fix it as soon as possible. Thank you for your patience!")
print(e)
return None

@ -162,7 +162,7 @@ def delete_background(key):
# Add background video
def add_background(youtube_uri, filename, citation, position):
# Validate YouTube URI
regex = re.compile(r"(?:\/|%3D|v=|vi=)([0-9A-z-_]{11})(?:[%#?&]|$)").search(
regex = re.compile(r"(?:\/|%3D|v=|vi=)([0-9A-z\-_]{11})(?:[%#?&]|$)").search(
youtube_uri
)

@ -1,11 +1,15 @@
import re
import textwrap
import os
from PIL import Image, ImageDraw, ImageFont
from rich.progress import track
from TTS.engine_wrapper import process_text
def draw_multiple_line_text(image, text, font, text_color, padding, wrap=50):
def draw_multiple_line_text(
image, text, font, text_color, padding, wrap=50, transparent=False
) -> None:
"""
Draw multiline text over given image
"""
@ -18,58 +22,70 @@ def draw_multiple_line_text(image, text, font, text_color, padding, wrap=50):
)
for line in lines:
line_width, line_height = font.getsize(line)
if transparent:
shadowcolor = "black"
for i in range(1, 5):
draw.text(
((image_width - line_width) / 2 - i, y - i),
line,
font=font,
fill=shadowcolor,
)
draw.text(
((image_width - line_width) / 2 + i, y - i),
line,
font=font,
fill=shadowcolor,
)
draw.text(
((image_width - line_width) / 2 - i, y + i),
line,
font=font,
fill=shadowcolor,
)
draw.text(
((image_width - line_width) / 2 + i, y + i),
line,
font=font,
fill=shadowcolor,
)
draw.text(((image_width - line_width) / 2, y), line, font=font, fill=text_color)
y += line_height + padding
# theme=bgcolor,reddit_obj=reddit_object,txtclr=txtcolor
def imagemaker(theme, reddit_obj: dict, txtclr, padding=5):
def imagemaker(theme, reddit_obj: dict, txtclr, padding=5, transparent=False) -> None:
"""
Render Images for video
"""
title = process_text(reddit_obj["thread_title"], False) #TODO if second argument cause any error
title = process_text(
reddit_obj["thread_title"], False
)
texts = reddit_obj["thread_post"]
id = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"])
tfont = ImageFont.truetype("fonts\\Roboto-Bold.ttf", 27) # for title
font = ImageFont.truetype(
"fonts\\Roboto-Regular.ttf", 20
) # for despcription|comments
size = (500, 176)
if transparent:
font = ImageFont.truetype(os.path.join("fonts", "Roboto-Bold.ttf"), 50)
tfont = ImageFont.truetype(os.path.join("fonts", "Roboto-Bold.ttf"), 50)
else:
tfont = ImageFont.truetype(
os.path.join("fonts", "Roboto-Bold.ttf"), 100
) # for title
font = ImageFont.truetype(os.path.join("fonts", "Roboto-Regular.ttf"), 90)
size = (1920, 1080)
image = Image.new("RGBA", size, theme)
draw = ImageDraw.Draw(image)
# for title
if len(title) > 40:
draw_multiple_line_text(image, title, tfont, txtclr, padding, wrap=30)
else:
Fontperm = tfont.getsize(title)
draw.text(
((image.size[0] - Fontperm[0]) / 2, (image.size[1] - Fontperm[1]) / 2),
font=tfont,
text=title,
) # (image.size[1]/2)-(Fontperm[1]/2)
draw_multiple_line_text(
image, title, tfont, txtclr, padding, wrap=30, transparent=transparent
)
image.save(f"assets/temp/{id}/png/title.png")
# for comment|description
for idx, text in track(enumerate(texts), "Rendering Image"):#, total=len(texts)):
for idx, text in track(enumerate(texts), "Rendering Image"):
image = Image.new("RGBA", size, theme)
draw = ImageDraw.Draw(image)
text = process_text(text,False)
if len(text) > 50:
draw_multiple_line_text(image, text, font, txtclr, padding)
else:
Fontperm = font.getsize(text)
draw.text(
((image.size[0] - Fontperm[0]) / 2, (image.size[1] - Fontperm[1]) / 2),
font=font,
text=text,
) # (image.size[1]/2)-(Fontperm[1]/2)
text = process_text(text, False)
draw_multiple_line_text(
image, text, font, txtclr, padding, wrap=30, transparent=transparent
)
image.save(f"assets/temp/{id}/png/img{idx}.png")

@ -11,9 +11,11 @@ def posttextparser(obj):
text = re.sub("\n", "", obj)
try:
nlp = spacy.load('en_core_web_sm')
nlp = spacy.load("en_core_web_sm")
except OSError:
print_step("The spacy model can't load. You need to install it with \npython -m spacy download en")
print_step(
"The spacy model can't load. You need to install it with the command \npython -m spacy download en_core_web_sm"
)
exit()
doc = nlp(text)

@ -1,4 +1,3 @@
#!/usr/bin/env python
import re
from typing import Tuple, Dict
from pathlib import Path

@ -3,9 +3,12 @@ from os.path import exists
from utils import settings
from utils.console import print_substep
from utils.ai_methods import sort_by_similarity
def get_subreddit_undone(submissions: list, subreddit, times_checked=0):
def get_subreddit_undone(
submissions: list, subreddit, times_checked=0, similarity_scores=None
):
"""_summary_
Args:
@ -15,6 +18,15 @@ def get_subreddit_undone(submissions: list, subreddit, times_checked=0):
Returns:
Any: The submission that has not been done
"""
# Second try of getting a valid Submission
if times_checked and settings.config["ai"]["ai_similarity_enabled"]:
print(
"Sorting based on similarity for a different date filter and thread limit.."
)
submissions = sort_by_similarity(
submissions, keywords=settings.config["ai"]["ai_similarity_enabled"]
)
# recursively checks if the top submission in the list was already done.
if not exists("./video_creation/data/videos.json"):
with open("./video_creation/data/videos.json", "w+") as f:
@ -23,7 +35,7 @@ def get_subreddit_undone(submissions: list, subreddit, times_checked=0):
"./video_creation/data/videos.json", "r", encoding="utf-8"
) as done_vids_raw:
done_videos = json.load(done_vids_raw)
for submission in submissions:
for i, submission in enumerate(submissions):
if already_done(done_videos, submission):
continue
if submission.over_18:
@ -36,15 +48,19 @@ def get_subreddit_undone(submissions: list, subreddit, times_checked=0):
if submission.stickied:
print_substep("This post was pinned by moderators. Skipping...")
continue
if submission.num_comments <= int(
settings.config["reddit"]["thread"]["min_comments"]
) and not settings.config["settings"]["storymode"]:
if (
submission.num_comments
<= int(settings.config["reddit"]["thread"]["min_comments"])
and not settings.config["settings"]["storymode"]
):
print_substep(
f'This post has under the specified minimum of comments ({settings.config["reddit"]["thread"]["min_comments"]}). Skipping...'
)
continue
if settings.config["settings"]["storymode"] and not submission.is_self:
continue
if similarity_scores is not None:
return submission, similarity_scores[i].item()
return submission
print("all submissions have been done going by top submission order")
VALID_TIME_FILTERS = [

@ -0,0 +1,41 @@
from PIL import ImageDraw, ImageFont
def create_thumbnail(
thumbnail, font_family, font_size, font_color, width, height, title
):
font = ImageFont.truetype(font_family + ".ttf", font_size)
Xaxis = width - (width * 0.2) # 20% of the width
sizeLetterXaxis = font_size * 0.5 # 50% of the font size
XaxisLetterQty = round(
Xaxis / sizeLetterXaxis
) # Quantity of letters that can fit in the X axis
MarginYaxis = height * 0.12 # 12% of the height
MarginXaxis = width * 0.05 # 5% of the width
# 1.1 rem
LineHeight = font_size * 1.1
# rgb = "255,255,255" transform to list
rgb = font_color.split(",")
rgb = (int(rgb[0]), int(rgb[1]), int(rgb[2]))
arrayTitle = []
for word in title.split():
if len(arrayTitle) == 0:
# colocar a primeira palavra no arrayTitl# put the first word in the arrayTitle
arrayTitle.append(word)
else:
# if the size of arrayTitle is less than qtLetters
if len(arrayTitle[-1]) + len(word) < XaxisLetterQty:
arrayTitle[-1] = arrayTitle[-1] + " " + word
else:
arrayTitle.append(word)
draw = ImageDraw.Draw(thumbnail)
# loop for put the title in the thumbnail
for i in range(0, len(arrayTitle)):
# 1.1 rem
draw.text(
(MarginXaxis, MarginYaxis + (LineHeight * i)), arrayTitle[i], rgb, font=font
)
return thumbnail

@ -6,11 +6,14 @@ from time import sleep
from requests import Response
from utils import settings
from cleantext import clean
if sys.version_info[0] >= 3:
from datetime import timezone
def check_ratelimit(response: Response):
def check_ratelimit(response: Response) -> bool:
"""
Checks if the response is a ratelimit response.
If it is, it sleeps for the time specified in the response.
@ -27,7 +30,7 @@ def check_ratelimit(response: Response):
return True
def sleep_until(time):
def sleep_until(time) -> None:
"""
Pause your program until a specific end time.
'time' is either a valid datetime object or unix timestamp in seconds (i.e. seconds since Unix epoch)
@ -86,5 +89,10 @@ def sanitize_text(text: str) -> str:
regex_expr = r"\s['|]|['|]\s|[\^_~@!&;#:\-%—“”‘\"%\*/{}\[\]\(\)\\|<>=+]"
result = re.sub(regex_expr, " ", result)
result = result.replace("+", "plus").replace("&", "and")
# emoji removal if the setting is enabled
if settings.config["settings"]["tts"]["no_emojis"]:
result = clean(result, no_emoji=True)
# remove extra whitespace
return " ".join(result.split())

@ -3,29 +3,38 @@ import random
import re
from pathlib import Path
from random import randrange
from typing import Any, Tuple
from typing import Any, Tuple,Dict
from moviepy.editor import VideoFileClip
from moviepy.editor import VideoFileClip,AudioFileClip
from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip
from pytube import YouTube
from pytube.cli import on_progress
from utils import settings
from utils.console import print_step, print_substep
import yt_dlp
# Load background videos
with open("utils/backgrounds.json") as json_file:
background_options = json.load(json_file)
def load_background_options():
background_options = {}
# Load background videos
with open("./utils/background_videos.json") as json_file:
background_options["video"] = json.load(json_file)
# Remove "__comment" from backgrounds
background_options.pop("__comment", None)
# Load background audios
with open("./utils/background_audios.json") as json_file:
background_options["audio"] = json.load(json_file)
# Add position lambda function
# (https://zulko.github.io/moviepy/ref/VideoClip/VideoClip.html#moviepy.video.VideoClip.VideoClip.set_position)
for name in list(background_options.keys()):
pos = background_options[name][3]
# Remove "__comment" from backgrounds
del background_options["video"]["__comment"]
del background_options["audio"]["__comment"]
# Add position lambda function
# (https://zulko.github.io/moviepy/ref/VideoClip/VideoClip.html#moviepy.video.VideoClip.VideoClip.set_position)
for name in list(background_options["video"].keys()):
pos = background_options["video"][name][3]
if pos != "center":
background_options[name][3] = lambda t: ("center", pos + t)
background_options["video"][name][3] = lambda t: ("center", pos + t)
return background_options
def get_start_and_end_times(video_length: int, length_of_clip: int) -> Tuple[int, int]:
@ -42,11 +51,11 @@ def get_start_and_end_times(video_length: int, length_of_clip: int) -> Tuple[int
return random_time, random_time + video_length
def get_background_config():
def get_background_config(mode: str):
"""Fetch the background/s configuration"""
try:
choice = str(
settings.config["settings"]["background"]["background_choice"]
settings.config["settings"]["background"][f"background_{mode}"]
).casefold()
except AttributeError:
print_substep("No background selected. Picking random background'")
@ -54,57 +63,98 @@ def get_background_config():
# Handle default / not supported background using default option.
# Default : pick random from supported background.
if not choice or choice not in background_options:
choice = random.choice(list(background_options.keys()))
if not choice or choice not in background_options[mode]:
choice = random.choice(list(background_options[mode].keys()))
return background_options[choice]
return background_options[mode][choice]
def download_background(background_config: Tuple[str, str, str, Any]):
def download_background_video(background_config: Tuple[str, str, str, Any]):
"""Downloads the background/s video from YouTube."""
Path("./assets/backgrounds/").mkdir(parents=True, exist_ok=True)
Path("./assets/backgrounds/video/").mkdir(parents=True, exist_ok=True)
# note: make sure the file name doesn't include an - in it
uri, filename, credit, _ = background_config
if Path(f"assets/backgrounds/{credit}-{filename}").is_file():
if Path(f"assets/backgrounds/video/{credit}-{filename}").is_file():
return
print_step(
"We need to download the backgrounds videos. they are fairly large but it's only done once. 😎"
)
print_substep("Downloading the backgrounds videos... please be patient 🙏 ")
print_substep(f"Downloading {filename} from {uri}")
YouTube(uri, on_progress_callback=on_progress).streams.filter(
res="1080p"
).first().download("assets/backgrounds", filename=f"{credit}-{filename}")
ydl_opts = {
'format': "bestvideo[height<=1080][ext=mp4]",
"outtmpl": f"assets/backgrounds/video/{credit}-{filename}",
"retries": 10,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download(uri)
print_substep("Background video downloaded successfully! 🎉", style="bold green")
def download_background_audio(background_config: Tuple[str, str, str]):
"""Downloads the background/s audio from YouTube."""
Path("./assets/backgrounds/audio/").mkdir(parents=True, exist_ok=True)
# note: make sure the file name doesn't include an - in it
uri, filename, credit = background_config
if Path(f"assets/backgrounds/audio/{credit}-{filename}").is_file():
return
print_step(
"We need to download the backgrounds audio. they are fairly large but it's only done once. 😎"
)
print_substep("Downloading the backgrounds audio... please be patient 🙏 ")
print_substep(f"Downloading {filename} from {uri}")
ydl_opts = {
'outtmpl': f'./assets/backgrounds/audio/{credit}-{filename}',
'format': 'bestaudio/best',
'extract_audio': True,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([uri])
print_substep("Background audio downloaded successfully! 🎉", style="bold green")
def chop_background_video(
background_config: Tuple[str, str, str, Any], video_length: int, reddit_object: dict
def chop_background(
background_config: Dict[str,Tuple], video_length: int, reddit_object: dict
):
"""Generates the background footage to be used in the video and writes it to assets/temp/background.mp4
"""Generates the background audio and footage to be used in the video and writes it to assets/temp/background.mp3 and assets/temp/background.mp4
Args:
background_config (Tuple[str, str, str, Any]) : Current background configuration
background_config (Dict[str,Tuple]]) : Current background configuration
video_length (int): Length of the clip where the background footage is to be taken out of
"""
print_step("Finding a spot in the backgrounds video to chop...✂️")
choice = f"{background_config[2]}-{background_config[1]}"
id = re.sub(r"[^\w\s-]", "", reddit_object["thread_id"])
background = VideoFileClip(f"assets/backgrounds/{choice}")
start_time, end_time = get_start_and_end_times(video_length, background.duration)
if(settings.config["settings"]["background"][f"background_audio_volume"] == 0):
print_step("Volume was set to 0. Skipping background audio creation . . .")
else:
print_step("Finding a spot in the backgrounds audio to chop...✂️")
audio_choice = f"{background_config['audio'][2]}-{background_config['audio'][1]}"
background_audio = AudioFileClip(f"assets/backgrounds/audio/{audio_choice}")
start_time_audio, end_time_audio = get_start_and_end_times(video_length, background_audio.duration)
background_audio = background_audio.subclip(start_time_audio,end_time_audio)
background_audio.write_audiofile(f"assets/temp/{id}/background.mp3")
print_step("Finding a spot in the backgrounds video to chop...✂️")
video_choice = f"{background_config['video'][2]}-{background_config['video'][1]}"
background_video = VideoFileClip(f"assets/backgrounds/video/{video_choice}")
start_time_video, end_time_video = get_start_and_end_times(video_length, background_video.duration)
# Extract video subclip
try:
ffmpeg_extract_subclip(
f"assets/backgrounds/{choice}",
start_time,
end_time,
f"assets/backgrounds/video/{video_choice}",
start_time_video,
end_time_video,
targetname=f"assets/temp/{id}/background.mp4",
)
except (OSError, IOError): # ffmpeg issue see #348
print_substep("FFMPEG issue. Trying again...")
with VideoFileClip(f"assets/backgrounds/{choice}") as video:
new = video.subclip(start_time, end_time)
with VideoFileClip(f"assets/backgrounds/video/{video_choice}") as video:
new = video.subclip(start_time_video, end_time_video)
new.write_videofile(f"assets/temp/{id}/background.mp4")
print_substep("Background video chopped successfully!", style="bold green")
return background_config[2]
return background_config["video"][2]
# Create a tuple for downloads background (background_audio_options, background_video_options)
background_options = load_background_options()

@ -1,27 +1,65 @@
#!/usr/bin/env python3
import multiprocessing
import os
import re
from os.path import exists
from typing import Tuple, Any
from moviepy.audio.AudioClip import concatenate_audioclips, CompositeAudioClip
from moviepy.audio.io.AudioFileClip import AudioFileClip
from moviepy.video.VideoClip import ImageClip
from moviepy.video.compositing.CompositeVideoClip import CompositeVideoClip
from moviepy.video.compositing.concatenate import concatenate_videoclips
from moviepy.video.io.VideoFileClip import VideoFileClip
from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip
import shutil
from os.path import exists # Needs to be imported specifically
from typing import Final
from typing import Tuple, Any, Dict
import ffmpeg
import translators as ts
from PIL import Image
from rich.console import Console
from rich.progress import track
from utils.cleanup import cleanup
from utils.console import print_step, print_substep
from utils.video import Video
from utils.thumbnail import create_thumbnail
from utils.videos import save_data
from utils import settings
console = Console()
W, H = 1080, 1920
import tempfile
import threading
import time
class ProgressFfmpeg(threading.Thread):
def __init__(self, vid_duration_seconds, progress_update_callback):
threading.Thread.__init__(self, name="ProgressFfmpeg")
self.stop_event = threading.Event()
self.output_file = tempfile.NamedTemporaryFile(mode="w+", delete=False)
self.vid_duration_seconds = vid_duration_seconds
self.progress_update_callback = progress_update_callback
def run(self):
while not self.stop_event.is_set():
latest_progress = self.get_latest_ms_progress()
if latest_progress is not None:
completed_percent = latest_progress / self.vid_duration_seconds
self.progress_update_callback(completed_percent)
time.sleep(1)
def get_latest_ms_progress(self):
lines = self.output_file.readlines()
if lines:
for line in lines:
if "out_time_ms" in line:
out_time_ms = line.split("=")[1]
return int(out_time_ms) / 1000000.0
return None
def stop(self):
self.stop_event.set()
def __enter__(self):
self.start()
return self
def __exit__(self, *args, **kwargs):
self.stop()
def name_normalize(name: str) -> str:
@ -31,25 +69,68 @@ def name_normalize(name: str) -> str:
name = re.sub(r"(\d+)\s?\/\s?(\d+)", r"\1 of \2", name)
name = re.sub(r"(\w+)\s?\/\s?(\w+)", r"\1 or \2", name)
name = re.sub(r"\/", r"", name)
name[:30]
lang = settings.config["reddit"]["thread"]["post_lang"]
if lang:
import translators as ts
print_substep("Translating filename...")
translated_name = ts.google(name, to_language=lang)
return translated_name
else:
return name
def prepare_background(reddit_id: str, W: int, H: int) -> str:
output_path = f"assets/temp/{reddit_id}/background_noaudio.mp4"
output = (
ffmpeg.input(f"assets/temp/{reddit_id}/background.mp4")
.filter("crop", f"ih*({W}/{H})", "ih")
.output(
output_path,
an=None,
**{
"c:v": "h264",
"b:v": "20M",
"b:a": "192k",
"threads": multiprocessing.cpu_count(),
},
)
.overwrite_output()
)
try:
output.run(quiet=True)
except Exception as e:
print(e)
exit()
return output_path
def merge_background_audio(audio: ffmpeg, reddit_id: str):
"""Gather an audio and merge with assets/backgrounds/background.mp3
Args:
audio (ffmpeg): The TTS final audio but without background.
reddit_id (str): The ID of subreddit
"""
background_audio_volume = settings.config["settings"]["background"]["background_audio_volume"]
if (background_audio_volume == 0):
return audio # Return the original audio
else:
# sets volume to config
bg_audio = (
ffmpeg.input(f"assets/temp/{reddit_id}/background.mp3")
.filter(
"volume",
background_audio_volume,
)
)
# Merges audio and background_audio
merged_audio = ffmpeg.filter([audio, bg_audio], "amix", duration="longest")
return merged_audio # Return merged audio
def make_final_video(
number_of_clips: int,
length: int,
reddit_obj: dict,
background_config: Tuple[str, str, str, Any],
background_config: Dict[str,Tuple],
):
"""Gathers audio clips, gathers all screenshots, stitches them together and saves the final video to assets/temp
Args:
@ -58,147 +139,269 @@ def make_final_video(
reddit_obj (dict): The reddit object that contains the posts to read.
background_config (Tuple[str, str, str, Any]): The background config to use.
"""
# try: # if it isn't found (i.e you just updated and copied over config.toml) it will throw an error
# VOLUME_MULTIPLIER = settings.config["settings"]['background']["background_audio_volume"]
# except (TypeError, KeyError):
# print('No background audio volume found in config.toml. Using default value of 1.')
# VOLUME_MULTIPLIER = 1
id = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"])
# settings values
W: Final[int] = int(settings.config["settings"]["resolution_w"])
H: Final[int] = int(settings.config["settings"]["resolution_h"])
reddit_id = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"])
allowOnlyTTSFolder: bool = settings.config["settings"]["background"]["enable_extra_audio"] \
and settings.config["settings"]["background"]["background_audio_volume"] != 0
print_step("Creating the final video 🎥")
VideoFileClip.reW = lambda clip: clip.resize(width=W)
VideoFileClip.reH = lambda clip: clip.resize(width=H)
opacity = settings.config["settings"]["opacity"]
transition = settings.config["settings"]["transition"]
background_clip = (
VideoFileClip(f"assets/temp/{id}/background.mp4")
.without_audio()
.resize(height=H)
.crop(x1=1166.6, y1=0, x2=2246.6, y2=1920)
)
background_clip = ffmpeg.input(prepare_background(reddit_id, W=W, H=H))
# Gather all audio clips
audio_clips = list()
if settings.config["settings"]["storymode"]:
if settings.config["settings"]["storymodemethod"] == 0:
audio_clips = [AudioFileClip(f"assets/temp/{id}/mp3/title.mp3")]
audio_clips.insert(1, AudioFileClip(f"assets/temp/{id}/mp3/postaudio.mp3"))
audio_clips = [ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3")]
audio_clips.insert(
1, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/postaudio.mp3")
)
elif settings.config["settings"]["storymodemethod"] == 1:
audio_clips = [
AudioFileClip(f"assets/temp/{id}/mp3/postaudio-{i}.mp3")
ffmpeg.input(f"assets/temp/{reddit_id}/mp3/postaudio-{i}.mp3")
for i in track(
range(number_of_clips + 1), "Collecting the audio files..."
)
]
audio_clips.insert(0, AudioFileClip(f"assets/temp/{id}/mp3/title.mp3"))
audio_clips.insert(
0, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3")
)
else:
audio_clips = [
AudioFileClip(f"assets/temp/{id}/mp3/{i}.mp3")
ffmpeg.input(f"assets/temp/{reddit_id}/mp3/{i}.mp3")
for i in range(number_of_clips)
]
audio_clips.insert(0, AudioFileClip(f"assets/temp/{id}/mp3/title.mp3"))
audio_concat = concatenate_audioclips(audio_clips)
audio_composite = CompositeAudioClip([audio_concat])
audio_clips.insert(0, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3"))
console.log(f"[bold green] Video Will Be: {length} Seconds Long")
# add title to video
image_clips = []
# Gather all images
new_opacity = 1 if opacity is None or float(opacity) >= 1 else float(opacity)
new_transition = (
0 if transition is None or float(transition) > 2 else float(transition)
audio_clips_durations = [
float(
ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/{i}.mp3")["format"][
"duration"
]
)
for i in range(number_of_clips)
]
audio_clips_durations.insert(
0,
float(
ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/title.mp3")["format"][
"duration"
]
),
)
audio_concat = ffmpeg.concat(*audio_clips, a=1, v=0)
ffmpeg.output(
audio_concat, f"assets/temp/{reddit_id}/audio.mp3", **{"b:a": "192k"}
).overwrite_output().run(quiet=True)
console.log(f"[bold green] Video Will Be: {length} Seconds Long")
screenshot_width = int((W * 45) // 100)
audio = ffmpeg.input(f"assets/temp/{reddit_id}/audio.mp3")
final_audio = merge_background_audio(audio,reddit_id)
image_clips = list()
image_clips.insert(
0,
ImageClip(f"assets/temp/{id}/png/title.png")
.set_duration(audio_clips[0].duration)
.resize(width=W - 100)
.set_opacity(new_opacity)
.crossfadein(new_transition)
.crossfadeout(new_transition),
ffmpeg.input(f"assets/temp/{reddit_id}/png/title.png")["v"].filter(
"scale", screenshot_width, -1
),
)
current_time = 0
if settings.config["settings"]["storymode"]:
audio_clips_durations = [
float(
ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/postaudio-{i}.mp3")[
"format"
]["duration"]
)
for i in range(number_of_clips)
]
audio_clips_durations.insert(
0,
float(
ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/title.mp3")["format"][
"duration"
]
),
)
if settings.config["settings"]["storymodemethod"] == 0:
image_clips.insert(
1,
ImageClip(f"assets/temp/{id}/png/story_content.png")
.set_duration(audio_clips[1].duration)
.set_position("center")
.resize(width=W - 100)
.set_opacity(float(opacity)),
ffmpeg.input(f"assets/temp/{reddit_id}/png/story_content.png").filter(
"scale", screenshot_width, -1
),
)
background_clip = background_clip.overlay(
image_clips[1],
enable=f"between(t,{current_time},{current_time + audio_clips_durations[1]})",
x="(main_w-overlay_w)/2",
y="(main_h-overlay_h)/2",
)
current_time += audio_clips_durations[1]
elif settings.config["settings"]["storymodemethod"] == 1:
for i in track(
range(0, number_of_clips + 1), "Collecting the image files..."
):
image_clips.append(
ImageClip(f"assets/temp/{id}/png/img{i}.png")
.set_duration(audio_clips[i + 1].duration)
.resize(width=W - 100)
.set_opacity(new_opacity)
# .crossfadein(new_transition)
# .crossfadeout(new_transition)
ffmpeg.input(f"assets/temp/{reddit_id}/png/img{i}.png")["v"].filter(
"scale", screenshot_width, -1
)
)
background_clip = background_clip.overlay(
image_clips[i],
enable=f"between(t,{current_time},{current_time + audio_clips_durations[i]})",
x="(main_w-overlay_w)/2",
y="(main_h-overlay_h)/2",
)
current_time += audio_clips_durations[i]
else:
for i in range(0, number_of_clips):
for i in range(0, number_of_clips + 1):
image_clips.append(
ImageClip(f"assets/temp/{id}/png/comment_{i}.png")
.set_duration(audio_clips[i + 1].duration)
.resize(width=W - 100)
.set_opacity(new_opacity)
.crossfadein(new_transition)
.crossfadeout(new_transition)
)
img_clip_pos = background_config[3]
image_concat = concatenate_videoclips(image_clips).set_position(
img_clip_pos
) # note transition kwarg for delay in imgs
image_concat.audio = audio_composite
final = CompositeVideoClip([background_clip, image_concat])
ffmpeg.input(f"assets/temp/{reddit_id}/png/comment_{i}.png")[
"v"
].filter("scale", screenshot_width, -1)
)
background_clip = background_clip.overlay(
image_clips[i],
enable=f"between(t,{current_time},{current_time + audio_clips_durations[i]})",
x="(main_w-overlay_w)/2",
y="(main_h-overlay_h)/2",
)
current_time += audio_clips_durations[i]
title = re.sub(r"[^\w\s-]", "", reddit_obj["thread_title"])
idx = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"])
title_thumb = reddit_obj["thread_title"]
filename = f"{name_normalize(title)[:251]}.mp4"
filename = f"{name_normalize(title)[:251]}"
subreddit = settings.config["reddit"]["thread"]["subreddit"]
if not exists(f"./results/{subreddit}"):
print_substep("The results folder didn't exist so I made it")
print_substep("The 'results' folder could not be found so it was automatically created.")
os.makedirs(f"./results/{subreddit}")
# if settings.config["settings"]['background']["background_audio"] and exists(f"assets/backgrounds/background.mp3"):
# audioclip = mpe.AudioFileClip(f"assets/backgrounds/background.mp3").set_duration(final.duration)
# audioclip = audioclip.fx( volumex, 0.2)
# final_audio = mpe.CompositeAudioClip([final.audio, audioclip])
# # lowered_audio = audio_background.multiply_volume( # todo get this to work
# # VOLUME_MULTIPLIER) # lower volume by background_audio_volume, use with fx
# final.set_audio(final_audio)
# if
final = Video(final).add_watermark(
text=f"Background credit: {background_config[2]}", opacity=0.4, redditid=reddit_obj
if not exists(f"./results/{subreddit}/OnlyTTS") and allowOnlyTTSFolder:
print_substep("The 'OnlyTTS' folder could not be found so it was automatically created.")
os.makedirs(f"./results/{subreddit}/OnlyTTS")
# create a thumbnail for the video
settingsbackground = settings.config["settings"]["background"]
if settingsbackground["background_thumbnail"]:
if not exists(f"./results/{subreddit}/thumbnails"):
print_substep("The 'results/thumbnails' folder could not be found so it was automatically created.")
os.makedirs(f"./results/{subreddit}/thumbnails")
# get the first file with the .png extension from assets/backgrounds and use it as a background for the thumbnail
first_image = next(
(
file
for file in os.listdir("assets/backgrounds")
if file.endswith(".png")
),
None,
)
if first_image is None:
print_substep("No png files found in assets/backgrounds", "red")
else:
font_family = settingsbackground["background_thumbnail_font_family"]
font_size = settingsbackground["background_thumbnail_font_size"]
font_color = settingsbackground["background_thumbnail_font_color"]
thumbnail = Image.open(f"assets/backgrounds/{first_image}")
width, height = thumbnail.size
thumbnailSave = create_thumbnail(
thumbnail,
font_family,
font_size,
font_color,
width,
height,
title_thumb,
)
thumbnailSave.save(f"./assets/temp/{reddit_id}/thumbnail.png")
print_substep(
f"Thumbnail - Building Thumbnail in assets/temp/{reddit_id}/thumbnail.png"
)
text = f"Background by {background_config['video'][2]}"
background_clip = ffmpeg.drawtext(
background_clip,
text=text,
x=f"(w-text_w)",
y=f"(h-text_h)",
fontsize=12,
fontcolor="White",
fontfile=os.path.join("fonts", "Roboto-Regular.ttf"),
)
print_step("Rendering the video 🎥")
from tqdm import tqdm
pbar = tqdm(total=100, desc="Progress: ", bar_format="{l_bar}{bar}", unit=" %")
def on_update_example(progress):
status = round(progress * 100, 2)
old_percentage = pbar.n
pbar.update(status - old_percentage)
final.write_videofile(
f"assets/temp/{id}/temp.mp4",
fps=int(settings.config["settings"]["fps"]),
audio_codec="aac",
audio_bitrate="192k",
verbose=False,
threads=multiprocessing.cpu_count(),
defaultPath = f"results/{subreddit}"
with ProgressFfmpeg(length, on_update_example) as progress:
path = defaultPath + f"/{filename}"
path = path[:251] + ".mp4" #Prevent a error by limiting the path length, do not change this.
ffmpeg.output(
background_clip,
final_audio,
path,
f="mp4",
**{
"c:v": "h264",
"b:v": "20M",
"b:a": "192k",
"threads": multiprocessing.cpu_count(),
},
).overwrite_output().global_args("-progress", progress.output_file.name).run(
quiet=True,
overwrite_output=True,
capture_stdout=False,
capture_stderr=False,
)
ffmpeg_extract_subclip(
f"assets/temp/{id}/temp.mp4",
0,
length,
targetname=f"results/{subreddit}/{filename}",
old_percentage = pbar.n
pbar.update(100 - old_percentage)
if(allowOnlyTTSFolder):
path = defaultPath + f"/OnlyTTS/{filename}"
path = path[:251] + ".mp4" #Prevent a error by limiting the path length, do not change this.
print_step("Rendering the Only TTS Video 🎥")
with ProgressFfmpeg(length, on_update_example) as progress:
ffmpeg.output(
background_clip,
audio,
path,
f="mp4",
**{
"c:v": "h264",
"b:v": "20M",
"b:a": "192k",
"threads": multiprocessing.cpu_count(),
},
).overwrite_output().global_args("-progress", progress.output_file.name).run(
quiet=True,
overwrite_output=True,
capture_stdout=False,
capture_stderr=False,
)
save_data(subreddit, filename, title, idx, background_config[2])
old_percentage = pbar.n
pbar.update(100 - old_percentage)
pbar.close()
save_data(subreddit, filename + ".mp4", title, idx, background_config['video'][2])
print_step("Removing temporary files 🗑")
cleanups = cleanup(id)
cleanups = cleanup(reddit_id)
print_substep(f"Removed {cleanups} temporary files 🗑")
print_substep("See result in the results folder!")
print_step(
f'Reddit title: {reddit_obj["thread_title"]} \n Background Credit: {background_config[2]}'
)
print_step("Done! 🎉 The video is in the results folder 📁")

@ -1,7 +1,7 @@
import json
import re
from pathlib import Path
from typing import Dict
from typing import Dict, Final
import translators as ts
from playwright.async_api import async_playwright # pylint: disable=unused-import
@ -12,7 +12,9 @@ from utils import settings
from utils.console import print_step, print_substep
from utils.imagenarator import imagemaker
# do not remove the above line
from utils.videos import save_data
__all__ = ["download_screenshots_of_reddit_posts"]
def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
@ -22,48 +24,142 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
reddit_object (Dict): Reddit object received from reddit/subreddit.py
screenshot_num (int): Number of screenshots to download
"""
# settings values
W: Final[int] = int(settings.config["settings"]["resolution_w"])
H: Final[int] = int(settings.config["settings"]["resolution_h"])
lang: Final[str] = settings.config["reddit"]["thread"]["post_lang"]
storymode: Final[bool] = settings.config["settings"]["storymode"]
print_step("Downloading screenshots of reddit posts...")
id = re.sub(r"[^\w\s-]", "", reddit_object["thread_id"])
reddit_id = re.sub(r"[^\w\s-]", "", reddit_object["thread_id"])
# ! Make sure the reddit screenshots folder exists
Path(f"assets/temp/{id}/png").mkdir(parents=True, exist_ok=True)
Path(f"assets/temp/{reddit_id}/png").mkdir(parents=True, exist_ok=True)
# set the theme and disable non-essential cookies
if settings.config["settings"]["theme"] == "dark":
cookie_file = open(
"./video_creation/data/cookie-dark-mode.json", encoding="utf-8"
)
bgcolor = (33, 33, 36, 255)
txtcolor = (240, 240, 240)
transparent = False
elif settings.config["settings"]["theme"] == "transparent":
if storymode:
# Transparent theme
bgcolor = (0, 0, 0, 0)
txtcolor = (255, 255, 255)
transparent = True
cookie_file = open(
"./video_creation/data/cookie-dark-mode.json", encoding="utf-8"
)
else:
# Switch to dark theme
cookie_file = open(
"./video_creation/data/cookie-dark-mode.json", encoding="utf-8"
)
bgcolor = (33, 33, 36, 255)
txtcolor = (240, 240, 240)
transparent = False
else:
cookie_file = open(
"./video_creation/data/cookie-light-mode.json", encoding="utf-8"
)
bgcolor = (255, 255, 255, 255)
txtcolor = (0, 0, 0)
transparent = False
if storymode and settings.config["settings"]["storymodemethod"] == 1:
# for idx,item in enumerate(reddit_object["thread_post"]):
print_substep("Generating images...")
return imagemaker(
theme=bgcolor,
reddit_obj=reddit_object,
txtclr=txtcolor,
transparent=transparent,
)
def download(cookie_file, num=None):
screenshot_num = num
screenshot_num: int
with sync_playwright() as p:
print_substep("Launching Headless Browser...")
browser = p.chromium.launch() # headless=False #to check for chrome view
context = browser.new_context()
browser = p.chromium.launch(
headless=True
) # headless=False will show the browser for debugging purposes
# Device scale factor (or dsf for short) allows us to increase the resolution of the screenshots
# When the dsf is 1, the width of the screenshot is 600 pixels
# so we need a dsf such that the width of the screenshot is greater than the final resolution of the video
dsf = (W // 600) + 1
context = browser.new_context(
locale=lang or "en-us",
color_scheme="dark",
viewport=ViewportSize(width=W, height=H),
device_scale_factor=dsf,
)
cookies = json.load(cookie_file)
cookie_file.close()
context.add_cookies(cookies) # load preference cookies
# Get the thread screenshot
# Login to Reddit
print_substep("Logging in to Reddit...")
page = context.new_page()
page.goto("https://www.reddit.com/login", timeout=0)
page.set_viewport_size(ViewportSize(width=1920, height=1080))
page.wait_for_load_state()
page.locator('[name="username"]').fill(
settings.config["reddit"]["creds"]["username"]
)
page.locator('[name="password"]').fill(
settings.config["reddit"]["creds"]["password"]
)
page.locator("button[class$='m-full-width']").click()
page.wait_for_timeout(5000)
login_error_div = page.locator(".AnimatedForm__errorMessage").first
if login_error_div.is_visible():
login_error_message = login_error_div.inner_text()
if login_error_message.strip() == "":
# The div element is empty, no error
pass
else:
# The div contains an error message
print_substep("Your reddit credentials are incorrect! Please modify them accordingly in the config.toml file.", style="red")
exit()
else:
pass
page.wait_for_load_state()
# Get the thread screenshot
page.goto(reddit_object["thread_url"], timeout=0)
page.set_viewport_size(ViewportSize(width=settings.config["settings"]["vwidth"], height=1920))
if page.locator('[data-testid="content-gate"]').is_visible():
page.set_viewport_size(ViewportSize(width=W, height=H))
page.wait_for_load_state()
page.wait_for_timeout(5000)
if page.locator(
"#t3_12hmbug > div > div._3xX726aBn29LDbsDtzr_6E._1Ap4F5maDtT1E1YuCiaO0r.D3IL3FD0RFy_mkKLPwL4 > div > div > button"
).is_visible():
# This means the post is NSFW and requires to click the proceed button.
print_substep("Post is NSFW. You are spicy...")
page.locator('[data-testid="content-gate"] button').click()
page.wait_for_load_state() # Wait for page to fully load
if page.locator('[data-click-id="text"] button').is_visible():
page.locator(
'[data-click-id="text"] button'
).click() # Remove "Click to see nsfw" Button in Screenshot
"#t3_12hmbug > div > div._3xX726aBn29LDbsDtzr_6E._1Ap4F5maDtT1E1YuCiaO0r.D3IL3FD0RFy_mkKLPwL4 > div > div > button"
).click()
page.wait_for_load_state() # Wait for page to fully load
# translate code
if page.locator(
"#SHORTCUT_FOCUSABLE_DIV > div:nth-child(7) > div > div > div > header > div > div._1m0iFpls1wkPZJVo38-LSh > button > i"
).is_visible():
page.locator(
"#SHORTCUT_FOCUSABLE_DIV > div:nth-child(7) > div > div > div > header > div > div._1m0iFpls1wkPZJVo38-LSh > button > i"
).click() # Interest popup is showing, this code will close it
if settings.config["reddit"]["thread"]["post_lang"]:
if lang:
print_substep("Translating post...")
texts_in_tl = ts.google(
reddit_object["thread_title"],
to_language=settings.config["reddit"]["thread"]["post_lang"],
to_language=lang,
)
page.evaluate(
@ -72,22 +168,43 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
)
else:
print_substep("Skipping translation...")
postcontentpath = f"assets/temp/{id}/png/title.png"
postcontentpath = f"assets/temp/{reddit_id}/png/title.png"
try:
page.locator('[data-test-id="post-content"]').screenshot(
path=postcontentpath
)
except Exception as e:
print_substep("Something went wrong!", style="red")
resp = input(
"Something went wrong with making the screenshots! Do you want to skip the post? (y/n) "
)
if settings.config["settings"]["storymode"]:
if resp.casefold().startswith("y"):
save_data("", "", "skipped", reddit_id, "")
print_substep(
"The post is successfully skipped! You can now restart the program and this post will skipped.",
"green",
)
resp = input(
"Do you want the error traceback for debugging purposes? (y/n)"
)
if not resp.casefold().startswith("y"):
exit()
raise e
try: # new change
if storymode:
page.locator('[data-click-id="text"]').first.screenshot(
path=f"assets/temp/{id}/png/story_content.png"
path=f"assets/temp/{reddit_id}/png/story_content.png"
)
except:
exit
if not settings.config["settings"]["storymode"]:
else:
for idx, comment in enumerate(
track(reddit_object["comments"], "Downloading screenshots...")
track(
reddit_object["comments"][:screenshot_num],
"Downloading screenshots...",
)
):
# Stop if we have reached the screenshot_num
if idx >= screenshot_num:
@ -103,9 +220,7 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
if settings.config["reddit"]["thread"]["post_lang"]:
comment_tl = ts.google(
comment["comment_body"],
to_language=settings.config["reddit"]["thread"][
"post_lang"
],
to_language=settings.config["reddit"]["thread"]["post_lang"],
)
page.evaluate(
'([tl_content, tl_id]) => document.querySelector(`#t1_${tl_id} > div:nth-child(2) > div > div[data-testid="comment"] > div`).textContent = tl_content',
@ -113,13 +228,17 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
)
try:
page.locator(f"#t1_{comment['comment_id']}").screenshot(
path=f"assets/temp/{id}/png/comment_{idx}.png"
path=f"assets/temp/{reddit_id}/png/comment_{idx}.png"
)
except TimeoutError:
del reddit_object["comments"]
screenshot_num -= 1
screenshot_num += 1
print("TimeoutError: Skipping screenshot...")
continue
# close browser instance when we are done using it
browser.close()
print_substep("Screenshots downloaded Successfully.", style="bold green")
# story=False

@ -1,5 +1,3 @@
#!/usr/bin/env python
from typing import Tuple
from rich.console import Console

Loading…
Cancel
Save