cherry-picked async api tts

pull/958/head
Drugsosos 3 years ago
parent c617af98ce
commit c3c90cfd90
No known key found for this signature in database
GPG Key ID: 8E35176FE617E28D

@ -1,23 +1,19 @@
#!/usr/bin/env python3
import random
from utils import settings
from gtts import gTTS
max_chars = 0
class GTTS:
def __init__(self):
self.max_chars = 0
self.voices = []
max_chars = 0
def run(self, text, filepath):
@staticmethod
async def run(
text,
filepath
) -> None:
tts = gTTS(
text=text,
lang=settings.config["reddit"]["thread"]["post_lang"] or "en",
slow=False,
)
tts.save(filepath)
def randomvoice(self):
return random.choice(self.voices)

@ -1,101 +1,101 @@
import base64
from aiohttp import ClientSession
from utils import settings
import random
import requests
from requests.adapters import HTTPAdapter, Retry
from random import choice
from attr import attrs, attrib
from attr.validators import instance_of
from TTS.common import BaseApiTTS, get_random_voice
# TTS examples: https://twitter.com/scanlime/status/1512598559769702406
# from profanity_filter import ProfanityFilter
# pf = ProfanityFilter()
# Code by @JasonLovesDoggo
# https://twitter.com/scanlime/status/1512598559769702406
voices = dict()
nonhuman = [ # DISNEY VOICES
"en_us_ghostface", # Ghost Face
"en_us_chewbacca", # Chewbacca
"en_us_c3po", # C3PO
"en_us_stitch", # Stitch
"en_us_stormtrooper", # Stormtrooper
"en_us_rocket", # Rocket
voices['nonhuman'] = [ # DISNEY VOICES
'en_us_ghostface', # Ghost Face
'en_us_chewbacca', # Chewbacca
'en_us_c3po', # C3PO
'en_us_stitch', # Stitch
'en_us_stormtrooper', # Stormtrooper
'en_us_rocket', # Rocket
# ENGLISH VOICES
]
human = [
"en_au_001", # English AU - Female
"en_au_002", # English AU - Male
"en_uk_001", # English UK - Male 1
"en_uk_003", # English UK - Male 2
"en_us_001", # English US - Female (Int. 1)
"en_us_002", # English US - Female (Int. 2)
"en_us_006", # English US - Male 1
"en_us_007", # English US - Male 2
"en_us_009", # English US - Male 3
"en_us_010",
voices['human'] = [
'en_au_001', # English AU - Female
'en_au_002', # English AU - Male
'en_uk_001', # English UK - Male 1
'en_uk_003', # English UK - Male 2
'en_us_001', # English US - Female (Int. 1)
'en_us_002', # English US - Female (Int. 2)
'en_us_006', # English US - Male 1
'en_us_007', # English US - Male 2
'en_us_009', # English US - Male 3
'en_us_010',
]
voices = nonhuman + human
noneng = [
"fr_001", # French - Male 1
"fr_002", # French - Male 2
"de_001", # German - Female
"de_002", # German - Male
"es_002", # Spanish - Male
voices['non_eng'] = [
'fr_001', # French - Male 1
'fr_002', # French - Male 2
'de_001', # German - Female
'de_002', # German - Male
'es_002', # Spanish - Male
# AMERICA VOICES
"es_mx_002", # Spanish MX - Male
"br_001", # Portuguese BR - Female 1
"br_003", # Portuguese BR - Female 2
"br_004", # Portuguese BR - Female 3
"br_005", # Portuguese BR - Male
'es_mx_002', # Spanish MX - Male
'br_001', # Portuguese BR - Female 1
'br_003', # Portuguese BR - Female 2
'br_004', # Portuguese BR - Female 3
'br_005', # Portuguese BR - Male
# ASIA VOICES
"id_001", # Indonesian - Female
"jp_001", # Japanese - Female 1
"jp_003", # Japanese - Female 2
"jp_005", # Japanese - Female 3
"jp_006", # Japanese - Male
"kr_002", # Korean - Male 1
"kr_003", # Korean - Female
"kr_004", # Korean - Male 2
'id_001', # Indonesian - Female
'jp_001', # Japanese - Female 1
'jp_003', # Japanese - Female 2
'jp_005', # Japanese - Female 3
'jp_006', # Japanese - Male
'kr_002', # Korean - Male 1
'kr_003', # Korean - Female
'kr_004', # Korean - Male 2
]
# good_voices = {'good': ['en_us_002', 'en_us_006'],
# 'ok': ['en_au_002', 'en_uk_001']} # less en_us_stormtrooper more less en_us_rocket en_us_ghostface
# good_voices: 'en_us_002', 'en_us_006'
# ok: 'en_au_002', 'en_uk_001'
# less: en_us_stormtrooper
# more or less: en_us_rocket, en_us_ghostface
class TikTok: # TikTok Text-to-Speech Wrapper
def __init__(self):
self.URI_BASE = (
"https://api16-normal-useast5.us.tiktokv.com/media/api/text/speech/invoke/?text_speaker="
)
self.max_chars = 300
self.voices = {"human": human, "nonhuman": nonhuman, "noneng": noneng}
def run(self, text, filepath, random_voice: bool = False):
# if censor:
# req_text = pf.censor(req_text)
# pass
voice = (
self.randomvoice()
if random_voice
else (
settings.config["settings"]["tts"]["tiktok_voice"]
or random.choice(self.voices["human"])
@attrs(auto_attribs=True)
class TikTok(BaseApiTTS): # TikTok Text-to-Speech Wrapper
client: ClientSession = attrib(
validator=instance_of(ClientSession),
)
random_voice: bool = False
uri_base: str = attrib(
default='https://api16-normal-useast5.us.tiktokv.com/media/api/text/speech/invoke',
kw_only=True,
)
try:
r = requests.post(f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0")
except requests.exceptions.SSLError:
# https://stackoverflow.com/a/47475019/18516611
session = requests.Session()
retry = Retry(connect=3, backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retry)
session.mount("http://", adapter)
session.mount("https://", adapter)
r = session.post(f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0")
# print(r.text)
vstr = [r.json()["data"]["v_str"]][0]
b64d = base64.b64decode(vstr)
max_chars = 300
decode_base64 = True
with open(filepath, "wb") as out:
out.write(b64d)
def __attrs_post_init__(self):
self.voice = (
get_random_voice(voices, 'human')
if self.random_voice
else str(settings.config['settings']['tts']['tiktok_voice']).lower()
if str(settings.config['settings']['tts']['tiktok_voice']).lower() in [
voice.lower() for dict_title in voices for voice in voices[dict_title]]
else get_random_voice(voices, 'human')
)
def randomvoice(self):
return random.choice(self.voices["human"])
async def make_request(
self,
text: str,
):
return await self.client.post(
f'{self.uri_base}',
params={
'text_speaker': self.voice,
'req_text': text,
'speaker_map_type': 0,
}
)

@ -1,45 +1,52 @@
#!/usr/bin/env python3
from boto3 import Session
from botocore.exceptions import BotoCoreError, ClientError
import sys
from utils import settings
import random
from attr import attrs
from TTS.common import get_random_voice
voices = [
"Brian",
"Emma",
"Russell",
"Joey",
"Matthew",
"Joanna",
"Kimberly",
"Amy",
"Geraint",
"Nicole",
"Justin",
"Ivy",
"Kendra",
"Salli",
"Raveena",
'Brian',
'Emma',
'Russell',
'Joey',
'Matthew',
'Joanna',
'Kimberly',
'Amy',
'Geraint',
'Nicole',
'Justin',
'Ivy',
'Kendra',
'Salli',
'Raveena',
]
@attrs(auto_attribs=True)
class AWSPolly:
def __init__(self):
self.max_chars = 0
self.voices = voices
random_voice: bool = False
max_chars: int = 0
def run(self, text, filepath, random_voice: bool = False):
session = Session(profile_name="polly")
async def run(
self,
text,
filepath,
):
session = Session(profile_name='polly')
polly = session.client("polly")
if random_voice:
voice = self.randomvoice()
else:
if not settings.config["settings"]["tts"]["aws_polly_voice"]:
return ValueError(
f"Please set the TOML variable AWS_VOICE to a valid voice. options are: {voices}"
voice = (
get_random_voice(voices)
if self.random_voice
else str(settings.config['settings']['tts']['aws_polly_voice']).capitalize()
if str(settings.config['settings']['tts']['aws_polly_voice']).lower() in [voice.lower() for voice in voices]
else get_random_voice(voices)
)
voice = str(settings.config["settings"]["tts"]["aws_polly_voice"]).capitalize()
try:
# Request speech synthesis
response = polly.synthesize_speech(
@ -51,7 +58,7 @@ class AWSPolly:
sys.exit(-1)
# Access the audio stream from the response
if "AudioStream" in response:
if 'AudioStream' in response:
file = open(filepath, "wb")
file.write(response["AudioStream"].read())
file.close()
@ -59,8 +66,5 @@ class AWSPolly:
else:
# The response didn't contain audio data, exit gracefully
print("Could not stream audio")
print('Could not stream audio')
sys.exit(-1)
def randomvoice(self):
return random.choice(self.voices)

@ -0,0 +1,70 @@
from aiofiles import open
import base64
from random import choice
from typing import Union, Optional
class BaseApiTTS:
max_chars: int
decode_base64: bool = False
@staticmethod
def text_len_sanitize(
text: str,
max_length: int,
) -> list:
# Split by comma or dot (else you can lose intonations), if there is non, split by groups of 299 chars
if '.' in text and all([split_text.__len__() < max_length for split_text in text.split('.')]):
return text.split('.')
if ',' in text and all([split_text.__len__() < max_length for split_text in text.split(',')]):
return text.split(',')
return [text[i:i + max_length] for i in range(0, len(text), max_length)]
async def write_file(
self,
output_text: str,
filename: str,
) -> None:
decoded_text = base64.b64decode(output_text) if self.decode_base64 else output_text
async with open(filename, 'wb') as out:
await out.write(decoded_text)
async def run(
self,
req_text: str,
filename: str,
) -> None:
output_text = ''
if len(req_text) > self.max_chars:
for part in self.text_len_sanitize(req_text, self.max_chars):
if part:
output_text += await self.make_request(part)
else:
output_text = await self.make_request(req_text)
await self.write_file(output_text, filename)
def get_random_voice(
voices: Union[list, dict],
key: Optional[str] = None,
) -> str:
if isinstance(voices, list):
return choice(voices)
else:
return choice(voices[key])
def audio_length(
path: str,
) -> float | int:
from mutagen.mp3 import MP3
try:
audio = MP3(path)
return audio.info.length
except Exception as e: # TODO add logging
return 0

@ -1,23 +1,19 @@
#!/usr/bin/env python3
from pathlib import Path
from typing import Tuple
import re
from asyncio import as_completed
# import sox
# from mutagen import MutagenError
# from mutagen.mp3 import MP3, HeaderNotFoundError
from pathlib import Path
import translators as ts
from rich.progress import track
from moviepy.editor import AudioFileClip, CompositeAudioClip, concatenate_audioclips
from attr import attrs, attrib
from utils.console import print_step, print_substep
from utils.voice import sanitize_text
from utils import settings
DEFUALT_MAX_LENGTH: int = 50 # video length variable
from TTS.common import audio_length
@attrs(auto_attribs=True)
class TTSEngine:
"""Calls the given TTS engine to reduce code duplication and allow multiple TTS engines.
Args:
@ -29,94 +25,81 @@ class TTSEngine:
Notes:
tts_module must take the arguments text and filepath.
"""
tts_module: object
reddit_object: dict
path: str = 'assets/temp/mp3'
max_length: int = 50 # TODO move to config
__total_length: int = attrib(
default=0,
kw_only=True
)
def __init__(
self,
tts_module,
reddit_object: dict,
path: str = "assets/temp/mp3",
max_length: int = DEFUALT_MAX_LENGTH,
):
self.tts_module = tts_module()
self.reddit_object = reddit_object
self.path = path
self.max_length = max_length
self.length = 0
def run(self) -> Tuple[int, int]:
async def run(
self
) -> list:
Path(self.path).mkdir(parents=True, exist_ok=True)
# This file needs to be removed in case this post does not use post text, so that it wont appear in the final video
# This file needs to be removed in case this post does not use post text
# so that it won't appear in the final video
try:
Path(f"{self.path}/posttext.mp3").unlink()
Path(f'{self.path}/posttext.mp3').unlink()
except OSError:
pass
print_step("Saving Text to MP3 files...")
print_step('Saving Text to MP3 files...')
self.call_tts("title", self.reddit_object["thread_title"])
if (
self.reddit_object["thread_post"] != ""
and settings.config["settings"]["storymode"] == True
):
self.call_tts("posttext", self.reddit_object["thread_post"])
idx = None
for idx, comment in track(enumerate(self.reddit_object["comments"]), "Saving..."):
# ! Stop creating mp3 files if the length is greater than max length.
if self.length > self.max_length:
break
if not self.tts_module.max_chars:
self.call_tts(f"{idx}", comment["comment_body"])
else:
self.split_post(comment["comment_body"], idx)
print_substep("Saved Text to MP3 files successfully.", style="bold green")
return self.length, idx
def split_post(self, text: str, idx: int):
split_files = []
split_text = [
x.group().strip()
for x in re.finditer(rf" *((.{{0,{self.tts_module.max_chars}}})(\.|.$))", text)
]
await self.call_tts('title', self.reddit_object['thread_title'])
async_tasks_offset = 1
idy = None
for idy, text_cut in enumerate(split_text):
# print(f"{idx}-{idy}: {text_cut}\n")
self.call_tts(f"{idx}-{idy}.part", text_cut)
split_files.append(AudioFileClip(f"{self.path}/{idx}-{idy}.part.mp3"))
CompositeAudioClip([concatenate_audioclips(split_files)]).write_audiofile(
f"{self.path}/{idx}.mp3", fps=44100, verbose=False, logger=None
)
if self.reddit_object['thread_post'] and settings.config['settings']['storymode']:
await self.call_tts('posttext', self.reddit_object['thread_post'])
async_tasks_offset += 1
for i in split_files:
name = i.filename
i.close()
Path(name).unlink()
async_tasks_primary = [
self.call_tts(str(idx), comment['comment_body'])
for idx, comment in enumerate(self.reddit_object['comments'])
]
# for i in range(0, idy + 1):
# print(f"Cleaning up {self.path}/{idx}-{i}.part.mp3")
for task in track(
as_completed(async_tasks_primary),
description='Saving...',
total=async_tasks_primary.__len__()
):
await task
# Path(f"{self.path}/{idx}-{i}.part.mp3").unlink()
print_substep('Saved Text to MP3 files successfully.', style='bold green')
return [
comments for comments, condition in
zip(self.reddit_object['comments'], async_tasks_primary[async_tasks_offset:])
if condition
]
async def call_tts(
self,
filename: str,
text: str
) -> bool:
await self.tts_module.run(
text=self.process_text(text),
filepath=f'{self.path}/{filename}.mp3'
)
def call_tts(self, filename: str, text: str):
self.tts_module.run(text=process_text(text), filepath=f"{self.path}/{filename}.mp3")
# try:
# self.length += MP3(f"{self.path}/{filename}.mp3").info.length
# except (MutagenError, HeaderNotFoundError):
# self.length += sox.file_info.duration(f"{self.path}/{filename}.mp3")
clip = AudioFileClip(f"{self.path}/{filename}.mp3")
self.length += clip.duration
clip.close()
clip_length = audio_length(f'assets/audio/{filename}.mp3')
if self.__total_length + clip_length <= self.max_length:
self.max_length += clip_length
return True
return False
def process_text(text: str):
lang = settings.config["reddit"]["thread"]["post_lang"]
@staticmethod
def process_text(
text: str,
) -> str:
lang = settings.config['reddit']['thread']['post_lang']
new_text = sanitize_text(text)
if lang:
print_substep("Translating Text...")
print_substep('Translating Text...')
translated_text = ts.google(text, to_language=lang)
new_text = sanitize_text(translated_text)
return new_text

@ -1,62 +1,71 @@
import random
import requests
from requests.exceptions import JSONDecodeError
from aiohttp import ClientSession
from random import choice
from utils import settings
from utils.voice import check_ratelimit
from attr import attrs, attrib
from attr.validators import instance_of
from TTS.common import BaseApiTTS, get_random_voice
voices = [
"Brian",
"Emma",
"Russell",
"Joey",
"Matthew",
"Joanna",
"Kimberly",
"Amy",
"Geraint",
"Nicole",
"Justin",
"Ivy",
"Kendra",
"Salli",
"Raveena",
'Brian',
'Emma',
'Russell',
'Joey',
'Matthew',
'Joanna',
'Kimberly',
'Amy',
'Geraint',
'Nicole',
'Justin',
'Ivy',
'Kendra',
'Salli',
'Raveena',
]
# valid voices https://lazypy.ro/tts/
class StreamlabsPolly:
def __init__(self):
self.url = "https://streamlabs.com/polly/speak"
self.max_chars = 550
self.voices = voices
@attrs(auto_attribs=True)
class StreamlabsPolly(BaseApiTTS):
client: ClientSession = attrib(
validator=instance_of(ClientSession),
)
random_voice: bool = False
url: str = attrib(
default='https://streamlabs.com/polly/speak',
kw_only=True,
)
def run(self, text, filepath, random_voice: bool = False):
if random_voice:
voice = self.randomvoice()
else:
if not settings.config["settings"]["tts"]["streamlabs_polly_voice"]:
return ValueError(
f"Please set the config variable STREAMLABS_VOICE to a valid voice. options are: {voices}"
max_chars = 550
async def make_request(
self,
text: str,
):
voice = (
get_random_voice(voices)
if self.random_voice
else str(settings.config['settings']['tts']['streamlabs_polly_voice']).capitalize()
if str(settings.config['settings']['tts']['streamlabs_polly_voice']).lower() in [
voice.lower() for voice in voices]
else get_random_voice(voices)
)
response = await self.client.post(
self.url,
data={
'voice': voice,
'text': text,
'service': 'polly',
}
)
voice = str(settings.config["settings"]["tts"]["streamlabs_polly_voice"]).capitalize()
body = {"voice": voice, "text": text, "service": "polly"}
response = requests.post(self.url, data=body)
if not check_ratelimit(response):
self.run(text, filepath, random_voice)
else:
try:
voice_data = requests.get(response.json()["speak_url"])
with open(filepath, "wb") as f:
f.write(voice_data.content)
except (KeyError, JSONDecodeError):
try:
if response.json()["error"] == "No text specified!":
raise ValueError("Please specify a text to convert to speech.")
except (KeyError, JSONDecodeError):
print("Error occurred calling Streamlabs Polly")
def randomvoice(self):
return random.choice(self.voices)
speak_url = await(
await response.json()
)['speak_url']
return await self.client.get(speak_url)

@ -1,5 +1,5 @@
#!/usr/bin/env python
import math
from asyncio import run
from subprocess import Popen
from os import name
from reddit.subreddit import get_subreddit_threads
@ -9,8 +9,6 @@ from utils import settings
# from utils.checker import envUpdate
from video_creation.background import (
download_background,
chop_background_video,
get_background_config,
)
from video_creation.final_video import make_final_video
@ -35,24 +33,21 @@ print_markdown(
print_step(f"You are using V{VERSION} of the bot")
def main(POST_ID=None):
async def main(POST_ID=None):
cleanup()
reddit_object = get_subreddit_threads(POST_ID)
length, number_of_comments = save_text_to_mp3(reddit_object)
length = math.ceil(length)
download_screenshots_of_reddit_posts(reddit_object, number_of_comments)
comments_created = await save_text_to_mp3(reddit_object)
download_screenshots_of_reddit_posts(reddit_object, comments_created)
bg_config = get_background_config()
download_background(bg_config)
chop_background_video(bg_config, length)
make_final_video(number_of_comments, length, reddit_object, bg_config)
make_final_video(comments_created, reddit_object, bg_config)
def run_many(times):
async def run_many(times):
for x in range(1, times + 1):
print_step(
f'on the {x}{("th", "st", "nd", "rd", "th", "th", "th", "th","th", "th")[x%10]} iteration of {times}'
f'on the {x}{("th", "st", "nd", "rd", "th", "th", "th", "th", "th", "th")[x % 10]} iteration of {times}'
) # correct 1st 2nd 3rd 4th 5th....
main()
await main()
Popen("cls" if name == "nt" else "clear", shell=True).wait()
@ -61,15 +56,19 @@ if __name__ == "__main__":
config is False and exit()
try:
if config["settings"]["times_to_run"]:
run(
run_many(config["settings"]["times_to_run"])
)
elif len(config["reddit"]["thread"]["post_id"].split("+")) > 1:
for index, post_id in enumerate(config["reddit"]["thread"]["post_id"].split("+")):
index += 1
print_step(
f'on the {index}{("st" if index%10 == 1 else ("nd" if index%10 == 2 else ("rd" if index%10 == 3 else "th")))} post of {len(config["reddit"]["thread"]["post_id"].split("+"))}'
f'on the {index}{("st" if index % 10 == 1 else ("nd" if index % 10 == 2 else ("rd" if index % 10 == 3 else "th")))} post of {len(config["reddit"]["thread"]["post_id"].split("+"))}'
)
run(
main(post_id)
)
Popen("cls" if name == "nt" else "clear", shell=True).wait()
else:
main()

@ -9,3 +9,7 @@ requests==2.28.1
rich==12.4.4
toml==0.10.2
translators==5.3.1
attrs==21.4.0
aiohttp==3.8.1 # There is security warning for <=3.8.1, no fixes for now
aiofiles==0.8.0
mutagen==1.45.1

@ -10,7 +10,9 @@ if sys.version_info[0] >= 3:
from datetime import timezone
def check_ratelimit(response: Response):
def check_ratelimit(
response: Response,
):
"""
Checks if the response is a ratelimit response.
If it is, it sleeps for the time specified in the response.

@ -3,7 +3,7 @@ import multiprocessing
import os
import re
from os.path import exists
from typing import Dict, Tuple, Any
from typing import Tuple, Any
import translators as ts
@ -13,7 +13,6 @@ from moviepy.editor import (
ImageClip,
concatenate_videoclips,
concatenate_audioclips,
CompositeAudioClip,
CompositeVideoClip,
)
from moviepy.video.io.ffmpeg_tools import ffmpeg_merge_video_audio, ffmpeg_extract_subclip
@ -23,24 +22,26 @@ from utils.cleanup import cleanup
from utils.console import print_step, print_substep
from utils.videos import save_data
from utils import settings
from video_creation.background import download_background, chop_background_video
console = Console()
W, H = 1080, 1920
W, H = 1080, 1920 # TODO move to config
def name_normalize(name: str) -> str:
def name_normalize(
name: str
) -> str:
name = re.sub(r'[?\\"%*:|<>]', "", name)
name = re.sub(r"( [w,W]\s?\/\s?[o,O,0])", r" without", name)
name = re.sub(r"( [w,W]\s?\/)", r" with", name)
name = re.sub(r"(\d+)\s?\/\s?(\d+)", r"\1 of \2", name)
name = re.sub(r"(\w+)\s?\/\s?(\w+)", r"\1 or \2", name)
name = re.sub(r"\/", r"", name)
name = re.sub(r'( [w,W]\s?\/\s?[o,O,0])', r' without', name)
name = re.sub(r'( [w,W]\s?\/)', r' with', name)
name = re.sub(r'(\d+)\s?\/\s?(\d+)', r'\1 of \2', name)
name = re.sub(r'(\w+)\s?\/\s?(\w+)', r'\1 or \2', name)
name = re.sub(r'\/', '', name)
lang = settings.config["reddit"]["thread"]["post_lang"]
lang = settings.config['reddit']['thread']['post_lang']
if lang:
print_substep("Translating filename...")
print_substep('Translating filename...')
translated_name = ts.google(name, to_language=lang)
return translated_name
@ -49,48 +50,46 @@ def name_normalize(name: str) -> str:
def make_final_video(
number_of_clips: int, length: int, reddit_obj: dict, background_config: Tuple[str, str, str, Any]
):
"""Gathers audio clips, gathers all screenshots, stitches them together and saves the final video to assets/temp
indexes_of_clips: list,
reddit_obj: dict,
background_config: Tuple[str, str, str, Any],
) -> None:
"""
Gathers audio clips, gathers all screenshots, stitches them together and saves the final video to assets/temp
Args:
number_of_clips (int): Index to end at when going through the screenshots'
length (int): Length of the video
indexes_of_clips (list): Indexes of voiced comments
reddit_obj (dict): The reddit object that contains the posts to read.
background_config (Tuple[str, str, str, Any]): The background config to use.
"""
print_step("Creating the final video 🎥")
print_step('Creating the final video 🎥')
VideoFileClip.reW = lambda clip: clip.resize(width=W)
VideoFileClip.reH = lambda clip: clip.resize(width=H)
opacity = settings.config["settings"]["opacity"]
background_clip = (
VideoFileClip("assets/temp/background.mp4")
.without_audio()
.resize(height=H)
.crop(x1=1166.6, y1=0, x2=2246.6, y2=1920)
)
opacity = settings.config['settings']['opacity']
final_length = 0
# Gather all audio clips
audio_clips = [AudioFileClip(f"assets/temp/mp3/{i}.mp3") for i in range(number_of_clips)]
audio_clips.insert(0, AudioFileClip("assets/temp/mp3/title.mp3"))
audio_concat = concatenate_audioclips(audio_clips)
audio_composite = CompositeAudioClip([audio_concat])
audio_clips = [AudioFileClip(f'assets/temp/mp3/{i}.mp3') for i in indexes_of_clips]
audio_clips.insert(0, AudioFileClip('assets/temp/mp3/title.mp3'))
audio_composite = concatenate_audioclips(audio_clips)
console.log(f"[bold green] Video Will Be: {length} Seconds Long")
console.log(f'[bold green] Video Will Be: {audio_composite.length} Seconds Long')
# add title to video
image_clips = []
# Gather all images
new_opacity = 1 if opacity is None or float(opacity) >= 1 else float(opacity)
image_clips.insert(
0,
ImageClip("assets/temp/png/title.png")
ImageClip('assets/temp/png/title.png')
.set_duration(audio_clips[0].duration)
.resize(width=W - 100)
.set_opacity(new_opacity),
)
for i in range(0, number_of_clips):
for i in indexes_of_clips:
image_clips.append(
ImageClip(f"assets/temp/png/comment_{i}.png")
ImageClip(f'assets/temp/png/comment_{i}.png')
.set_duration(audio_clips[i + 1].duration)
.resize(width=W - 100)
.set_opacity(new_opacity)
@ -109,63 +108,73 @@ def make_final_video(
img_clip_pos = background_config[3]
image_concat = concatenate_videoclips(image_clips).set_position(img_clip_pos)
image_concat.audio = audio_composite
download_background(background_config)
chop_background_video(background_config, final_length)
background_clip = (
VideoFileClip("assets/temp/background.mp4")
.without_audio()
.resize(height=H)
.crop(x1=1166.6, y1=0, x2=2246.6, y2=1920)
)
final = CompositeVideoClip([background_clip, image_concat])
title = re.sub(r"[^\w\s-]", "", reddit_obj["thread_title"])
idx = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"])
title = re.sub(r'[^\w\s-]', '', reddit_obj['thread_title'])
idx = re.sub(r'[^\w\s-]', '', reddit_obj['thread_id'])
filename = f"{name_normalize(title)}.mp4"
subreddit = settings.config["reddit"]["thread"]["subreddit"]
filename = f'{name_normalize(title)}.mp4'
subreddit = settings.config['reddit']['thread']['subreddit']
save_data(subreddit, filename, title, idx, background_config[2])
if not exists(f"./results/{subreddit}"):
print_substep("The results folder didn't exist so I made it")
os.makedirs(f"./results/{subreddit}")
if not exists(f'./results/{subreddit}'):
print_substep('The results folder didn\'t exist so I made it')
os.makedirs(f'./results/{subreddit}')
final.write_videofile(
"assets/temp/temp.mp4",
'assets/temp/temp.mp4',
fps=30,
audio_codec="aac",
audio_bitrate="192k",
audio_codec='aac',
audio_bitrate='192k',
verbose=False,
threads=multiprocessing.cpu_count(),
)
if settings.config["settings"]["background_audio"]:
print("[bold green] Merging background audio with video")
if not exists(f"assets/backgrounds/background.mp3"):
if settings.config['settings']['background_audio']:
print('[bold green] Merging background audio with video')
if not exists('assets/backgrounds/background.mp3'):
print_substep(
"Cannot find assets/backgrounds/background.mp3 audio file didn't so skipping."
'Cannot find assets/backgrounds/background.mp3 audio file didn\'t so skipping.'
)
ffmpeg_extract_subclip(
"assets/temp/temp.mp4",
'assets/temp/temp.mp4',
0,
final.duration,
targetname=f"results/{subreddit}/{filename}",
targetname=f'results/{subreddit}/{filename}',
)
else:
ffmpeg_merge_video_audio(
"assets/temp/temp.mp4",
"assets/backgrounds/background.mp3",
"assets/temp/temp_audio.mp4",
'assets/temp/temp.mp4',
'assets/backgrounds/background.mp3',
'assets/temp/temp_audio.mp4',
)
ffmpeg_extract_subclip( # check if this gets run
"assets/temp/temp_audio.mp4",
'assets/temp/temp_audio.mp4',
0,
final.duration,
targetname=f"results/{subreddit}/{filename}",
)
else:
print("debug duck")
print('debug duck')
ffmpeg_extract_subclip(
"assets/temp/temp.mp4",
'assets/temp/temp.mp4',
0,
final.duration,
targetname=f"results/{subreddit}/{filename}",
targetname=f'results/{subreddit}/{filename}',
)
print_step("Removing temporary files 🗑")
print_step('Removing temporary files 🗑')
cleanups = cleanup()
print_substep(f"Removed {cleanups} temporary files 🗑")
print_substep("See result in the results folder!")
print_substep(f'Removed {cleanups} temporary files 🗑')
print_substep('See result in the results folder!')
print_step(
f'Reddit title: {reddit_obj["thread_title"]} \n Background Credit: {background_config[2]}'

@ -16,12 +16,12 @@ from utils.console import print_step, print_substep
storymode = False
def download_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
def download_screenshots_of_reddit_posts(reddit_object: dict, voiced_idx: list):
"""Downloads screenshots of reddit posts as seen on the web. Downloads to assets/temp/png
Args:
reddit_object (Dict): Reddit object received from reddit/subreddit.py
screenshot_num (int): Number of screenshots to download
voiced_idx (int): Indexes of voiced comments
"""
print_step("Downloading screenshots of reddit posts...")
@ -76,12 +76,11 @@ def download_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: in
path="assets/temp/png/story_content.png"
)
else:
for idx, comment in enumerate(
track(reddit_object["comments"], "Downloading screenshots...")
for idx in track(
screenshot_num,
"Downloading screenshots..."
):
# Stop if we have reached the screenshot_num
if idx >= screenshot_num:
break
comment = reddit_object["comments"][idx]
if page.locator('[data-testid="content-gate"]').is_visible():
page.locator('[data-testid="content-gate"] button').click()

@ -1,55 +1,50 @@
#!/usr/bin/env python
from typing import Dict, Tuple
from rich.console import Console
from TTS.engine_wrapper import TTSEngine
from TTS.GTTS import GTTS
from TTS.streamlabs_polly import StreamlabsPolly
from TTS.aws_polly import AWSPolly
from TTS.TikTok import TikTok
from utils import settings
from utils.console import print_table, print_step
console = Console()
TTSProviders = {
"GoogleTranslate": GTTS,
"AWSPolly": AWSPolly,
"StreamlabsPolly": StreamlabsPolly,
"TikTok": TikTok,
'GoogleTranslate': GTTS,
'AWSPolly': AWSPolly,
'StreamlabsPolly': StreamlabsPolly,
'TikTok': TikTok,
}
def save_text_to_mp3(reddit_obj) -> Tuple[int, int]:
async def save_text_to_mp3(
reddit_obj: dict,
) -> list:
"""Saves text to MP3 files.
Args:
reddit_obj (): Reddit object received from reddit API in reddit/subreddit.py
Returns:
tuple[int,int]: (total length of the audio, the number of comments audio was generated for)
The number of comments audio was generated for
"""
voice = settings.config["settings"]["tts"]["choice"]
if voice.casefold() in map(lambda _: _.casefold(), TTSProviders):
text_to_mp3 = TTSEngine(get_case_insensitive_key_value(TTSProviders, voice), reddit_obj)
else:
voice = settings.config['settings']['tts']['choice']
if voice.casefold() not in map(lambda _: _.casefold(), TTSProviders):
while True:
print_step("Please choose one of the following TTS providers: ")
print_step('Please choose one of the following TTS providers: ')
print_table(TTSProviders)
choice = input("\n")
if choice.casefold() in map(lambda _: _.casefold(), TTSProviders):
voice = input('\n')
if voice.casefold() in map(lambda _: _.casefold(), TTSProviders):
break
print("Unknown Choice")
text_to_mp3 = TTSEngine(get_case_insensitive_key_value(TTSProviders, choice), reddit_obj)
return text_to_mp3.run()
print('Unknown Choice')
engine_instance = TTSEngine(get_case_insensitive_key_value(TTSProviders, voice), reddit_obj)
return await engine_instance.run()
def get_case_insensitive_key_value(input_dict, key):
def get_case_insensitive_key_value(
input_dict,
key
) -> object:
return next(
(value for dict_key, value in input_dict.items() if dict_key.lower() == key.lower()),
None,

Loading…
Cancel
Save