moved api TTS to aiohttp & other fixes

pull/933/head
Drugsosos 3 years ago
parent 4dbabd024e
commit c08afd8e6a
No known key found for this signature in database
GPG Key ID: 8E35176FE617E28D

@ -1,23 +1,19 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import random
from utils import settings from utils import settings
from gtts import gTTS from gtts import gTTS
max_chars = 0
class GTTS: class GTTS:
def __init__(self): max_chars = 0
self.max_chars = 0
self.voices = []
def run(self, text, filepath): @staticmethod
async def run(
text,
filepath
) -> None:
tts = gTTS( tts = gTTS(
text=text, text=text,
lang=settings.config["reddit"]["thread"]["post_lang"] or "en", lang=settings.config["reddit"]["thread"]["post_lang"] or "en",
slow=False, slow=False,
) )
tts.save(filepath) tts.save(filepath)
def randomvoice(self):
return random.choice(self.voices)

@ -1,101 +1,101 @@
import base64 from aiohttp import ClientSession
from utils import settings from utils import settings
import random from random import choice
import requests
from requests.adapters import HTTPAdapter, Retry from attr import attrs, attrib
from attr.validators import instance_of
from TTS.common import BaseApiTTS, get_random_voice
# TTS examples: https://twitter.com/scanlime/status/1512598559769702406
# from profanity_filter import ProfanityFilter voices = dict()
# pf = ProfanityFilter()
# Code by @JasonLovesDoggo
# https://twitter.com/scanlime/status/1512598559769702406
nonhuman = [ # DISNEY VOICES voices['nonhuman'] = [ # DISNEY VOICES
"en_us_ghostface", # Ghost Face 'en_us_ghostface', # Ghost Face
"en_us_chewbacca", # Chewbacca 'en_us_chewbacca', # Chewbacca
"en_us_c3po", # C3PO 'en_us_c3po', # C3PO
"en_us_stitch", # Stitch 'en_us_stitch', # Stitch
"en_us_stormtrooper", # Stormtrooper 'en_us_stormtrooper', # Stormtrooper
"en_us_rocket", # Rocket 'en_us_rocket', # Rocket
# ENGLISH VOICES # ENGLISH VOICES
] ]
human = [ voices['human'] = [
"en_au_001", # English AU - Female 'en_au_001', # English AU - Female
"en_au_002", # English AU - Male 'en_au_002', # English AU - Male
"en_uk_001", # English UK - Male 1 'en_uk_001', # English UK - Male 1
"en_uk_003", # English UK - Male 2 'en_uk_003', # English UK - Male 2
"en_us_001", # English US - Female (Int. 1) 'en_us_001', # English US - Female (Int. 1)
"en_us_002", # English US - Female (Int. 2) 'en_us_002', # English US - Female (Int. 2)
"en_us_006", # English US - Male 1 'en_us_006', # English US - Male 1
"en_us_007", # English US - Male 2 'en_us_007', # English US - Male 2
"en_us_009", # English US - Male 3 'en_us_009', # English US - Male 3
"en_us_010", 'en_us_010',
] ]
voices = nonhuman + human
noneng = [ voices['non_eng'] = [
"fr_001", # French - Male 1 'fr_001', # French - Male 1
"fr_002", # French - Male 2 'fr_002', # French - Male 2
"de_001", # German - Female 'de_001', # German - Female
"de_002", # German - Male 'de_002', # German - Male
"es_002", # Spanish - Male 'es_002', # Spanish - Male
# AMERICA VOICES # AMERICA VOICES
"es_mx_002", # Spanish MX - Male 'es_mx_002', # Spanish MX - Male
"br_001", # Portuguese BR - Female 1 'br_001', # Portuguese BR - Female 1
"br_003", # Portuguese BR - Female 2 'br_003', # Portuguese BR - Female 2
"br_004", # Portuguese BR - Female 3 'br_004', # Portuguese BR - Female 3
"br_005", # Portuguese BR - Male 'br_005', # Portuguese BR - Male
# ASIA VOICES # ASIA VOICES
"id_001", # Indonesian - Female 'id_001', # Indonesian - Female
"jp_001", # Japanese - Female 1 'jp_001', # Japanese - Female 1
"jp_003", # Japanese - Female 2 'jp_003', # Japanese - Female 2
"jp_005", # Japanese - Female 3 'jp_005', # Japanese - Female 3
"jp_006", # Japanese - Male 'jp_006', # Japanese - Male
"kr_002", # Korean - Male 1 'kr_002', # Korean - Male 1
"kr_003", # Korean - Female 'kr_003', # Korean - Female
"kr_004", # Korean - Male 2 'kr_004', # Korean - Male 2
] ]
# good_voices = {'good': ['en_us_002', 'en_us_006'], # good_voices: 'en_us_002', 'en_us_006'
# 'ok': ['en_au_002', 'en_uk_001']} # less en_us_stormtrooper more less en_us_rocket en_us_ghostface # ok: 'en_au_002', 'en_uk_001'
# less: en_us_stormtrooper
# more or less: en_us_rocket, en_us_ghostface
class TikTok: # TikTok Text-to-Speech Wrapper @attrs(auto_attribs=True)
def __init__(self): class TikTok(BaseApiTTS): # TikTok Text-to-Speech Wrapper
self.URI_BASE = ( client: ClientSession = attrib(
"https://api16-normal-useast5.us.tiktokv.com/media/api/text/speech/invoke/?text_speaker=" validator=instance_of(ClientSession),
)
self.max_chars = 300
self.voices = {"human": human, "nonhuman": nonhuman, "noneng": noneng}
def run(self, text, filepath, random_voice: bool = False):
# if censor:
# req_text = pf.censor(req_text)
# pass
voice = (
self.randomvoice()
if random_voice
else (
settings.config["settings"]["tts"]["tiktok_voice"]
or random.choice(self.voices["human"])
) )
random_voice: bool = False
uri_base: str = attrib(
default='https://api16-normal-useast5.us.tiktokv.com/media/api/text/speech/invoke',
kw_only=True,
) )
try: max_chars = 300
r = requests.post(f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0") decode_base64 = True
except requests.exceptions.SSLError:
# https://stackoverflow.com/a/47475019/18516611
session = requests.Session()
retry = Retry(connect=3, backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retry)
session.mount("http://", adapter)
session.mount("https://", adapter)
r = session.post(f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0")
# print(r.text)
vstr = [r.json()["data"]["v_str"]][0]
b64d = base64.b64decode(vstr)
with open(filepath, "wb") as out: def __attrs_post_init__(self):
out.write(b64d) self.voice = (
get_random_voice(voices, 'human')
if self.random_voice
else str(settings.config['settings']['tts']['tiktok_voice']).lower()
if str(settings.config['settings']['tts']['tiktok_voice']).lower() in [
voice.lower() for dict_title in voices for voice in voices[dict_title]]
else get_random_voice(voices, 'human')
)
def randomvoice(self): async def make_request(
return random.choice(self.voices["human"]) self,
text: str,
):
return await self.client.post(
f'{self.uri_base}',
params={
'text_speaker': self.voice,
'req_text': text,
'speaker_map_type': 0,
}
)

@ -1,45 +1,52 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from boto3 import Session from boto3 import Session
from botocore.exceptions import BotoCoreError, ClientError from botocore.exceptions import BotoCoreError, ClientError
import sys import sys
from utils import settings from utils import settings
import random from attr import attrs
from TTS.common import get_random_voice
voices = [ voices = [
"Brian", 'Brian',
"Emma", 'Emma',
"Russell", 'Russell',
"Joey", 'Joey',
"Matthew", 'Matthew',
"Joanna", 'Joanna',
"Kimberly", 'Kimberly',
"Amy", 'Amy',
"Geraint", 'Geraint',
"Nicole", 'Nicole',
"Justin", 'Justin',
"Ivy", 'Ivy',
"Kendra", 'Kendra',
"Salli", 'Salli',
"Raveena", 'Raveena',
] ]
@attrs(auto_attribs=True)
class AWSPolly: class AWSPolly:
def __init__(self): random_voice: bool = False
self.max_chars = 0 max_chars: int = 0
self.voices = voices
def run(self, text, filepath, random_voice: bool = False): async def run(
session = Session(profile_name="polly") self,
text,
filepath,
):
session = Session(profile_name='polly')
polly = session.client("polly") polly = session.client("polly")
if random_voice: voice = (
voice = self.randomvoice() get_random_voice(voices)
else: if self.random_voice
if not settings.config["settings"]["tts"]["aws_polly_voice"]: else str(settings.config['settings']['tts']['aws_polly_voice']).capitalize()
return ValueError( if str(settings.config['settings']['tts']['aws_polly_voice']).lower() in [voice.lower() for voice in voices]
f"Please set the TOML variable AWS_VOICE to a valid voice. options are: {voices}" else get_random_voice(voices)
) )
voice = str(settings.config["settings"]["tts"]["aws_polly_voice"]).capitalize()
try: try:
# Request speech synthesis # Request speech synthesis
response = polly.synthesize_speech( response = polly.synthesize_speech(
@ -51,7 +58,7 @@ class AWSPolly:
sys.exit(-1) sys.exit(-1)
# Access the audio stream from the response # Access the audio stream from the response
if "AudioStream" in response: if 'AudioStream' in response:
file = open(filepath, "wb") file = open(filepath, "wb")
file.write(response["AudioStream"].read()) file.write(response["AudioStream"].read())
file.close() file.close()
@ -59,8 +66,5 @@ class AWSPolly:
else: else:
# The response didn't contain audio data, exit gracefully # The response didn't contain audio data, exit gracefully
print("Could not stream audio") print('Could not stream audio')
sys.exit(-1) sys.exit(-1)
def randomvoice(self):
return random.choice(self.voices)

@ -0,0 +1,70 @@
from aiofiles import open
import base64
from random import choice
from typing import Union, Optional
class BaseApiTTS:
max_chars: int
decode_base64: bool = False
@staticmethod
def text_len_sanitize(
text: str,
max_length: int,
) -> list:
# Split by comma or dot (else you can lose intonations), if there is non, split by groups of 299 chars
if '.' in text and all([split_text.__len__() < max_length for split_text in text.split('.')]):
return text.split('.')
if ',' in text and all([split_text.__len__() < max_length for split_text in text.split(',')]):
return text.split(',')
return [text[i:i + max_length] for i in range(0, len(text), max_length)]
async def write_file(
self,
output_text: str,
filename: str,
) -> None:
decoded_text = base64.b64decode(output_text) if self.decode_base64 else output_text
async with open(filename, 'wb') as out:
await out.write(decoded_text)
async def run(
self,
req_text: str,
filename: str,
) -> None:
output_text = ''
if len(req_text) > self.max_chars:
for part in self.text_len_sanitize(req_text, self.max_chars):
if part:
output_text += await self.make_request(part)
else:
output_text = await self.make_request(req_text)
await self.write_file(output_text, filename)
def get_random_voice(
voices: Union[list, dict],
key: Optional[str] = None,
) -> str:
if isinstance(voices, list):
return choice(voices)
else:
return choice(voices[key])
def audio_length(
path: str,
) -> float | int:
from mutagen.mp3 import MP3
try:
audio = MP3(path)
return audio.info.length
except Exception as e: # TODO add logging
return 0

@ -1,23 +1,19 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from pathlib import Path from asyncio import as_completed
from typing import Tuple
import re
# import sox from pathlib import Path
# from mutagen import MutagenError
# from mutagen.mp3 import MP3, HeaderNotFoundError
import translators as ts import translators as ts
from rich.progress import track from rich.progress import track
from moviepy.editor import AudioFileClip, CompositeAudioClip, concatenate_audioclips from attr import attrs, attrib
from utils.console import print_step, print_substep from utils.console import print_step, print_substep
from utils.voice import sanitize_text from utils.voice import sanitize_text
from utils import settings from utils import settings
from TTS.common import audio_length
DEFUALT_MAX_LENGTH: int = 50 # video length variable
@attrs(auto_attribs=True)
class TTSEngine: class TTSEngine:
"""Calls the given TTS engine to reduce code duplication and allow multiple TTS engines. """Calls the given TTS engine to reduce code duplication and allow multiple TTS engines.
Args: Args:
@ -29,94 +25,81 @@ class TTSEngine:
Notes: Notes:
tts_module must take the arguments text and filepath. tts_module must take the arguments text and filepath.
""" """
tts_module: object
reddit_object: dict
path: str = 'assets/temp/mp3'
max_length: int = 50 # TODO move to config
__total_length: int = attrib(
default=0,
kw_only=True
)
def __init__( async def run(
self, self
tts_module, ) -> list:
reddit_object: dict,
path: str = "assets/temp/mp3",
max_length: int = DEFUALT_MAX_LENGTH,
):
self.tts_module = tts_module()
self.reddit_object = reddit_object
self.path = path
self.max_length = max_length
self.length = 0
def run(self) -> Tuple[int, int]:
Path(self.path).mkdir(parents=True, exist_ok=True) Path(self.path).mkdir(parents=True, exist_ok=True)
# This file needs to be removed in case this post does not use post text, so that it wont appear in the final video # This file needs to be removed in case this post does not use post text
# so that it won't appear in the final video
try: try:
Path(f"{self.path}/posttext.mp3").unlink() Path(f'{self.path}/posttext.mp3').unlink()
except OSError: except OSError:
pass pass
print_step("Saving Text to MP3 files...") print_step('Saving Text to MP3 files...')
self.call_tts("title", self.reddit_object["thread_title"]) await self.call_tts('title', self.reddit_object['thread_title'])
if ( async_tasks_offset = 1
self.reddit_object["thread_post"] != ""
and settings.config["settings"]["storymode"] == True
):
self.call_tts("posttext", self.reddit_object["thread_post"])
idx = None
for idx, comment in track(enumerate(self.reddit_object["comments"]), "Saving..."):
# ! Stop creating mp3 files if the length is greater than max length.
if self.length > self.max_length:
break
if not self.tts_module.max_chars:
self.call_tts(f"{idx}", comment["comment_body"])
else:
self.split_post(comment["comment_body"], idx)
print_substep("Saved Text to MP3 files successfully.", style="bold green")
return self.length, idx
def split_post(self, text: str, idx: int):
split_files = []
split_text = [
x.group().strip()
for x in re.finditer(rf" *((.{{0,{self.tts_module.max_chars}}})(\.|.$))", text)
]
idy = None if self.reddit_object['thread_post'] and settings.config['settings']['storymode']:
for idy, text_cut in enumerate(split_text): await self.call_tts('posttext', self.reddit_object['thread_post'])
# print(f"{idx}-{idy}: {text_cut}\n") async_tasks_offset += 1
self.call_tts(f"{idx}-{idy}.part", text_cut)
split_files.append(AudioFileClip(f"{self.path}/{idx}-{idy}.part.mp3"))
CompositeAudioClip([concatenate_audioclips(split_files)]).write_audiofile(
f"{self.path}/{idx}.mp3", fps=44100, verbose=False, logger=None
)
for i in split_files: async_tasks_primary = [
name = i.filename self.call_tts(str(idx), comment['comment_body'])
i.close() for idx, comment in enumerate(self.reddit_object['comments'])
Path(name).unlink() ]
# for i in range(0, idy + 1): for task in track(
# print(f"Cleaning up {self.path}/{idx}-{i}.part.mp3") as_completed(async_tasks_primary),
description='Saving...',
total=async_tasks_primary.__len__()
):
await task
# Path(f"{self.path}/{idx}-{i}.part.mp3").unlink() print_substep('Saved Text to MP3 files successfully.', style='bold green')
return [
comments for comments, condition in
zip(self.reddit_object['comments'], async_tasks_primary[async_tasks_offset:])
if condition
]
async def call_tts(
self,
filename: str,
text: str
) -> bool:
await self.tts_module.run(
text=self.process_text(text),
filepath=f'{self.path}/{filename}.mp3'
)
def call_tts(self, filename: str, text: str): clip_length = audio_length(f'assets/audio/{filename}.mp3')
self.tts_module.run(text=process_text(text), filepath=f"{self.path}/{filename}.mp3")
# try:
# self.length += MP3(f"{self.path}/{filename}.mp3").info.length
# except (MutagenError, HeaderNotFoundError):
# self.length += sox.file_info.duration(f"{self.path}/{filename}.mp3")
clip = AudioFileClip(f"{self.path}/{filename}.mp3")
self.length += clip.duration
clip.close()
if self.__total_length + clip_length <= self.max_length:
self.max_length += clip_length
return True
return False
def process_text(text: str): @staticmethod
lang = settings.config["reddit"]["thread"]["post_lang"] def process_text(
text: str,
) -> str:
lang = settings.config['reddit']['thread']['post_lang']
new_text = sanitize_text(text) new_text = sanitize_text(text)
if lang: if lang:
print_substep("Translating Text...") print_substep('Translating Text...')
translated_text = ts.google(text, to_language=lang) translated_text = ts.google(text, to_language=lang)
new_text = sanitize_text(translated_text) new_text = sanitize_text(translated_text)
return new_text return new_text

@ -1,62 +1,71 @@
import random from aiohttp import ClientSession
import requests
from requests.exceptions import JSONDecodeError from random import choice
from utils import settings from utils import settings
from utils.voice import check_ratelimit from attr import attrs, attrib
from attr.validators import instance_of
from TTS.common import BaseApiTTS, get_random_voice
voices = [ voices = [
"Brian", 'Brian',
"Emma", 'Emma',
"Russell", 'Russell',
"Joey", 'Joey',
"Matthew", 'Matthew',
"Joanna", 'Joanna',
"Kimberly", 'Kimberly',
"Amy", 'Amy',
"Geraint", 'Geraint',
"Nicole", 'Nicole',
"Justin", 'Justin',
"Ivy", 'Ivy',
"Kendra", 'Kendra',
"Salli", 'Salli',
"Raveena", 'Raveena',
] ]
# valid voices https://lazypy.ro/tts/ # valid voices https://lazypy.ro/tts/
class StreamlabsPolly: @attrs(auto_attribs=True)
def __init__(self): class StreamlabsPolly(BaseApiTTS):
self.url = "https://streamlabs.com/polly/speak" client: ClientSession = attrib(
self.max_chars = 550 validator=instance_of(ClientSession),
self.voices = voices )
random_voice: bool = False
url: str = attrib(
default='https://streamlabs.com/polly/speak',
kw_only=True,
)
def run(self, text, filepath, random_voice: bool = False): max_chars = 550
if random_voice:
voice = self.randomvoice() async def make_request(
else: self,
if not settings.config["settings"]["tts"]["streamlabs_polly_voice"]: text: str,
return ValueError( ):
f"Please set the config variable STREAMLABS_VOICE to a valid voice. options are: {voices}" voice = (
get_random_voice(voices)
if self.random_voice
else str(settings.config['settings']['tts']['streamlabs_polly_voice']).capitalize()
if str(settings.config['settings']['tts']['streamlabs_polly_voice']).lower() in [
voice.lower() for voice in voices]
else get_random_voice(voices)
)
response = await self.client.post(
self.url,
data={
'voice': voice,
'text': text,
'service': 'polly',
}
) )
voice = str(settings.config["settings"]["tts"]["streamlabs_polly_voice"]).capitalize() speak_url = await(
body = {"voice": voice, "text": text, "service": "polly"} await response.json()
response = requests.post(self.url, data=body) )['speak_url']
if not check_ratelimit(response):
self.run(text, filepath, random_voice) return await self.client.get(speak_url)
else:
try:
voice_data = requests.get(response.json()["speak_url"])
with open(filepath, "wb") as f:
f.write(voice_data.content)
except (KeyError, JSONDecodeError):
try:
if response.json()["error"] == "No text specified!":
raise ValueError("Please specify a text to convert to speech.")
except (KeyError, JSONDecodeError):
print("Error occurred calling Streamlabs Polly")
def randomvoice(self):
return random.choice(self.voices)

@ -41,13 +41,10 @@ async def main(
): ):
cleanup() cleanup()
reddit_object = get_subreddit_threads(POST_ID) reddit_object = get_subreddit_threads(POST_ID)
length, number_of_comments = save_text_to_mp3(reddit_object) comments_created = await save_text_to_mp3(reddit_object)
length = math.ceil(length) await RedditScreenshot(reddit_object, comments_created).download()
await RedditScreenshot(reddit_object, number_of_comments).download()
bg_config = get_background_config() bg_config = get_background_config()
download_background(bg_config) make_final_video(comments_created, reddit_object, bg_config)
chop_background_video(bg_config, length)
make_final_video(number_of_comments, length, reddit_object, bg_config)
async def run_many(times): async def run_many(times):

@ -10,3 +10,6 @@ toml==0.10.2
translators==5.3.1 translators==5.3.1
pyppeteer==1.0.2 pyppeteer==1.0.2
attrs==21.4.0 attrs==21.4.0
aiohttp==3.8.1 # There is security warning for <=3.8.1, no fixes for now
aiofiles==0.8.0
mutagen==1.45.1

@ -10,7 +10,9 @@ if sys.version_info[0] >= 3:
from datetime import timezone from datetime import timezone
def check_ratelimit(response: Response): def check_ratelimit(
response: Response,
):
""" """
Checks if the response is a ratelimit response. Checks if the response is a ratelimit response.
If it is, it sleeps for the time specified in the response. If it is, it sleeps for the time specified in the response.

@ -3,7 +3,7 @@ import multiprocessing
import os import os
import re import re
from os.path import exists from os.path import exists
from typing import Dict, Tuple, Any from typing import Tuple, Any
import translators as ts import translators as ts
@ -13,7 +13,6 @@ from moviepy.editor import (
ImageClip, ImageClip,
concatenate_videoclips, concatenate_videoclips,
concatenate_audioclips, concatenate_audioclips,
CompositeAudioClip,
CompositeVideoClip, CompositeVideoClip,
) )
from moviepy.video.io.ffmpeg_tools import ffmpeg_merge_video_audio, ffmpeg_extract_subclip from moviepy.video.io.ffmpeg_tools import ffmpeg_merge_video_audio, ffmpeg_extract_subclip
@ -23,24 +22,26 @@ from utils.cleanup import cleanup
from utils.console import print_step, print_substep from utils.console import print_step, print_substep
from utils.videos import save_data from utils.videos import save_data
from utils import settings from utils import settings
from video_creation.background import download_background
console = Console() console = Console()
W, H = 1080, 1920 W, H = 1080, 1920 # TODO move to config
def name_normalize(name: str) -> str: def name_normalize(
name: str
) -> str:
name = re.sub(r'[?\\"%*:|<>]', "", name) name = re.sub(r'[?\\"%*:|<>]', "", name)
name = re.sub(r"( [w,W]\s?\/\s?[o,O,0])", r" without", name) name = re.sub(r'( [w,W]\s?\/\s?[o,O,0])', r' without', name)
name = re.sub(r"( [w,W]\s?\/)", r" with", name) name = re.sub(r'( [w,W]\s?\/)', r' with', name)
name = re.sub(r"(\d+)\s?\/\s?(\d+)", r"\1 of \2", name) name = re.sub(r'(\d+)\s?\/\s?(\d+)', r'\1 of \2', name)
name = re.sub(r"(\w+)\s?\/\s?(\w+)", r"\1 or \2", name) name = re.sub(r'(\w+)\s?\/\s?(\w+)', r'\1 or \2', name)
name = re.sub(r"\/", r"", name) name = re.sub(r'\/', '', name)
lang = settings.config["reddit"]["thread"]["post_lang"] lang = settings.config['reddit']['thread']['post_lang']
if lang: if lang:
print_substep("Translating filename...") print_substep('Translating filename...')
translated_name = ts.google(name, to_language=lang) translated_name = ts.google(name, to_language=lang)
return translated_name return translated_name
@ -49,48 +50,46 @@ def name_normalize(name: str) -> str:
def make_final_video( def make_final_video(
number_of_clips: int, length: int, reddit_obj: dict, background_config: Tuple[str, str, str, Any] indexes_of_clips: list,
): reddit_obj: dict,
"""Gathers audio clips, gathers all screenshots, stitches them together and saves the final video to assets/temp background_config: Tuple[str, str, str, Any],
) -> None:
"""
Gathers audio clips, gathers all screenshots, stitches them together and saves the final video to assets/temp
Args: Args:
number_of_clips (int): Index to end at when going through the screenshots' indexes_of_clips (list): Indexes with created comments'
length (int): Length of the video
reddit_obj (dict): The reddit object that contains the posts to read. reddit_obj (dict): The reddit object that contains the posts to read.
background_config (Tuple[str, str, str, Any]): The background config to use. background_config (Tuple[str, str, str, Any]): The background config to use.
""" """
print_step("Creating the final video 🎥") print_step('Creating the final video 🎥')
VideoFileClip.reW = lambda clip: clip.resize(width=W) VideoFileClip.reW = lambda clip: clip.resize(width=W)
VideoFileClip.reH = lambda clip: clip.resize(width=H) VideoFileClip.reH = lambda clip: clip.resize(width=H)
opacity = settings.config["settings"]["opacity"] opacity = settings.config['settings']['opacity']
background_clip = (
VideoFileClip("assets/temp/background.mp4") final_length = 0
.without_audio()
.resize(height=H)
.crop(x1=1166.6, y1=0, x2=2246.6, y2=1920)
)
# Gather all audio clips # Gather all audio clips
audio_clips = [AudioFileClip(f"assets/temp/mp3/{i}.mp3") for i in range(number_of_clips)] audio_clips = [AudioFileClip(f'assets/temp/mp3/{i}.mp3') for i in indexes_of_clips]
audio_clips.insert(0, AudioFileClip("assets/temp/mp3/title.mp3")) audio_clips.insert(0, AudioFileClip('assets/temp/mp3/title.mp3'))
audio_concat = concatenate_audioclips(audio_clips) audio_composite = concatenate_audioclips(audio_clips)
audio_composite = CompositeAudioClip([audio_concat])
console.log(f"[bold green] Video Will Be: {length} Seconds Long") console.log(f'[bold green] Video Will Be: {audio_composite.length} Seconds Long')
# add title to video # add title to video
image_clips = [] image_clips = []
# Gather all images # Gather all images
new_opacity = 1 if opacity is None or float(opacity) >= 1 else float(opacity) new_opacity = 1 if opacity is None or float(opacity) >= 1 else float(opacity)
image_clips.insert( image_clips.insert(
0, 0,
ImageClip("assets/temp/png/title.png") ImageClip('assets/temp/png/title.png')
.set_duration(audio_clips[0].duration) .set_duration(audio_clips[0].duration)
.resize(width=W - 100) .resize(width=W - 100)
.set_opacity(new_opacity), .set_opacity(new_opacity),
) )
for i in range(0, number_of_clips): for i in indexes_of_clips:
image_clips.append( image_clips.append(
ImageClip(f"assets/temp/png/comment_{i}.png") ImageClip(f'assets/temp/png/comment_{i}.png')
.set_duration(audio_clips[i + 1].duration) .set_duration(audio_clips[i + 1].duration)
.resize(width=W - 100) .resize(width=W - 100)
.set_opacity(new_opacity) .set_opacity(new_opacity)
@ -109,63 +108,96 @@ def make_final_video(
img_clip_pos = background_config[3] img_clip_pos = background_config[3]
image_concat = concatenate_videoclips(image_clips).set_position(img_clip_pos) image_concat = concatenate_videoclips(image_clips).set_position(img_clip_pos)
image_concat.audio = audio_composite image_concat.audio = audio_composite
download_background(background_config)
background_clip = (
VideoFileClip('assets/temp/background.mp4')
.set_start(0)
.set_end()
.without_audio()
.resize(height=H)
)
back_video_width, back_video_height = background_clip.size
# Fix for crop with vertical videos
if back_video_width < H:
background_clip = (
background_clip
.resize(width=W)
)
back_video_width, back_video_height = background_clip.size
background_clip = background_clip.crop(
x1=0,
x2=back_video_width,
y1=back_video_height / 2 - H / 2,
y2=back_video_height / 2 + H / 2
)
else:
background_clip = background_clip.crop(
x1=back_video_width / 2 - W / 2,
x2=back_video_width / 2 + W / 2,
y1=0,
y2=back_video_height
)
final = CompositeVideoClip([background_clip, image_concat]) final = CompositeVideoClip([background_clip, image_concat])
title = re.sub(r"[^\w\s-]", "", reddit_obj["thread_title"]) title = re.sub(r'[^\w\s-]', '', reddit_obj['thread_title'])
idx = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"]) idx = re.sub(r'[^\w\s-]', '', reddit_obj['thread_id'])
filename = f"{name_normalize(title)}.mp4" filename = f'{name_normalize(title)}.mp4'
subreddit = settings.config["reddit"]["thread"]["subreddit"] subreddit = settings.config['reddit']['thread']['subreddit']
save_data(subreddit, filename, title, idx, background_config[2]) save_data(subreddit, filename, title, idx, background_config[2])
if not exists(f"./results/{subreddit}"): if not exists(f'./results/{subreddit}'):
print_substep("The results folder didn't exist so I made it") print_substep('The results folder didn\'t exist so I made it')
os.makedirs(f"./results/{subreddit}") os.makedirs(f'./results/{subreddit}')
final.write_videofile( final.write_videofile(
"assets/temp/temp.mp4", 'assets/temp/temp.mp4',
fps=30, fps=30,
audio_codec="aac", audio_codec='aac',
audio_bitrate="192k", audio_bitrate='192k',
verbose=False, verbose=False,
threads=multiprocessing.cpu_count(), threads=multiprocessing.cpu_count(),
) )
if settings.config["settings"]["background_audio"]: if settings.config['settings']['background_audio']:
print("[bold green] Merging background audio with video") print('[bold green] Merging background audio with video')
if not exists(f"assets/backgrounds/background.mp3"): if not exists('assets/backgrounds/background.mp3'):
print_substep( print_substep(
"Cannot find assets/backgrounds/background.mp3 audio file didn't so skipping." 'Cannot find assets/backgrounds/background.mp3 audio file didn\'t so skipping.'
) )
ffmpeg_extract_subclip( ffmpeg_extract_subclip(
"assets/temp/temp.mp4", 'assets/temp/temp.mp4',
0, 0,
final.duration, final.duration,
targetname=f"results/{subreddit}/{filename}", targetname=f'results/{subreddit}/{filename}',
) )
else: else:
ffmpeg_merge_video_audio( ffmpeg_merge_video_audio(
"assets/temp/temp.mp4", 'assets/temp/temp.mp4',
"assets/backgrounds/background.mp3", 'assets/backgrounds/background.mp3',
"assets/temp/temp_audio.mp4", 'assets/temp/temp_audio.mp4',
) )
ffmpeg_extract_subclip( # check if this gets run ffmpeg_extract_subclip( # check if this gets run
"assets/temp/temp_audio.mp4", 'assets/temp/temp_audio.mp4',
0, 0,
final.duration, final.duration,
targetname=f"results/{subreddit}/{filename}", targetname=f"results/{subreddit}/{filename}",
) )
else: else:
print("debug duck") print('debug duck')
ffmpeg_extract_subclip( ffmpeg_extract_subclip(
"assets/temp/temp.mp4", 'assets/temp/temp.mp4',
0, 0,
final.duration, final.duration,
targetname=f"results/{subreddit}/{filename}", targetname=f'results/{subreddit}/{filename}',
) )
print_step("Removing temporary files 🗑") print_step('Removing temporary files 🗑')
cleanups = cleanup() cleanups = cleanup()
print_substep(f"Removed {cleanups} temporary files 🗑") print_substep(f'Removed {cleanups} temporary files 🗑')
print_substep("See result in the results folder!") print_substep('See result in the results folder!')
print_step( print_step(
f'Reddit title: {reddit_obj["thread_title"]} \n Background Credit: {background_config[2]}' f'Reddit title: {reddit_obj["thread_title"]} \n Background Credit: {background_config[2]}'

@ -209,10 +209,10 @@ class RedditScreenshot(Browser, Wait):
""" """
Args: Args:
reddit_object (Dict): Reddit object received from reddit/subreddit.py reddit_object (Dict): Reddit object received from reddit/subreddit.py
screenshot_num (int): Number of screenshots to download screenshot_idx (int): List with indexes of voiced comments
""" """
reddit_object: dict reddit_object: dict
screenshot_num: int = attrib() screenshot_idx: list = attrib()
@screenshot_num.validator @screenshot_num.validator
def validate_screenshot_num(self, attribute, value): def validate_screenshot_num(self, attribute, value):
@ -348,9 +348,8 @@ class RedditScreenshot(Browser, Wait):
) )
async_tasks_primary = [ async_tasks_primary = [
self.__collect_comment(comment, idx) for idx, comment in self.__collect_comment(self.reddit_object['comments'][idx], idx) for idx in
enumerate(self.reddit_object['comments']) self.screenshot_idx
if idx < self.screenshot_num
] ]
for task in track( for task in track(

@ -1,55 +1,50 @@
#!/usr/bin/env python
from typing import Dict, Tuple
from rich.console import Console
from TTS.engine_wrapper import TTSEngine from TTS.engine_wrapper import TTSEngine
from TTS.GTTS import GTTS from TTS.GTTS import GTTS
from TTS.streamlabs_polly import StreamlabsPolly from TTS.streamlabs_polly import StreamlabsPolly
from TTS.aws_polly import AWSPolly from TTS.aws_polly import AWSPolly
from TTS.TikTok import TikTok from TTS.TikTok import TikTok
from utils import settings from utils import settings
from utils.console import print_table, print_step from utils.console import print_table, print_step
console = Console()
TTSProviders = { TTSProviders = {
"GoogleTranslate": GTTS, 'GoogleTranslate': GTTS,
"AWSPolly": AWSPolly, 'AWSPolly': AWSPolly,
"StreamlabsPolly": StreamlabsPolly, 'StreamlabsPolly': StreamlabsPolly,
"TikTok": TikTok, 'TikTok': TikTok,
} }
def save_text_to_mp3(reddit_obj) -> Tuple[int, int]: async def save_text_to_mp3(
reddit_obj: dict,
) -> list:
"""Saves text to MP3 files. """Saves text to MP3 files.
Args: Args:
reddit_obj (): Reddit object received from reddit API in reddit/subreddit.py reddit_obj (): Reddit object received from reddit API in reddit/subreddit.py
Returns: Returns:
tuple[int,int]: (total length of the audio, the number of comments audio was generated for) The number of comments audio was generated for
""" """
voice = settings.config["settings"]["tts"]["choice"] voice = settings.config['settings']['tts']['choice']
if voice.casefold() in map(lambda _: _.casefold(), TTSProviders): if voice.casefold() not in map(lambda _: _.casefold(), TTSProviders):
text_to_mp3 = TTSEngine(get_case_insensitive_key_value(TTSProviders, voice), reddit_obj)
else:
while True: while True:
print_step("Please choose one of the following TTS providers: ") print_step('Please choose one of the following TTS providers: ')
print_table(TTSProviders) print_table(TTSProviders)
choice = input("\n") voice = input('\n')
if choice.casefold() in map(lambda _: _.casefold(), TTSProviders): if voice.casefold() in map(lambda _: _.casefold(), TTSProviders):
break break
print("Unknown Choice") print('Unknown Choice')
text_to_mp3 = TTSEngine(get_case_insensitive_key_value(TTSProviders, choice), reddit_obj) engine_instance = TTSEngine(get_case_insensitive_key_value(TTSProviders, voice), reddit_obj)
return await engine_instance.run()
return text_to_mp3.run()
def get_case_insensitive_key_value(input_dict, key): def get_case_insensitive_key_value(
input_dict,
key
) -> object:
return next( return next(
(value for dict_key, value in input_dict.items() if dict_key.lower() == key.lower()), (value for dict_key, value in input_dict.items() if dict_key.lower() == key.lower()),
None, None,

Loading…
Cancel
Save