Merge pull request #558 from ART3MISTICAL/master

Converted spaces into tabs for all files
pull/560/head
Jason 2 years ago committed by GitHub
commit 88c3145a82
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -5,7 +5,7 @@
version: 2
updates:
- package-ecosystem: "pip" # See documentation for possible values
directory: "/" # Location of package manifests
schedule:
interval: "daily"
- package-ecosystem: "pip" # See documentation for possible values
directory: "/" # Location of package manifests
schedule:
interval: "daily"

@ -12,61 +12,61 @@
name: "CodeQL"
on:
push:
branches: [ "master" ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ "master" ]
schedule:
- cron: '16 14 * * 3'
push:
branches: [ "master" ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ "master" ]
schedule:
- cron: '16 14 * * 3'
jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write
analyze:
name: Analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write
strategy:
fail-fast: false
matrix:
language: [ 'python' ]
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python', 'ruby' ]
# Learn more about CodeQL language support at https://aka.ms/codeql-docs/language-support
strategy:
fail-fast: false
matrix:
language: [ 'python' ]
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python', 'ruby' ]
# Learn more about CodeQL language support at https://aka.ms/codeql-docs/language-support
steps:
- name: Checkout repository
uses: actions/checkout@v3
steps:
- name: Checkout repository
uses: actions/checkout@v3
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v2
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# Details on CodeQL's query packs refer to : https://docs.github.com/en/code-security/code-scanning/automatically-scanning-your-code-for-vulnerabilities-and-errors/configuring-code-scanning#using-queries-in-ql-packs
# queries: security-extended,security-and-quality
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v2
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# Details on CodeQL's query packs refer to : https://docs.github.com/en/code-security/code-scanning/automatically-scanning-your-code-for-vulnerabilities-and-errors/configuring-code-scanning#using-queries-in-ql-packs
# queries: security-extended,security-and-quality
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@v2
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@v2
# Command-line programs to run using the OS shell.
# 📚 See https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstepsrun
# Command-line programs to run using the OS shell.
# 📚 See https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstepsrun
# If the Autobuild fails above, remove it and uncomment the following three lines.
# modify them (or add more) to build your code if your project, please refer to the EXAMPLE below for guidance.
# If the Autobuild fails above, remove it and uncomment the following three lines.
# modify them (or add more) to build your code if your project, please refer to the EXAMPLE below for guidance.
# - run: |
# echo "Run, Build Application using script"
# ./location_of_script_within_repo/buildscript.sh
# - run: |
# echo "Run, Build Application using script"
# ./location_of_script_within_repo/buildscript.sh
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v2
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v2

@ -43,16 +43,16 @@ The only original thing being done is the editing and gathering of all materials
2b **Manual Install**: Rename `.env.template` to `.env` and replace all values with the appropriate fields. To get Reddit keys (**required**), visit [the Reddit Apps page.](https://www.reddit.com/prefs/apps) TL;DR set up an app that is a "script". Copy your keys into the `.env` file, along with whether your account uses two-factor authentication.
3. install [SoX](https://sourceforge.net/projects/sox/files/sox/)
4.
5. Run `pip3 install -r requirements.txt`
3. Install [SoX](https://sourceforge.net/projects/sox/files/sox/)
4. Run `pip3 install -r requirements.txt`
6. Run `playwright install` and `playwright install-deps`.
5. Run `playwright install` and `playwright install-deps`.
7. Run `python3 main.py` (unless you chose automatic install, then the installer will automatically run main.py)
6. Run `python3 main.py` (unless you chose automatic install, then the installer will automatically run main.py)
required\*\*), visit [the Reddit Apps page.](https://www.reddit.com/prefs/apps) TL;DR set up an app that is a "script".
Copy your keys into the `.env` file, along with whether your account uses two-factor authentication.
8. Enjoy 😎
7. Enjoy 😎
## Video
@ -71,7 +71,7 @@ I have tried to simplify the code so anyone can read it and start contributing a
- [x] Allowing users to change voice.
- [x] Checks if a video has already been created
- [x] Light and Dark modes
- [x] Nsfw post filter
- [x] NSFW post filter
Please read our [contributing guidelines](CONTRIBUTING.md) for more detailed information.

@ -2,12 +2,12 @@ from gtts import gTTS
class GTTS:
def tts(
self,
req_text: str = "Google Text To Speech",
filename: str = "title.mp3",
random_speaker=False,
censer=False,
):
tts = gTTS(text=req_text, lang="en", slow=False)
tts.save(f"{filename}")
def tts(
self,
req_text: str = "Google Text To Speech",
filename: str = "title.mp3",
random_speaker=False,
censor=False,
):
tts = gTTS(text=req_text, lang="en", slow=False)
tts.save(f"{filename}")

@ -8,79 +8,101 @@ from moviepy.audio.AudioClip import concatenate_audioclips, CompositeAudioClip
from moviepy.audio.io.AudioFileClip import AudioFileClip
from requests.exceptions import JSONDecodeError
voices = ['Brian', 'Emma', 'Russell', 'Joey', 'Matthew', 'Joanna', 'Kimberly', 'Amy', 'Geraint', 'Nicole', 'Justin', 'Ivy', 'Kendra', 'Salli', 'Raveena']
voices = [
"Brian",
"Emma",
"Russell",
"Joey",
"Matthew",
"Joanna",
"Kimberly",
"Amy",
"Geraint",
"Nicole",
"Justin",
"Ivy",
"Kendra",
"Salli",
"Raveena",
]
# valid voices https://lazypy.ro/tts/
class POLLY:
def __init__(self):
self.url = 'https://streamlabs.com/polly/speak'
def __init__(self):
self.url = "https://streamlabs.com/polly/speak"
def tts(
self,
req_text: str = "Amazon Text To Speech",
filename: str = "title.mp3",
random_speaker=False,
censer=False,
):
if random_speaker:
voice = self.randomvoice()
else:
if not os.getenv('VOICE'):
return ValueError('Please set the environment variable VOICE to a valid voice. options are: {}'.format(voices))
voice = str(os.getenv("VOICE")).capitalize()
body = {'voice': voice, 'text': req_text, 'service': 'polly'}
response = requests.post(self.url, data=body)
try:
voice_data = requests.get(response.json()['speak_url'])
with open(filename, 'wb') as f:
f.write(voice_data.content)
except (KeyError, JSONDecodeError):
if response.json()['error'] == 'Text length is too long!':
chunks = [
m.group().strip() for m in re.finditer(r" *((.{0,499})(\.|.$))", req_text)
]
def tts(
self,
req_text: str = "Amazon Text To Speech",
filename: str = "title.mp3",
random_speaker=False,
censor=False,
):
if random_speaker:
voice = self.randomvoice()
else:
if not os.getenv("VOICE"):
return ValueError(
"Please set the environment variable VOICE to a valid voice. options are: {}".format(
voices
)
)
voice = str(os.getenv("VOICE")).capitalize()
body = {"voice": voice, "text": req_text, "service": "polly"}
response = requests.post(self.url, data=body)
try:
voice_data = requests.get(response.json()["speak_url"])
with open(filename, "wb") as f:
f.write(voice_data.content)
except (KeyError, JSONDecodeError):
if response.json()["error"] == "Text length is too long!":
chunks = [
m.group().strip() for m in re.finditer(r" *((.{0,499})(\.|.$))", req_text)
]
audio_clips = []
cbn = sox.Combiner()
audio_clips = []
cbn = sox.Combiner()
chunkId = 0
for chunk in chunks:
body = {'voice': voice, 'text': chunk, 'service': 'polly'}
resp = requests.post(self.url, data=body)
voice_data = requests.get(resp.json()['speak_url'])
with open(filename.replace(".mp3", f"-{chunkId}.mp3"), "wb") as out:
out.write(voice_data.content)
chunkId = 0
for chunk in chunks:
body = {"voice": voice, "text": chunk, "service": "polly"}
resp = requests.post(self.url, data=body)
voice_data = requests.get(resp.json()["speak_url"])
with open(filename.replace(".mp3", f"-{chunkId}.mp3"), "wb") as out:
out.write(voice_data.content)
audio_clips.append(filename.replace(".mp3", f"-{chunkId}.mp3"))
audio_clips.append(filename.replace(".mp3", f"-{chunkId}.mp3"))
chunkId = chunkId + 1
try:
if len(audio_clips) > 1:
cbn.convert(samplerate=44100, n_channels=2)
cbn.build(audio_clips, filename, "concatenate")
else:
os.rename(audio_clips[0], filename)
except (sox.core.SoxError,
FileNotFoundError): # https://github.com/JasonLovesDoggo/RedditVideoMakerBot/issues/67#issuecomment-1150466339
for clip in audio_clips:
i = audio_clips.index(clip) # get the index of the clip
audio_clips = (
audio_clips[:i] + [AudioFileClip(clip)] + audio_clips[i + 1:]
) # replace the clip with an AudioFileClip
audio_concat = concatenate_audioclips(audio_clips)
audio_composite = CompositeAudioClip([audio_concat])
audio_composite.write_audiofile(filename, 44100, 2, 2000, None)
chunkId = chunkId + 1
try:
if len(audio_clips) > 1:
cbn.convert(samplerate=44100, n_channels=2)
cbn.build(audio_clips, filename, "concatenate")
else:
os.rename(audio_clips[0], filename)
except (
sox.core.SoxError,
FileNotFoundError,
): # https://github.com/JasonLovesDoggo/RedditVideoMakerBot/issues/67#issuecomment-1150466339
for clip in audio_clips:
i = audio_clips.index(clip) # get the index of the clip
audio_clips = (
audio_clips[:i] + [AudioFileClip(clip)] + audio_clips[i + 1 :]
) # replace the clip with an AudioFileClip
audio_concat = concatenate_audioclips(audio_clips)
audio_composite = CompositeAudioClip([audio_concat])
audio_composite.write_audiofile(filename, 44100, 2, 2000, None)
def make_readable(self, text):
"""
Amazon Polly fails to read some symbols properly such as '& (and)'.
So we normalize input text before passing it to the service
"""
text = text.replace('&', 'and')
return text
def make_readable(self, text):
"""
Amazon Polly fails to read some symbols properly such as '& (and)'.
So we normalize input text before passing it to the service
"""
text = text.replace("&", "and")
return text
def randomvoice(self):
return random.choice(voices)
def randomvoice(self):
return random.choice(voices)

@ -15,49 +15,49 @@ from requests.adapters import HTTPAdapter, Retry
# https://twitter.com/scanlime/status/1512598559769702406
nonhuman = [ # DISNEY VOICES
"en_us_ghostface", # Ghost Face
"en_us_chewbacca", # Chewbacca
"en_us_c3po", # C3PO
"en_us_stitch", # Stitch
"en_us_stormtrooper", # Stormtrooper
"en_us_rocket", # Rocket
# ENGLISH VOICES
"en_us_ghostface", # Ghost Face
"en_us_chewbacca", # Chewbacca
"en_us_c3po", # C3PO
"en_us_stitch", # Stitch
"en_us_stormtrooper", # Stormtrooper
"en_us_rocket", # Rocket
# ENGLISH VOICES
]
human = [
"en_au_001", # English AU - Female
"en_au_002", # English AU - Male
"en_uk_001", # English UK - Male 1
"en_uk_003", # English UK - Male 2
"en_us_001", # English US - Female (Int. 1)
"en_us_002", # English US - Female (Int. 2)
"en_us_006", # English US - Male 1
"en_us_007", # English US - Male 2
"en_us_009", # English US - Male 3
"en_us_010",
"en_au_001", # English AU - Female
"en_au_002", # English AU - Male
"en_uk_001", # English UK - Male 1
"en_uk_003", # English UK - Male 2
"en_us_001", # English US - Female (Int. 1)
"en_us_002", # English US - Female (Int. 2)
"en_us_006", # English US - Male 1
"en_us_007", # English US - Male 2
"en_us_009", # English US - Male 3
"en_us_010",
]
voices = nonhuman + human
noneng = [
"fr_001", # French - Male 1
"fr_002", # French - Male 2
"de_001", # German - Female
"de_002", # German - Male
"es_002", # Spanish - Male
# AMERICA VOICES
"es_mx_002", # Spanish MX - Male
"br_001", # Portuguese BR - Female 1
"br_003", # Portuguese BR - Female 2
"br_004", # Portuguese BR - Female 3
"br_005", # Portuguese BR - Male
# ASIA VOICES
"id_001", # Indonesian - Female
"jp_001", # Japanese - Female 1
"jp_003", # Japanese - Female 2
"jp_005", # Japanese - Female 3
"jp_006", # Japanese - Male
"kr_002", # Korean - Male 1
"kr_003", # Korean - Female
"kr_004", # Korean - Male 2
"fr_001", # French - Male 1
"fr_002", # French - Male 2
"de_001", # German - Female
"de_002", # German - Male
"es_002", # Spanish - Male
# AMERICA VOICES
"es_mx_002", # Spanish MX - Male
"br_001", # Portuguese BR - Female 1
"br_003", # Portuguese BR - Female 2
"br_004", # Portuguese BR - Female 3
"br_005", # Portuguese BR - Male
# ASIA VOICES
"id_001", # Indonesian - Female
"jp_001", # Japanese - Female 1
"jp_003", # Japanese - Female 2
"jp_005", # Japanese - Female 3
"jp_006", # Japanese - Male
"kr_002", # Korean - Male 1
"kr_003", # Korean - Female
"kr_004", # Korean - Male 2
]
@ -66,79 +66,74 @@ noneng = [
class TikTok: # TikTok Text-to-Speech Wrapper
def __init__(self):
self.URI_BASE = "https://api16-normal-useast5.us.tiktokv.com/media/api/text/speech/invoke/?text_speaker="
def tts(
self,
req_text: str = "TikTok Text To Speech",
filename: str = "title.mp3",
random_speaker: bool = False,
censer=False,
):
req_text = req_text.replace("+", "plus").replace(" ", "+").replace("&", "and")
if censer:
# req_text = pf.censor(req_text)
pass
voice = (
self.randomvoice()
if random_speaker
else (os.getenv("VOICE") or random.choice(human))
)
chunks = [
m.group().strip() for m in re.finditer(r" *((.{0,299})(\.|.$))", req_text)
]
audio_clips = []
cbn = sox.Combiner()
# cbn.set_input_format(file_type=["mp3" for _ in chunks])
chunkId = 0
for chunk in chunks:
try:
r = requests.post(
f"{self.URI_BASE}{voice}&req_text={chunk}&speaker_map_type=0"
)
except requests.exceptions.SSLError:
# https://stackoverflow.com/a/47475019/18516611
session = requests.Session()
retry = Retry(connect=3, backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retry)
session.mount("http://", adapter)
session.mount("https://", adapter)
r = session.post(
f"{self.URI_BASE}{voice}&req_text={chunk}&speaker_map_type=0"
)
print(r.text)
vstr = [r.json()["data"]["v_str"]][0]
b64d = base64.b64decode(vstr)
with open(filename.replace(".mp3", f"-{chunkId}.mp3"), "wb") as out:
out.write(b64d)
audio_clips.append(filename.replace(".mp3", f"-{chunkId}.mp3"))
chunkId = chunkId + 1
try:
if len(audio_clips) > 1:
cbn.convert(samplerate=44100, n_channels=2)
cbn.build(audio_clips, filename, "concatenate")
else:
os.rename(audio_clips[0], filename)
except (sox.core.SoxError, FileNotFoundError): # https://github.com/JasonLovesDoggo/RedditVideoMakerBot/issues/67#issuecomment-1150466339
for clip in audio_clips:
i = audio_clips.index(clip) # get the index of the clip
audio_clips = (
audio_clips[:i] + [AudioFileClip(clip)] + audio_clips[i + 1 :]
) # replace the clip with an AudioFileClip
audio_concat = concatenate_audioclips(audio_clips)
audio_composite = CompositeAudioClip([audio_concat])
audio_composite.write_audiofile(filename, 44100, 2, 2000, None)
@staticmethod
def randomvoice():
ok_or_good = random.randrange(1, 10)
if ok_or_good == 1: # 1/10 chance of ok voice
return random.choice(voices)
return random.choice(human) # 9/10 chance of good voice
def __init__(self):
self.URI_BASE = "https://api16-normal-useast5.us.tiktokv.com/media/api/text/speech/invoke/?text_speaker="
def tts(
self,
req_text: str = "TikTok Text To Speech",
filename: str = "title.mp3",
random_speaker: bool = False,
censor=False,
):
req_text = req_text.replace("+", "plus").replace(" ", "+").replace("&", "and")
if censor:
# req_text = pf.censor(req_text)
pass
voice = (
self.randomvoice() if random_speaker else (os.getenv("VOICE") or random.choice(human))
)
chunks = [m.group().strip() for m in re.finditer(r" *((.{0,299})(\.|.$))", req_text)]
audio_clips = []
cbn = sox.Combiner()
# cbn.set_input_format(file_type=["mp3" for _ in chunks])
chunkId = 0
for chunk in chunks:
try:
r = requests.post(f"{self.URI_BASE}{voice}&req_text={chunk}&speaker_map_type=0")
except requests.exceptions.SSLError:
# https://stackoverflow.com/a/47475019/18516611
session = requests.Session()
retry = Retry(connect=3, backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retry)
session.mount("http://", adapter)
session.mount("https://", adapter)
r = session.post(f"{self.URI_BASE}{voice}&req_text={chunk}&speaker_map_type=0")
print(r.text)
vstr = [r.json()["data"]["v_str"]][0]
b64d = base64.b64decode(vstr)
with open(filename.replace(".mp3", f"-{chunkId}.mp3"), "wb") as out:
out.write(b64d)
audio_clips.append(filename.replace(".mp3", f"-{chunkId}.mp3"))
chunkId = chunkId + 1
try:
if len(audio_clips) > 1:
cbn.convert(samplerate=44100, n_channels=2)
cbn.build(audio_clips, filename, "concatenate")
else:
os.rename(audio_clips[0], filename)
except (
sox.core.SoxError,
FileNotFoundError,
): # https://github.com/JasonLovesDoggo/RedditVideoMakerBot/issues/67#issuecomment-1150466339
for clip in audio_clips:
i = audio_clips.index(clip) # get the index of the clip
audio_clips = (
audio_clips[:i] + [AudioFileClip(clip)] + audio_clips[i + 1 :]
) # replace the clip with an AudioFileClip
audio_concat = concatenate_audioclips(audio_clips)
audio_composite = CompositeAudioClip([audio_concat])
audio_composite.write_audiofile(filename, 44100, 2, 2000, None)
@staticmethod
def randomvoice():
ok_or_good = random.randrange(1, 10)
if ok_or_good == 1: # 1/10 chance of ok voice
return random.choice(voices)
return random.choice(human) # 9/10 chance of good voice

@ -10,12 +10,12 @@ CHOICE_DIR = {"tiktok": TikTok, "gtts": GTTS, 'polly': POLLY}
class TTS:
def __new__(cls):
load_dotenv()
CHOICE = getenv("TTsChoice").casefold()
valid_keys = [key.lower() for key in CHOICE_DIR.keys()]
if CHOICE not in valid_keys:
raise ValueError(
f"{CHOICE} is not valid. Please use one of these {valid_keys} options"
)
return CHOICE_DIR.get(CHOICE)()
def __new__(cls):
load_dotenv()
CHOICE = getenv("TTsChoice").casefold()
valid_keys = [key.lower() for key in CHOICE_DIR.keys()]
if CHOICE not in valid_keys:
raise ValueError(
f"{CHOICE} is not valid. Please use one of these {valid_keys} options"
)
return CHOICE_DIR.get(CHOICE)()

@ -11,15 +11,16 @@ from video_creation.final_video import make_final_video
from video_creation.screenshot_downloader import download_screenshots_of_reddit_posts
from video_creation.voices import save_text_to_mp3
banner = """
print(
"""
"""
print(banner)
)
load_dotenv()
# Modified by JasonLovesDoggo
print_markdown(
@ -47,7 +48,7 @@ def main():
download_screenshots_of_reddit_posts(reddit_object, number_of_comments)
download_background()
chop_background_video(length)
final_video = make_final_video(number_of_comments, length)
make_final_video(number_of_comments, length)
def run_many(times):

@ -11,93 +11,85 @@ TEXT_WHITELIST = set("abcdefghijklmnopqrstuvwxyz ABCDEFGHIJKLMNOPQRSTUVWXYZ 1234
def textify(text):
return "".join(filter(TEXT_WHITELIST.__contains__, text))
return "".join(filter(TEXT_WHITELIST.__contains__, text))
def get_subreddit_threads():
"""
Returns a list of threads from the AskReddit subreddit.
"""
global submission
print_substep("Logging into Reddit.")
"""
Returns a list of threads from the AskReddit subreddit.
"""
global submission
print_substep("Logging into Reddit.")
content = {}
if str(getenv("REDDIT_2FA")).casefold() == "yes":
print(
"\nEnter your two-factor authentication code from your authenticator app.\n"
)
code = input("> ")
print()
pw = getenv("REDDIT_PASSWORD")
passkey = f"{pw}:{code}"
else:
passkey = getenv("REDDIT_PASSWORD")
reddit = praw.Reddit(
client_id=getenv("REDDIT_CLIENT_ID"),
client_secret=getenv("REDDIT_CLIENT_SECRET"),
user_agent="Accessing Reddit threads",
username=getenv("REDDIT_USERNAME"),
passkey=passkey,
check_for_async=False,
)
"""
Ask user for subreddit input
"""
print_step("Getting subreddit threads...")
if not getenv(
"SUBREDDIT"
): # note to self. you can have multiple subreddits via reddit.subreddit("redditdev+learnpython")
subreddit = reddit.subreddit(
input("What subreddit would you like to pull from? ")
) # if the env isnt set, ask user
else:
print_substep(
f"Using subreddit: r/{getenv('SUBREDDIT')} from environment variable config"
)
subreddit = reddit.subreddit(
getenv("SUBREDDIT")
) # Allows you to specify in .env. Done for automation purposes.
content = {}
if str(getenv("REDDIT_2FA")).casefold() == "yes":
print("\nEnter your two-factor authentication code from your authenticator app.\n")
code = input("> ")
print()
pw = getenv("REDDIT_PASSWORD")
passkey = f"{pw}:{code}"
else:
passkey = getenv("REDDIT_PASSWORD")
reddit = praw.Reddit(
client_id=getenv("REDDIT_CLIENT_ID"),
client_secret=getenv("REDDIT_CLIENT_SECRET"),
user_agent="Accessing Reddit threads",
username=getenv("REDDIT_USERNAME"),
passkey=passkey,
check_for_async=False,
)
"""
Ask user for subreddit input
"""
print_step("Getting subreddit threads...")
if not getenv(
"SUBREDDIT"
): # note to self. you can have multiple subreddits via reddit.subreddit("redditdev+learnpython")
subreddit = reddit.subreddit(
input("What subreddit would you like to pull from? ")
) # if the env isnt set, ask user
else:
print_substep(f"Using subreddit: r/{getenv('SUBREDDIT')} from environment variable config")
subreddit = reddit.subreddit(
getenv("SUBREDDIT")
) # Allows you to specify in .env. Done for automation purposes.
if getenv("POST_ID"):
submission = reddit.submission(id=getenv("POST_ID"))
else:
threads = subreddit.hot(limit=25)
submission = get_subreddit_undone(threads, subreddit)
submission = check_done(submission) # double checking
if submission is None:
return get_subreddit_threads() # submission already done. rerun
upvotes = submission.score
ratio = submission.upvote_ratio * 100
num_comments = submission.num_comments
if getenv("POST_ID"):
submission = reddit.submission(id=getenv("POST_ID"))
else:
threads = subreddit.hot(limit=25)
submission = get_subreddit_undone(threads, subreddit)
submission = check_done(submission) # double checking
if submission is None:
return get_subreddit_threads() # submission already done. rerun
upvotes = submission.score
ratio = submission.upvote_ratio * 100
num_comments = submission.num_comments
print_substep(f"Video will be: {submission.title} :thumbsup:", style="bold green")
print_substep(f"Thread has " + str(upvotes) + " upvotes", style="bold blue")
print_substep(
f"Thread has a upvote ratio of " + str(ratio) + "%", style="bold blue"
)
print_substep(f"Thread has " + str(num_comments) + " comments", style="bold blue")
environ["VIDEO_TITLE"] = str(
textify(submission.title)
) # todo use global instend of env vars
environ["VIDEO_ID"] = str(textify(submission.id))
print_substep(f"Video will be: {submission.title} :thumbsup:", style="bold green")
print_substep(f"Thread has {upvotes} upvotes", style="bold blue")
print_substep(f"Thread has a upvote ratio of {ratio}%", style="bold blue")
print_substep(f"Thread has {num_comments} comments", style="bold blue")
environ["VIDEO_TITLE"] = str(textify(submission.title)) # todo use global instend of env vars
environ["VIDEO_ID"] = str(textify(submission.id))
content["thread_url"] = f"https://reddit.com{submission.permalink}"
content["thread_title"] = submission.title
# content["thread_content"] = submission.content
content["comments"] = []
for top_level_comment in submission.comments:
if isinstance(top_level_comment, MoreComments):
continue
if top_level_comment.body in ["[removed]", "[deleted]"]:
continue # # see https://github.com/JasonLovesDoggo/RedditVideoMakerBot/issues/78
if not top_level_comment.stickied:
if len(top_level_comment.body) <= int(environ["MAX_COMMENT_LENGTH"]):
content["comments"].append(
{
"comment_body": top_level_comment.body,
"comment_url": top_level_comment.permalink,
"comment_id": top_level_comment.id,
}
)
print_substep("Received subreddit threads Successfully.", style="bold green")
return content
content["thread_url"] = f"https://reddit.com{submission.permalink}"
content["thread_title"] = submission.title
# content["thread_content"] = submission.content
content["comments"] = []
for top_level_comment in submission.comments:
if isinstance(top_level_comment, MoreComments):
continue
if top_level_comment.body in ["[removed]", "[deleted]"]:
continue # # see https://github.com/JasonLovesDoggo/RedditVideoMakerBot/issues/78
if not top_level_comment.stickied:
if len(top_level_comment.body) <= int(environ["MAX_COMMENT_LENGTH"]):
content["comments"].append(
{
"comment_body": top_level_comment.body,
"comment_url": top_level_comment.permalink,
"comment_id": top_level_comment.id,
}
)
print_substep("Received subreddit threads Successfully.", style="bold green")
return content

@ -15,52 +15,52 @@ console = Console()
def handle_input(
message: str = "",
check_type=False,
match: str = "",
err_message: str = "",
nmin=None,
nmax=None,
oob_error="",
message: str = "",
check_type=False,
match: str = "",
err_message: str = "",
nmin=None,
nmax=None,
oob_error="",
):
match = re.compile(match + "$")
while True:
user_input = input(message + "\n> ").strip()
if re.match(match, user_input) is not None:
if check_type is not False:
try:
user_input = check_type(user_input)
if nmin is not None and user_input < nmin:
console.log("[red]" + oob_error) # Input too low failstate
continue
if nmax is not None and user_input > nmax:
console.log("[red]" + oob_error) # Input too high
continue
break # Successful type conversion and number in bounds
except ValueError:
console.log("[red]" + err_message) # Type conversion failed
continue
if (
nmin is not None and len(user_input) < nmin
): # Check if string is long enough
console.log("[red]" + oob_error)
continue
if (
nmax is not None and len(user_input) > nmax
): # Check if string is not too long
console.log("[red]" + oob_error)
continue
break
console.log("[red]" + err_message)
return user_input
match = re.compile(match + "$")
while True:
user_input = input(message + "\n> ").strip()
if re.match(match, user_input) is not None:
if check_type is not False:
try:
user_input = check_type(user_input)
if nmin is not None and user_input < nmin:
console.log("[red]" + oob_error) # Input too low failstate
continue
if nmax is not None and user_input > nmax:
console.log("[red]" + oob_error) # Input too high
continue
break # Successful type conversion and number in bounds
except ValueError:
console.log("[red]" + err_message) # Type conversion failed
continue
if (
nmin is not None and len(user_input) < nmin
): # Check if string is long enough
console.log("[red]" + oob_error)
continue
if (
nmax is not None and len(user_input) > nmax
): # Check if string is not too long
console.log("[red]" + oob_error)
continue
break
console.log("[red]" + err_message)
return user_input
if os.path.isfile(".setup-done-before"):
console.log(
"[red]Setup was already completed! Please make sure you have to run this script again. If that is such, delete the file .setup-done-before"
)
exit()
console.log(
"[red]Setup was already completed! Please make sure you have to run this script again. If that is such, delete the file .setup-done-before"
)
exit()
# These lines ensure the user:
# - knows they are in setup mode
@ -68,26 +68,26 @@ if os.path.isfile(".setup-done-before"):
print_step("Setup Assistant")
print_markdown(
"### You're in the setup wizard. Ensure you're supposed to be here, then type yes to continue. If you're not sure, type no to quit."
"### You're in the setup wizard. Ensure you're supposed to be here, then type yes to continue. If you're not sure, type no to quit."
)
# This Input is used to ensure the user is sure they want to continue.
if input("Are you sure you want to continue? > ").strip().casefold() != "yes":
console.print("[red]Exiting...")
exit()
console.print("[red]Exiting...")
exit()
# This code is inaccessible if the prior check fails, and thus an else statement is unnecessary
# Again, let them know they are about to erase all other setup data.
console.print(
"[bold red] This will overwrite your current settings. Are you sure you want to continue? [bold green]yes/no"
"[bold red] This will overwrite your current settings. Are you sure you want to continue? [bold green]yes/no"
)
if input("Are you sure you want to continue? > ").strip().casefold() != "yes":
console.print("[red]Abort mission! Exiting...")
exit()
console.print("[red]Abort mission! Exiting...")
exit()
# This is once again inaccessible if the prior checks fail
# Once they confirm, move on with the script.
console.print("[bold green]Alright! Let's get started!")
@ -103,18 +103,18 @@ console.log("[bold green]Opacity (range of 0-1, decimals are OK)")
console.log("[bold green]Subreddit (without r/ or /r/)")
console.log("[bold green]Theme (light or dark)")
console.print(
"[green]If you don't have these, please follow the instructions in the README.md file to set them up."
"[green]If you don't have these, please follow the instructions in the README.md file to set them up."
)
console.print(
"[green]If you do have these, type yes to continue. If you dont, go ahead and grab those quickly and come back."
"[green]If you do have these, type yes to continue. If you dont, go ahead and grab those quickly and come back."
)
print()
if input("Are you sure you have the credentials? > ").strip().casefold() != "yes":
console.print("[red]I don't understand that.")
console.print("[red]Exiting...")
exit()
console.print("[red]I don't understand that.")
console.print("[red]Exiting...")
exit()
console.print("[bold green]Alright! Let's get started!")
@ -123,69 +123,69 @@ console.print("[bold green]Alright! Let's get started!")
console.log("Enter your credentials now.")
client_id = handle_input(
"Client ID > ",
False,
"[-a-zA-Z0-9._~+/]+=*",
"That is somehow not a correct ID, try again.",
12,
30,
"The ID should be over 12 and under 30 characters, double check your input.",
"Client ID > ",
False,
"[-a-zA-Z0-9._~+/]+=*",
"That is somehow not a correct ID, try again.",
12,
30,
"The ID should be over 12 and under 30 characters, double check your input.",
)
client_sec = handle_input(
"Client Secret > ",
False,
"[-a-zA-Z0-9._~+/]+=*",
"That is somehow not a correct secret, try again.",
20,
40,
"The secret should be over 20 and under 40 characters, double check your input.",
"Client Secret > ",
False,
"[-a-zA-Z0-9._~+/]+=*",
"That is somehow not a correct secret, try again.",
20,
40,
"The secret should be over 20 and under 40 characters, double check your input.",
)
user = handle_input(
"Username > ",
False,
r"[_0-9a-zA-Z]+",
"That is not a valid user",
3,
20,
"A username HAS to be between 3 and 20 characters",
"Username > ",
False,
r"[_0-9a-zA-Z]+",
"That is not a valid user",
3,
20,
"A username HAS to be between 3 and 20 characters",
)
passw = handle_input("Password > ", False, ".*", "", 8, None, "Password too short")
twofactor = handle_input(
"2fa Enabled? (yes/no) > ",
False,
r"(yes)|(no)",
"You need to input either yes or no",
"2fa Enabled? (yes/no) > ",
False,
r"(yes)|(no)",
"You need to input either yes or no",
)
opacity = handle_input(
"Opacity? (range of 0-1) > ",
float,
".*",
"You need to input a number between 0 and 1",
0,
1,
"Your number is not between 0 and 1",
"Opacity? (range of 0-1) > ",
float,
".*",
"You need to input a number between 0 and 1",
0,
1,
"Your number is not between 0 and 1",
)
subreddit = handle_input(
"Subreddit (without r/) > ",
False,
r"[_0-9a-zA-Z]+",
"This subreddit cannot exist, make sure you typed it in correctly and removed the r/ (or /r/).",
3,
20,
"A subreddit name HAS to be between 3 and 20 characters",
"Subreddit (without r/) > ",
False,
r"[_0-9a-zA-Z]+",
"This subreddit cannot exist, make sure you typed it in correctly and removed the r/ (or /r/).",
3,
20,
"A subreddit name HAS to be between 3 and 20 characters",
)
theme = handle_input(
"Theme? (light or dark) > ",
False,
r"(light)|(dark)",
"You need to input 'light' or 'dark'",
"Theme? (light or dark) > ",
False,
r"(light)|(dark)",
"You need to input 'light' or 'dark'",
)
loader = Loader("Attempting to save your credentials...", "Done!").start()
# you can also put a while loop here, e.g. while VideoIsBeingMade == True: ...
console.log("Writing to the .env file...")
with open(".env", "w") as f:
f.write(
f"""REDDIT_CLIENT_ID="{client_id}"
f.write(
f"""REDDIT_CLIENT_ID="{client_id}"
REDDIT_CLIENT_SECRET="{client_sec}"
REDDIT_USERNAME="{user}"
REDDIT_PASSWORD="{passw}"
@ -194,12 +194,12 @@ THEME="{theme}"
SUBREDDIT="{subreddit}"
OPACITY={opacity}
"""
)
)
with open(".setup-done-before", "w") as f:
f.write(
"This file blocks the setup assistant from running again. Delete this file to run setup again."
)
f.write(
"This file blocks the setup assistant from running again. Delete this file to run setup again."
)
loader.stop()

@ -3,22 +3,22 @@ from os.path import exists
def cleanup() -> int:
if exists("./assets/temp"):
count = 0
files = [
f for f in os.listdir(".") if f.endswith(".mp4") and "temp" in f.lower()
]
count += len(files)
for f in files:
os.remove(f)
try:
for file in os.listdir("./assets/temp/mp4"):
count += 1
os.remove("./assets/temp/mp4/" + file)
except FileNotFoundError:
pass
for file in os.listdir("./assets/temp/mp3"):
count += 1
os.remove("./assets/temp/mp3/" + file)
return count
return 0
if exists("./assets/temp"):
count = 0
files = [
f for f in os.listdir(".") if f.endswith(".mp4") and "temp" in f.lower()
]
count += len(files)
for f in files:
os.remove(f)
try:
for file in os.listdir("./assets/temp/mp4"):
count += 1
os.remove("./assets/temp/mp4/" + file)
except FileNotFoundError:
pass
for file in os.listdir("./assets/temp/mp3"):
count += 1
os.remove("./assets/temp/mp3/" + file)
return count
return 0

@ -3,36 +3,36 @@
from dotenv import dotenv_values
DEFAULTS = {
"SUBREDDIT": "AskReddit",
"ALLOW_NSFW": "False",
"POST_ID": "",
"THEME": "DARK",
"REDDIT_2FA": "no",
"TIMES_TO_RUN": "",
"MAX_COMMENT_LENGTH": "500",
"OPACITY": "1",
"VOICE": "en_us_001",
"STORYMODE": "False",
"SUBREDDIT": "AskReddit",
"ALLOW_NSFW": "False",
"POST_ID": "",
"THEME": "DARK",
"REDDIT_2FA": "no",
"TIMES_TO_RUN": "",
"MAX_COMMENT_LENGTH": "500",
"OPACITY": "1",
"VOICE": "en_us_001",
"STORYMODE": "False",
}
class Config:
def __init__(self):
self.raw = dotenv_values("../.env")
self.load_attrs()
def __getattr__(self, attr): # code completion for attributes fix.
return getattr(self, attr)
def load_attrs(self):
for key, value in self.raw.items():
self.add_attr(key, value)
def add_attr(self, key, value):
if value is None or value == "":
setattr(self, key, DEFAULTS[key])
else:
setattr(self, key, str(value))
def __init__(self):
self.raw = dotenv_values("../.env")
self.load_attrs()
def __getattr__(self, attr): # code completion for attributes fix.
return getattr(self, attr)
def load_attrs(self):
for key, value in self.raw.items():
self.add_attr(key, value)
def add_attr(self, key, value):
if value is None or value == "":
setattr(self, key, DEFAULTS[key])
else:
setattr(self, key, str(value))
config = Config()

@ -9,19 +9,19 @@ console = Console()
def print_markdown(text):
"""Prints a rich info message. Support Markdown syntax."""
"""Prints a rich info message. Support Markdown syntax."""
md = Padding(Markdown(text), 2)
console.print(md)
md = Padding(Markdown(text), 2)
console.print(md)
def print_step(text):
"""Prints a rich info message."""
"""Prints a rich info message."""
panel = Panel(Text(text, justify="left"))
console.print(panel)
panel = Panel(Text(text, justify="left"))
console.print(panel)
def print_substep(text, style=""):
"""Prints a rich info message without the panelling."""
console.print(text, style=style)
"""Prints a rich info message without the panelling."""
console.print(text, style=style)

@ -9,43 +9,43 @@ from time import sleep
class Loader:
def __init__(self, desc="Loading...", end="Done!", timeout=0.1):
"""
A loader-like context manager
Args:
desc (str, optional): The loader's description. Defaults to "Loading...".
end (str, optional): Final print. Defaults to "Done!".
timeout (float, optional): Sleep time between prints. Defaults to 0.1.
"""
self.desc = desc
self.end = end
self.timeout = timeout
self._thread = Thread(target=self._animate, daemon=True)
self.steps = ["", "", "", "", "", "", "", ""]
self.done = False
def start(self):
self._thread.start()
return self
def _animate(self):
for c in cycle(self.steps):
if self.done:
break
print(f"\r{self.desc} {c}", flush=True, end="")
sleep(self.timeout)
def __enter__(self):
self.start()
def stop(self):
self.done = True
cols = get_terminal_size((80, 20)).columns
print("\r" + " " * cols, end="", flush=True)
print(f"\r{self.end}", flush=True)
def __exit__(self, exc_type, exc_value, tb):
# handle exceptions with those variables ^
self.stop()
def __init__(self, desc="Loading...", end="Done!", timeout=0.1):
"""
A loader-like context manager
Args:
desc (str, optional): The loader's description. Defaults to "Loading...".
end (str, optional): Final print. Defaults to "Done!".
timeout (float, optional): Sleep time between prints. Defaults to 0.1.
"""
self.desc = desc
self.end = end
self.timeout = timeout
self._thread = Thread(target=self._animate, daemon=True)
self.steps = ["", "", "", "", "", "", "", ""]
self.done = False
def start(self):
self._thread.start()
return self
def _animate(self):
for c in cycle(self.steps):
if self.done:
break
print(f"\r{self.desc} {c}", flush=True, end="")
sleep(self.timeout)
def __enter__(self):
self.start()
def stop(self):
self.done = True
cols = get_terminal_size((80, 20)).columns
print("\r" + " " * cols, end="", flush=True)
print(f"\r{self.end}", flush=True)
def __exit__(self, exc_type, exc_value, tb):
# handle exceptions with those variables ^
self.stop()

@ -5,28 +5,28 @@ from utils.console import print_substep
def get_subreddit_undone(submissions: List, subreddit):
"""
recursively checks if the top submission in the list was already done.
"""
with open("./video_creation/data/videos.json", "r") as done_vids_raw:
done_videos = json.load(done_vids_raw)
for submission in submissions:
if already_done(done_videos, submission):
continue
if submission.over_18:
if getenv("ALLOW_NSFW").casefold() == "false":
print_substep("NSFW Post Detected. Skipping...")
continue
return submission
print('all submissions have been done going by top submission order')
return get_subreddit_undone(
subreddit.top(time_filter="hour"), subreddit
) # all of the videos in hot have already been done
"""
recursively checks if the top submission in the list was already done.
"""
with open("./video_creation/data/videos.json", "r") as done_vids_raw:
done_videos = json.load(done_vids_raw)
for submission in submissions:
if already_done(done_videos, submission):
continue
if submission.over_18:
if getenv("ALLOW_NSFW").casefold() == "false":
print_substep("NSFW Post Detected. Skipping...")
continue
return submission
print('all submissions have been done going by top submission order')
return get_subreddit_undone(
subreddit.top(time_filter="hour"), subreddit
) # all of the videos in hot have already been done
def already_done(done_videos: list, submission):
for video in done_videos:
if video["id"] == str(submission):
return True
return False
for video in done_videos:
if video["id"] == str(submission):
return True
return False

@ -5,19 +5,19 @@ from utils.console import print_step
def check_done(
redditobj,
redditobj,
): # don't set this to be run anyplace that isn't subreddit.py bc of inspect stack
"""params:
reddit_object: The Reddit Object you received in askreddit.py"""
with open("./video_creation/data/videos.json", "r") as done_vids_raw:
done_videos = json.load(done_vids_raw)
for video in done_videos:
if video["id"] == str(redditobj):
if getenv("POST_ID"):
print_step(
"You already have done this video but since it was declared specifically in the .env file the program will continue"
)
return redditobj
print_step("Getting new post as the current one has already been done")
return None
return redditobj
"""params:
reddit_object: The Reddit Object you received in askreddit.py"""
with open("./video_creation/data/videos.json", "r") as done_vids_raw:
done_videos = json.load(done_vids_raw)
for video in done_videos:
if video["id"] == str(redditobj):
if getenv("POST_ID"):
print_step(
"You already have done this video but since it was declared specifically in the .env file the program will continue"
)
return redditobj
print_step("Getting new post as the current one has already been done")
return None
return redditobj

@ -2,21 +2,21 @@ import re
def sanitize_text(text):
"""
Sanitizes the text for tts.
What gets removed:
- following characters`^_~@!&;#:-%“”‘"%*/{}[]()\|<>?=+`
- any http or https links
"""
"""
Sanitizes the text for tts.
What gets removed:
- following characters`^_~@!&;#:-%“”‘"%*/{}[]()\|<>?=+`
- any http or https links
"""
# remove any urls from the text
regex_urls = r"((http|https)\:\/\/)?[a-zA-Z0-9\.\/\?\:@\-_=#]+\.([a-zA-Z]){2,6}([a-zA-Z0-9\.\&\/\?\:@\-_=#])*"
# remove any urls from the text
regex_urls = r"((http|https)\:\/\/)?[a-zA-Z0-9\.\/\?\:@\-_=#]+\.([a-zA-Z]){2,6}([a-zA-Z0-9\.\&\/\?\:@\-_=#])*"
result = re.sub(regex_urls, " ", text)
result = re.sub(regex_urls, " ", text)
# note: not removing apostrophes
regex_expr = r"\s['|]|['|]\s|[\^_~@!&;#:\-%“”‘\"%\*/{}\[\]\(\)\\|<>=+]"
result = re.sub(regex_expr, " ", result)
# note: not removing apostrophes
regex_expr = r"\s['|]|['|]\s|[\^_~@!&;#:\-%“”‘\"%\*/{}\[\]\(\)\\|<>=+]"
result = re.sub(regex_expr, " ", result)
# remove extra whitespace
return " ".join(result.split())
# remove extra whitespace
return " ".join(result.split())

@ -9,55 +9,55 @@ from utils.console import print_step, print_substep
def get_start_and_end_times(video_length, length_of_clip):
random_time = randrange(180, int(length_of_clip) - int(video_length))
return random_time, random_time + video_length
random_time = randrange(180, int(length_of_clip) - int(video_length))
return random_time, random_time + video_length
def download_background():
"""Downloads the backgrounds/s video from YouTube."""
Path("./assets/backgrounds/").mkdir(parents=True, exist_ok=True)
background_options = [ # uri , filename , credit
("https://www.youtube.com/watch?v=n_Dv4JMiwK8", "parkour.mp4", "bbswitzer"),
# (
# "https://www.youtube.com/watch?v=2X9QGY__0II",
# "rocket_league.mp4",
# "Orbital Gameplay",
# ),
]
# note: make sure the file name doesn't include an - in it
if not len(listdir("./assets/backgrounds")) >= len(
background_options
): # if there are any background videos not installed
print_step(
"We need to download the backgrounds videos. they are fairly large but it's only done once. 😎"
)
print_substep("Downloading the backgrounds videos... please be patient 🙏 ")
for uri, filename, credit in background_options:
if Path(f"assets/backgrounds/{credit}-{filename}").is_file():
continue # adds check to see if file exists before downloading
print_substep(f"Downloading {filename} from {uri}")
YouTube(uri).streams.filter(res="1080p").first().download(
"assets/backgrounds", filename=f"{credit}-{filename}"
)
print_substep(
"Background videos downloaded successfully! 🎉", style="bold green"
)
"""Downloads the backgrounds/s video from YouTube."""
Path("./assets/backgrounds/").mkdir(parents=True, exist_ok=True)
background_options = [ # uri , filename , credit
("https://www.youtube.com/watch?v=n_Dv4JMiwK8", "parkour.mp4", "bbswitzer"),
# (
# "https://www.youtube.com/watch?v=2X9QGY__0II",
# "rocket_league.mp4",
# "Orbital Gameplay",
# ),
]
# note: make sure the file name doesn't include an - in it
if not len(listdir("./assets/backgrounds")) >= len(
background_options
): # if there are any background videos not installed
print_step(
"We need to download the backgrounds videos. they are fairly large but it's only done once. 😎"
)
print_substep("Downloading the backgrounds videos... please be patient 🙏 ")
for uri, filename, credit in background_options:
if Path(f"assets/backgrounds/{credit}-{filename}").is_file():
continue # adds check to see if file exists before downloading
print_substep(f"Downloading {filename} from {uri}")
YouTube(uri).streams.filter(res="1080p").first().download(
"assets/backgrounds", filename=f"{credit}-{filename}"
)
print_substep(
"Background videos downloaded successfully! 🎉", style="bold green"
)
def chop_background_video(video_length):
print_step("Finding a spot in the backgrounds video to chop...✂️")
choice = random.choice(listdir("assets/backgrounds"))
environ["background_credit"] = choice.split("-")[0]
background = VideoFileClip(f"assets/backgrounds/{choice}")
start_time, end_time = get_start_and_end_times(video_length, background.duration)
ffmpeg_extract_subclip(
f"assets/backgrounds/{choice}",
start_time,
end_time,
targetname="assets/temp/background.mp4",
)
print_substep("Background video chopped successfully!", style="bold green")
return True
print_step("Finding a spot in the backgrounds video to chop...✂️")
choice = random.choice(listdir("assets/backgrounds"))
environ["background_credit"] = choice.split("-")[0]
background = VideoFileClip(f"assets/backgrounds/{choice}")
start_time, end_time = get_start_and_end_times(video_length, background.duration)
ffmpeg_extract_subclip(
f"assets/backgrounds/{choice}",
start_time,
end_time,
targetname="assets/temp/background.mp4",
)
print_substep("Background video chopped successfully!", style="bold green")
return True

@ -1,8 +1,8 @@
[
{
"name": "USER",
"value": "eyJwcmVmcyI6eyJ0b3BDb250ZW50RGlzbWlzc2FsVGltZSI6MCwiZ2xvYmFsVGhlbWUiOiJSRURESVQiLCJuaWdodG1vZGUiOnRydWUsImNvbGxhcHNlZFRyYXlTZWN0aW9ucyI6eyJmYXZvcml0ZXMiOmZhbHNlLCJtdWx0aXMiOmZhbHNlLCJtb2RlcmF0aW5nIjpmYWxzZSwic3Vic2NyaXB0aW9ucyI6ZmFsc2UsInByb2ZpbGVzIjpmYWxzZX0sInRvcENvbnRlbnRUaW1lc0Rpc21pc3NlZCI6MH19",
"domain": ".reddit.com",
"path": "/"
"name": "USER",
"value": "eyJwcmVmcyI6eyJ0b3BDb250ZW50RGlzbWlzc2FsVGltZSI6MCwiZ2xvYmFsVGhlbWUiOiJSRURESVQiLCJuaWdodG1vZGUiOnRydWUsImNvbGxhcHNlZFRyYXlTZWN0aW9ucyI6eyJmYXZvcml0ZXMiOmZhbHNlLCJtdWx0aXMiOmZhbHNlLCJtb2RlcmF0aW5nIjpmYWxzZSwic3Vic2NyaXB0aW9ucyI6ZmFsc2UsInByb2ZpbGVzIjpmYWxzZX0sInRvcENvbnRlbnRUaW1lc0Rpc21pc3NlZCI6MH19",
"domain": ".reddit.com",
"path": "/"
}
]

@ -1,14 +1,14 @@
[
{
"name": "USER",
"value": "eyJwcmVmcyI6eyJ0b3BDb250ZW50RGlzbWlzc2FsVGltZSI6MCwiZ2xvYmFsVGhlbWUiOiJSRURESVQiLCJuaWdodG1vZGUiOnRydWUsImNvbGxhcHNlZFRyYXlTZWN0aW9ucyI6eyJmYXZvcml0ZXMiOmZhbHNlLCJtdWx0aXMiOmZhbHNlLCJtb2RlcmF0aW5nIjpmYWxzZSwic3Vic2NyaXB0aW9ucyI6ZmFsc2UsInByb2ZpbGVzIjpmYWxzZX0sInRvcENvbnRlbnRUaW1lc0Rpc21pc3NlZCI6MH19",
"domain": ".reddit.com",
"path": "/"
"name": "USER",
"value": "eyJwcmVmcyI6eyJ0b3BDb250ZW50RGlzbWlzc2FsVGltZSI6MCwiZ2xvYmFsVGhlbWUiOiJSRURESVQiLCJuaWdodG1vZGUiOnRydWUsImNvbGxhcHNlZFRyYXlTZWN0aW9ucyI6eyJmYXZvcml0ZXMiOmZhbHNlLCJtdWx0aXMiOmZhbHNlLCJtb2RlcmF0aW5nIjpmYWxzZSwic3Vic2NyaXB0aW9ucyI6ZmFsc2UsInByb2ZpbGVzIjpmYWxzZX0sInRvcENvbnRlbnRUaW1lc0Rpc21pc3NlZCI6MH19",
"domain": ".reddit.com",
"path": "/"
},
{
"name": "eu_cookie",
"value": "{%22opted%22:true%2C%22nonessential%22:false}",
"domain": ".reddit.com",
"path": "/"
"name": "eu_cookie",
"value": "{%22opted%22:true%2C%22nonessential%22:false}",
"domain": ".reddit.com",
"path": "/"
}
]

@ -1,8 +1,8 @@
[
{
"name": "eu_cookie",
"value": "{%22opted%22:true%2C%22nonessential%22:false}",
"domain": ".reddit.com",
"path": "/"
"name": "eu_cookie",
"value": "{%22opted%22:true%2C%22nonessential%22:false}",
"domain": ".reddit.com",
"path": "/"
}
]

@ -5,13 +5,13 @@ import time
from os.path import exists
from moviepy.editor import (
VideoFileClip,
AudioFileClip,
ImageClip,
concatenate_videoclips,
concatenate_audioclips,
CompositeAudioClip,
CompositeVideoClip,
VideoFileClip,
AudioFileClip,
ImageClip,
concatenate_videoclips,
concatenate_audioclips,
CompositeAudioClip,
CompositeVideoClip,
)
from moviepy.video.io import ffmpeg_tools
from rich.console import Console
@ -26,132 +26,125 @@ W, H = 1080, 1920
def make_final_video(number_of_clips, length):
print_step("Creating the final video 🎥")
VideoFileClip.reW = lambda clip: clip.resize(width=W)
VideoFileClip.reH = lambda clip: clip.resize(width=H)
opacity = os.getenv("OPACITY")
background_clip = (
VideoFileClip("assets/temp/background.mp4")
.without_audio()
.resize(height=H)
.crop(x1=1166.6, y1=0, x2=2246.6, y2=1920)
)
# Gather all audio clips
audio_clips = []
for i in range(0, number_of_clips):
audio_clips.append(AudioFileClip(f"assets/temp/mp3/{i}.mp3"))
audio_clips.insert(0, AudioFileClip(f"assets/temp/mp3/title.mp3"))
audio_concat = concatenate_audioclips(audio_clips)
audio_composite = CompositeAudioClip([audio_concat])
# Get sum of all clip lengths
total_length = sum([clip.duration for clip in audio_clips])
# round total_length to an integer
int_total_length = round(total_length)
# Output Length
console.log(f"[bold green] Video Will Be: {int_total_length} Seconds Long")
# add title to video
image_clips = []
# Gather all images
if (
opacity is None or float(opacity) >= 1
): # opacity not set or is set to one OR MORE
image_clips.insert(
0,
ImageClip(f"assets/temp/png/title.png")
.set_duration(audio_clips[0].duration)
.set_position("center")
.resize(width=W - 100)
.set_opacity(float(opacity)),
)
else:
image_clips.insert(
0,
ImageClip(f"assets/temp/png/title.png")
.set_duration(audio_clips[0].duration)
.set_position("center")
.resize(width=W - 100))
for i in range(0, number_of_clips):
if (
opacity is None or float(opacity) >= 1
): # opacity not set or is set to one OR MORE
image_clips.append(
ImageClip(f"assets/temp/png/comment_{i}.png")
.set_duration(audio_clips[i + 1].duration)
.set_position("center")
.resize(width=W - 100),
)
else:
image_clips.append(
ImageClip(f"assets/temp/png/comment_{i}.png")
.set_duration(audio_clips[i + 1].duration)
.set_position("center")
.resize(width=W - 100)
.set_opacity(float(opacity)),
)
# if os.path.exists("assets/mp3/posttext.mp3"):
# image_clips.insert(
# 0,
# ImageClip("assets/png/title.png")
# .set_duration(audio_clips[0].duration + audio_clips[1].duration)
# .set_position("center")
# .resize(width=W - 100)
# .set_opacity(float(opacity)),
# )
# else:
image_concat = concatenate_videoclips(image_clips).set_position(
("center", "center")
)
image_concat.audio = audio_composite
final = CompositeVideoClip([background_clip, image_concat])
def get_video_title() -> str:
title = os.getenv("VIDEO_TITLE") or "final_video"
if len(title) <= 35:
return title
else:
return title[0:30] + "..."
filename = f"{get_video_title()}.mp4"
def save_data():
with open("./video_creation/data/videos.json", "r+") as raw_vids:
done_vids = json.load(raw_vids)
if str(subreddit.submission.id) in [video["id"] for video in done_vids]:
return # video already done but was specified to continue anyway in the .env file
payload = {
"id": str(os.getenv("VIDEO_ID")),
"time": str(int(time.time())),
"background_credit": str(os.getenv("background_credit")),
"reddit_title": str(os.getenv("VIDEO_TITLE")),
"filename": filename,
}
done_vids.append(payload)
raw_vids.seek(0)
json.dump(done_vids, raw_vids, ensure_ascii=False, indent=4)
save_data()
if not exists("./results"):
print_substep("the results folder didn't exist so I made it")
os.mkdir("./results")
final.write_videofile(
"assets/temp/temp.mp4", fps=30, audio_codec="aac", audio_bitrate="192k"
)
ffmpeg_tools.ffmpeg_extract_subclip(
"assets/temp/temp.mp4", 0, length, targetname=f"results/{filename}"
)
# os.remove("assets/temp/temp.mp4")
print_step("Removing temporary files 🗑")
cleanups = cleanup()
print_substep(f"Removed {cleanups} temporary files 🗑")
print_substep(f"See result in the results folder!")
print_step(
f"Reddit title: {os.getenv('VIDEO_TITLE')} \n Background Credit: {os.getenv('background_credit')}"
)
print_step("Creating the final video 🎥")
VideoFileClip.reW = lambda clip: clip.resize(width=W)
VideoFileClip.reH = lambda clip: clip.resize(width=H)
opacity = os.getenv("OPACITY")
background_clip = (
VideoFileClip("assets/temp/background.mp4")
.without_audio()
.resize(height=H)
.crop(x1=1166.6, y1=0, x2=2246.6, y2=1920)
)
# Gather all audio clips
audio_clips = []
for i in range(0, number_of_clips):
audio_clips.append(AudioFileClip(f"assets/temp/mp3/{i}.mp3"))
audio_clips.insert(0, AudioFileClip("assets/temp/mp3/title.mp3"))
audio_concat = concatenate_audioclips(audio_clips)
audio_composite = CompositeAudioClip([audio_concat])
# Get sum of all clip lengths
total_length = sum([clip.duration for clip in audio_clips])
# round total_length to an integer
int_total_length = round(total_length)
# Output Length
console.log(f"[bold green] Video Will Be: {int_total_length} Seconds Long")
# add title to video
image_clips = []
# Gather all images
if opacity is None or float(opacity) >= 1: # opacity not set or is set to one OR MORE
image_clips.insert(
0,
ImageClip("assets/temp/png/title.png")
.set_duration(audio_clips[0].duration)
.set_position("center")
.resize(width=W - 100)
.set_opacity(float(opacity)),
)
else:
image_clips.insert(
0,
ImageClip("assets/temp/png/title.png")
.set_duration(audio_clips[0].duration)
.set_position("center")
.resize(width=W - 100),
)
for i in range(0, number_of_clips):
if opacity is None or float(opacity) >= 1: # opacity not set or is set to one OR MORE
image_clips.append(
ImageClip(f"assets/temp/png/comment_{i}.png")
.set_duration(audio_clips[i + 1].duration)
.set_position("center")
.resize(width=W - 100),
)
else:
image_clips.append(
ImageClip(f"assets/temp/png/comment_{i}.png")
.set_duration(audio_clips[i + 1].duration)
.set_position("center")
.resize(width=W - 100)
.set_opacity(float(opacity)),
)
# if os.path.exists("assets/mp3/posttext.mp3"):
# image_clips.insert(
# 0,
# ImageClip("assets/png/title.png")
# .set_duration(audio_clips[0].duration + audio_clips[1].duration)
# .set_position("center")
# .resize(width=W - 100)
# .set_opacity(float(opacity)),
# )
# else:
image_concat = concatenate_videoclips(image_clips).set_position(("center", "center"))
image_concat.audio = audio_composite
final = CompositeVideoClip([background_clip, image_concat])
def get_video_title() -> str:
title = os.getenv("VIDEO_TITLE") or "final_video"
if len(title) <= 35:
return title
else:
return title[0:30] + "..."
filename = f"{get_video_title()}.mp4"
def save_data():
with open("./video_creation/data/videos.json", "r+") as raw_vids:
done_vids = json.load(raw_vids)
if str(subreddit.submission.id) in [video["id"] for video in done_vids]:
return # video already done but was specified to continue anyway in the .env file
payload = {
"id": str(os.getenv("VIDEO_ID")),
"time": str(int(time.time())),
"background_credit": str(os.getenv("background_credit")),
"reddit_title": str(os.getenv("VIDEO_TITLE")),
"filename": filename,
}
done_vids.append(payload)
raw_vids.seek(0)
json.dump(done_vids, raw_vids, ensure_ascii=False, indent=4)
save_data()
if not exists("./results"):
print_substep("the results folder didn't exist so I made it")
os.mkdir("./results")
final.write_videofile("assets/temp/temp.mp4", fps=30, audio_codec="aac", audio_bitrate="192k")
ffmpeg_tools.ffmpeg_extract_subclip(
"assets/temp/temp.mp4", 0, length, targetname=f"results/{filename}"
)
# os.remove("assets/temp/temp.mp4")
print_step("Removing temporary files 🗑")
cleanups = cleanup()
print_substep(f"Removed {cleanups} temporary files 🗑")
print_substep("See result in the results folder!")
print_step(
f"Reddit title: {os.getenv('VIDEO_TITLE')} \n Background Credit: {os.getenv('background_credit')}"
)

@ -16,62 +16,62 @@ storymode = False
def download_screenshots_of_reddit_posts(reddit_object, screenshot_num):
"""Downloads screenshots of reddit posts as they are seen on the web.
Args:
reddit_object: The Reddit Object you received in askreddit.py
screenshot_num: The number of screenshots you want to download.
"""
print_step("Downloading screenshots of reddit posts...")
"""Downloads screenshots of reddit posts as they are seen on the web.
Args:
reddit_object: The Reddit Object you received in askreddit.py
screenshot_num: The number of screenshots you want to download.
"""
print_step("Downloading screenshots of reddit posts...")
# ! Make sure the reddit screenshots folder exists
Path("assets/temp/png").mkdir(parents=True, exist_ok=True)
# ! Make sure the reddit screenshots folder exists
Path("assets/temp/png").mkdir(parents=True, exist_ok=True)
with sync_playwright() as p:
print_substep("Launching Headless Browser...")
with sync_playwright() as p:
print_substep("Launching Headless Browser...")
browser = p.chromium.launch()
context = browser.new_context()
browser = p.chromium.launch()
context = browser.new_context()
if getenv("THEME").upper() == "DARK":
cookie_file = open("./video_creation/data/cookie-dark-mode.json")
else:
cookie_file = open("./video_creation/data/cookie-light-mode.json")
cookies = json.load(cookie_file)
context.add_cookies(cookies) # load preference cookies
# Get the thread screenshot
page = context.new_page()
page.goto(reddit_object["thread_url"])
page.set_viewport_size(ViewportSize(width=1920, height=1080))
if page.locator('[data-testid="content-gate"]').is_visible():
# This means the post is NSFW and requires to click the proceed button.
if getenv("THEME").upper() == "DARK":
cookie_file = open("./video_creation/data/cookie-dark-mode.json")
else:
cookie_file = open("./video_creation/data/cookie-light-mode.json")
cookies = json.load(cookie_file)
context.add_cookies(cookies) # load preference cookies
# Get the thread screenshot
page = context.new_page()
page.goto(reddit_object["thread_url"])
page.set_viewport_size(ViewportSize(width=1920, height=1080))
if page.locator('[data-testid="content-gate"]').is_visible():
# This means the post is NSFW and requires to click the proceed button.
print_substep("Post is NSFW. You are spicy...")
page.locator('[data-testid="content-gate"] button').click()
page.locator(
'[data-click-id="text"] button'
).click() # Remove "Click to see nsfw" Button in Screenshot
print_substep("Post is NSFW. You are spicy...")
page.locator('[data-testid="content-gate"] button').click()
page.locator(
'[data-click-id="text"] button'
).click() # Remove "Click to see nsfw" Button in Screenshot
page.locator('[data-test-id="post-content"]').screenshot(
path="assets/temp/png/title.png"
)
if storymode:
page.locator('[data-click-id="text"]').screenshot(
path="assets/temp/png/story_content.png"
)
else:
for idx, comment in track(
enumerate(reddit_object["comments"]), "Downloading screenshots..."
):
page.locator('[data-test-id="post-content"]').screenshot(
path="assets/temp/png/title.png"
)
if storymode:
page.locator('[data-click-id="text"]').screenshot(
path="assets/temp/png/story_content.png"
)
else:
for idx, comment in track(
enumerate(reddit_object["comments"]), "Downloading screenshots..."
):
# Stop if we have reached the screenshot_num
if idx >= screenshot_num:
break
# Stop if we have reached the screenshot_num
if idx >= screenshot_num:
break
if page.locator('[data-testid="content-gate"]').is_visible():
page.locator('[data-testid="content-gate"] button').click()
if page.locator('[data-testid="content-gate"]').is_visible():
page.locator('[data-testid="content-gate"] button').click()
page.goto(f'https://reddit.com{comment["comment_url"]}')
page.locator(f"#t1_{comment['comment_id']}").screenshot(
path=f"assets/temp/png/comment_{idx}.png"
)
print_substep("Screenshots downloaded Successfully.", style="bold green")
page.goto(f'https://reddit.com{comment["comment_url"]}')
page.locator(f"#t1_{comment['comment_id']}").screenshot(
path=f"assets/temp/png/comment_{idx}.png"
)
print_substep("Screenshots downloaded Successfully.", style="bold green")

@ -10,68 +10,70 @@ from rich.progress import track
from TTS.swapper import TTS
console = Console()
from utils.console import print_step, print_substep
from utils.voice import sanitize_text
console = Console()
VIDEO_LENGTH: int = 40 # secs
def save_text_to_mp3(reddit_obj):
"""Saves Text to MP3 files.
Args:
reddit_obj : The reddit object you received from the reddit API in the askreddit.py file.
"""
print_step("Saving Text to MP3 files...")
length = 0
"""Saves Text to MP3 files.
Args:
reddit_obj : The reddit object you received from the reddit API in the askreddit.py file.
"""
print_step("Saving Text to MP3 files...")
length = 0
# Create a folder for the mp3 files.
Path("assets/temp/mp3").mkdir(parents=True, exist_ok=True)
TextToSpeech = TTS()
TextToSpeech.tts(
sanitize_text(reddit_obj["thread_title"]),
filename=f"assets/temp/mp3/title.mp3",
random_speaker=False,
)
try:
length += MP3(f"assets/temp/mp3/title.mp3").info.length
except HeaderNotFoundError: # note to self AudioFileClip
length += sox.file_info.duration(f"assets/temp/mp3/title.mp3")
if getenv("STORYMODE").casefold() == "true":
TextToSpeech.tts(
sanitize_text(reddit_obj["thread_content"]),
filename=f"assets/temp/mp3/story_content.mp3",
random_speaker=False,
)
# 'story_content'
com = 0
for comment in track((reddit_obj["comments"]), "Saving..."):
# ! Stop creating mp3 files if the length is greater than VIDEO_LENGTH seconds. This can be longer, but this is just a good_voices starting point
if length > VIDEO_LENGTH:
break
# Create a folder for the mp3 files.
Path("assets/temp/mp3").mkdir(parents=True, exist_ok=True)
TextToSpeech = TTS()
TextToSpeech.tts(
sanitize_text(reddit_obj["thread_title"]),
filename="assets/temp/mp3/title.mp3",
random_speaker=False,
)
try:
length += MP3("assets/temp/mp3/title.mp3").info.length
except HeaderNotFoundError: # note to self AudioFileClip
length += sox.file_info.duration("assets/temp/mp3/title.mp3")
if getenv("STORYMODE").casefold() == "true":
TextToSpeech.tts(
sanitize_text(reddit_obj["thread_content"]),
filename="assets/temp/mp3/story_content.mp3",
random_speaker=False,
)
# 'story_content'
com = 0
for comment in track((reddit_obj["comments"]), "Saving..."):
# ! Stop creating mp3 files if the length is greater than VIDEO_LENGTH seconds. This can be longer
# but this is just a good_voices starting point
if length > VIDEO_LENGTH:
break
TextToSpeech.tts(
sanitize_text(comment["comment_body"]),
filename=f"assets/temp/mp3/{com}.mp3",
random_speaker=False,
)
try:
length += MP3(f"assets/temp/mp3/{com}.mp3").info.length
com += 1
except (HeaderNotFoundError, MutagenError, Exception):
try:
length += sox.file_info.duration(f"assets/temp/mp3/{com}.mp3")
com += 1
except (OSError, IOError):
print(
"would have removed"
f"assets/temp/mp3/{com}.mp3"
f"assets/temp/png/comment_{com}.png"
)
# remove(f"assets/temp/mp3/{com}.mp3")
# remove(f"assets/temp/png/comment_{com}.png")# todo might cause odd un-syncing
TextToSpeech.tts(
sanitize_text(comment["comment_body"]),
filename=f"assets/temp/mp3/{com}.mp3",
random_speaker=False,
)
try:
length += MP3(f"assets/temp/mp3/{com}.mp3").info.length
com += 1
except (HeaderNotFoundError, MutagenError, Exception):
try:
length += sox.file_info.duration(f"assets/temp/mp3/{com}.mp3")
com += 1
except (OSError, IOError):
print(
"would have removed"
f"assets/temp/mp3/{com}.mp3"
f"assets/temp/png/comment_{com}.png"
)
# remove(f"assets/temp/mp3/{com}.mp3")
# remove(f"assets/temp/png/comment_{com}.png")# todo might cause odd un-syncing
print_substep("Saved Text to MP3 files Successfully.", style="bold green")
# ! Return the index, so we know how many screenshots of comments we need to make.
return length, com
print_substep("Saved Text to MP3 files Successfully.", style="bold green")
# ! Return the index, so we know how many screenshots of comments we need to make.
return length, com

Loading…
Cancel
Save