commit
093be0f544
@ -1,13 +1,19 @@
|
|||||||
from gtts import gTTS
|
#!/usr/bin/env python3
|
||||||
|
import random
|
||||||
import os
|
import os
|
||||||
|
from gtts import gTTS
|
||||||
|
|
||||||
|
max_chars = 0
|
||||||
|
|
||||||
|
|
||||||
class GTTS:
|
class GTTS:
|
||||||
def tts(
|
def __init__(self):
|
||||||
self,
|
self.max_chars = 0
|
||||||
req_text: str = "Google Text To Speech",
|
self.voices = []
|
||||||
filename: str = "title.mp3",
|
|
||||||
random_speaker=False,
|
def run(self, text, filepath):
|
||||||
censor=False,
|
tts = gTTS(text=text, lang=os.getenv("POSTLANG") or "en", slow=False)
|
||||||
):
|
tts.save(filepath)
|
||||||
tts = gTTS(text=req_text, lang=os.getenv("POSTLANG") or "en", slow=False)
|
|
||||||
tts.save(f"{filename}")
|
def randomvoice(self):
|
||||||
|
return random.choice(self.voices)
|
||||||
|
@ -1,106 +0,0 @@
|
|||||||
import os
|
|
||||||
import random
|
|
||||||
import re
|
|
||||||
|
|
||||||
import requests
|
|
||||||
import sox
|
|
||||||
from moviepy.audio.AudioClip import concatenate_audioclips, CompositeAudioClip
|
|
||||||
from moviepy.audio.io.AudioFileClip import AudioFileClip
|
|
||||||
from requests.exceptions import JSONDecodeError
|
|
||||||
|
|
||||||
voices = [
|
|
||||||
"Brian",
|
|
||||||
"Emma",
|
|
||||||
"Russell",
|
|
||||||
"Joey",
|
|
||||||
"Matthew",
|
|
||||||
"Joanna",
|
|
||||||
"Kimberly",
|
|
||||||
"Amy",
|
|
||||||
"Geraint",
|
|
||||||
"Nicole",
|
|
||||||
"Justin",
|
|
||||||
"Ivy",
|
|
||||||
"Kendra",
|
|
||||||
"Salli",
|
|
||||||
"Raveena",
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
# valid voices https://lazypy.ro/tts/
|
|
||||||
|
|
||||||
|
|
||||||
class POLLY:
|
|
||||||
def __init__(self):
|
|
||||||
self.url = "https://streamlabs.com/polly/speak"
|
|
||||||
|
|
||||||
def tts(
|
|
||||||
self,
|
|
||||||
req_text: str = "Amazon Text To Speech",
|
|
||||||
filename: str = "title.mp3",
|
|
||||||
random_speaker=False,
|
|
||||||
censor=False,
|
|
||||||
):
|
|
||||||
if random_speaker:
|
|
||||||
voice = self.randomvoice()
|
|
||||||
else:
|
|
||||||
if not os.getenv("VOICE"):
|
|
||||||
return ValueError(
|
|
||||||
"Please set the environment variable VOICE to a valid voice. options are: {}".format(
|
|
||||||
voices
|
|
||||||
)
|
|
||||||
)
|
|
||||||
voice = str(os.getenv("VOICE")).capitalize()
|
|
||||||
body = {"voice": voice, "text": req_text, "service": "polly"}
|
|
||||||
response = requests.post(self.url, data=body)
|
|
||||||
try:
|
|
||||||
voice_data = requests.get(response.json()["speak_url"])
|
|
||||||
with open(filename, "wb") as f:
|
|
||||||
f.write(voice_data.content)
|
|
||||||
except (KeyError, JSONDecodeError):
|
|
||||||
if response.json()["error"] == "Text length is too long!":
|
|
||||||
chunks = [m.group().strip() for m in re.finditer(r" *((.{0,499})(\.|.$))", req_text)]
|
|
||||||
|
|
||||||
audio_clips = []
|
|
||||||
cbn = sox.Combiner()
|
|
||||||
|
|
||||||
chunkId = 0
|
|
||||||
for chunk in chunks:
|
|
||||||
body = {"voice": voice, "text": chunk, "service": "polly"}
|
|
||||||
resp = requests.post(self.url, data=body)
|
|
||||||
voice_data = requests.get(resp.json()["speak_url"])
|
|
||||||
with open(filename.replace(".mp3", f"-{chunkId}.mp3"), "wb") as out:
|
|
||||||
out.write(voice_data.content)
|
|
||||||
|
|
||||||
audio_clips.append(filename.replace(".mp3", f"-{chunkId}.mp3"))
|
|
||||||
|
|
||||||
chunkId = chunkId + 1
|
|
||||||
try:
|
|
||||||
if len(audio_clips) > 1:
|
|
||||||
cbn.convert(samplerate=44100, n_channels=2)
|
|
||||||
cbn.build(audio_clips, filename, "concatenate")
|
|
||||||
else:
|
|
||||||
os.rename(audio_clips[0], filename)
|
|
||||||
except (
|
|
||||||
sox.core.SoxError,
|
|
||||||
FileNotFoundError,
|
|
||||||
): # https://github.com/JasonLovesDoggo/RedditVideoMakerBot/issues/67#issuecomment-1150466339
|
|
||||||
for clip in audio_clips:
|
|
||||||
i = audio_clips.index(clip) # get the index of the clip
|
|
||||||
audio_clips = (
|
|
||||||
audio_clips[:i] + [AudioFileClip(clip)] + audio_clips[i + 1 :]
|
|
||||||
) # replace the clip with an AudioFileClip
|
|
||||||
audio_concat = concatenate_audioclips(audio_clips)
|
|
||||||
audio_composite = CompositeAudioClip([audio_concat])
|
|
||||||
audio_composite.write_audiofile(filename, 44100, 2, 2000, None)
|
|
||||||
|
|
||||||
def make_readable(self, text):
|
|
||||||
"""
|
|
||||||
Amazon Polly fails to read some symbols properly such as '& (and)'.
|
|
||||||
So we normalize input text before passing it to the service
|
|
||||||
"""
|
|
||||||
text = text.replace("&", "and")
|
|
||||||
return text
|
|
||||||
|
|
||||||
def randomvoice(self):
|
|
||||||
return random.choice(voices)
|
|
@ -0,0 +1,66 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
from boto3 import Session
|
||||||
|
from botocore.exceptions import BotoCoreError, ClientError
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import random
|
||||||
|
|
||||||
|
voices = [
|
||||||
|
"Brian",
|
||||||
|
"Emma",
|
||||||
|
"Russell",
|
||||||
|
"Joey",
|
||||||
|
"Matthew",
|
||||||
|
"Joanna",
|
||||||
|
"Kimberly",
|
||||||
|
"Amy",
|
||||||
|
"Geraint",
|
||||||
|
"Nicole",
|
||||||
|
"Justin",
|
||||||
|
"Ivy",
|
||||||
|
"Kendra",
|
||||||
|
"Salli",
|
||||||
|
"Raveena",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class AWSPolly:
|
||||||
|
def __init__(self):
|
||||||
|
self.max_chars = 0
|
||||||
|
self.voices = voices
|
||||||
|
|
||||||
|
def run(self, text, filepath, random_voice: bool = False):
|
||||||
|
session = Session(profile_name="polly")
|
||||||
|
polly = session.client("polly")
|
||||||
|
if random_voice:
|
||||||
|
voice = self.randomvoice()
|
||||||
|
else:
|
||||||
|
if not os.getenv("VOICE"):
|
||||||
|
return ValueError(
|
||||||
|
f"Please set the environment variable VOICE to a valid voice. options are: {voices}"
|
||||||
|
)
|
||||||
|
voice = str(os.getenv("AWS_VOICE")).capitalize()
|
||||||
|
try:
|
||||||
|
# Request speech synthesis
|
||||||
|
response = polly.synthesize_speech(
|
||||||
|
Text=text, OutputFormat="mp3", VoiceId=voice, Engine="neural"
|
||||||
|
)
|
||||||
|
except (BotoCoreError, ClientError) as error:
|
||||||
|
# The service returned an error, exit gracefully
|
||||||
|
print(error)
|
||||||
|
sys.exit(-1)
|
||||||
|
|
||||||
|
# Access the audio stream from the response
|
||||||
|
if "AudioStream" in response:
|
||||||
|
file = open(filepath, "wb")
|
||||||
|
file.write(response["AudioStream"].read())
|
||||||
|
file.close()
|
||||||
|
# print_substep(f"Saved Text {idx} to MP3 files successfully.", style="bold green")
|
||||||
|
|
||||||
|
else:
|
||||||
|
# The response didn't contain audio data, exit gracefully
|
||||||
|
print("Could not stream audio")
|
||||||
|
sys.exit(-1)
|
||||||
|
|
||||||
|
def randomvoice(self):
|
||||||
|
return random.choice(self.voices)
|
@ -0,0 +1,110 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Tuple
|
||||||
|
import re
|
||||||
|
from os import getenv
|
||||||
|
from mutagen.mp3 import MP3
|
||||||
|
import translators as ts
|
||||||
|
from rich.progress import track
|
||||||
|
from moviepy.editor import AudioFileClip, CompositeAudioClip, concatenate_audioclips
|
||||||
|
from utils.console import print_step, print_substep
|
||||||
|
from utils.voice import sanitize_text
|
||||||
|
|
||||||
|
|
||||||
|
class TTSEngine:
|
||||||
|
|
||||||
|
"""Calls the given TTS engine to reduce code duplication and allow multiple TTS engines.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
tts_module : The TTS module. Your module should handle the TTS itself and saving to the given path under the run method.
|
||||||
|
reddit_object : The reddit object that contains the posts to read.
|
||||||
|
path (Optional) : The unix style path to save the mp3 files to. This must not have leading or trailing slashes.
|
||||||
|
max_length (Optional) : The maximum length of the mp3 files in total.
|
||||||
|
|
||||||
|
Notes:
|
||||||
|
tts_module must take the arguments text and filepath.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
tts_module,
|
||||||
|
reddit_object: dict,
|
||||||
|
path: str = "assets/temp/mp3",
|
||||||
|
max_length: int = 50,
|
||||||
|
):
|
||||||
|
self.tts_module = tts_module()
|
||||||
|
self.reddit_object = reddit_object
|
||||||
|
self.path = path
|
||||||
|
self.max_length = max_length
|
||||||
|
self.length = 0
|
||||||
|
|
||||||
|
def run(self) -> Tuple[int, int]:
|
||||||
|
|
||||||
|
Path(self.path).mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
# This file needs to be removed in case this post does not use post text, so that it wont appear in the final video
|
||||||
|
try:
|
||||||
|
Path(f"{self.path}/posttext.mp3").unlink()
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
print_step("Saving Text to MP3 files...")
|
||||||
|
|
||||||
|
self.call_tts("title", self.reddit_object["thread_title"])
|
||||||
|
if (
|
||||||
|
self.reddit_object["thread_post"] != ""
|
||||||
|
and getenv("STORYMODE", "").casefold() == "true"
|
||||||
|
):
|
||||||
|
self.call_tts("posttext", self.reddit_object["thread_post"])
|
||||||
|
|
||||||
|
idx = None
|
||||||
|
for idx, comment in track(
|
||||||
|
enumerate(self.reddit_object["comments"]), "Saving..."
|
||||||
|
):
|
||||||
|
# ! Stop creating mp3 files if the length is greater than max length.
|
||||||
|
if self.length > self.max_length:
|
||||||
|
break
|
||||||
|
if not self.tts_module.max_chars:
|
||||||
|
self.call_tts(f"{idx}", comment["comment_body"])
|
||||||
|
else:
|
||||||
|
self.split_post(comment["comment_body"], idx)
|
||||||
|
|
||||||
|
print_substep("Saved Text to MP3 files successfully.", style="bold green")
|
||||||
|
return self.length, idx
|
||||||
|
|
||||||
|
def split_post(self, text: str, idx: int) -> str:
|
||||||
|
split_files = []
|
||||||
|
split_text = [
|
||||||
|
x.group().strip()
|
||||||
|
for x in re.finditer(
|
||||||
|
rf" *((.{{0,{self.tts_module.max_chars}}})(\.|.$))", text
|
||||||
|
)
|
||||||
|
]
|
||||||
|
|
||||||
|
idy = None
|
||||||
|
for idy, text_cut in enumerate(split_text):
|
||||||
|
# print(f"{idx}-{idy}: {text_cut}\n")
|
||||||
|
self.call_tts(f"{idx}-{idy}.part", text_cut)
|
||||||
|
split_files.append(AudioFileClip(f"{self.path}/{idx}-{idy}.part.mp3"))
|
||||||
|
CompositeAudioClip([concatenate_audioclips(split_files)]).write_audiofile(
|
||||||
|
f"{self.path}/{idx}.mp3", fps=44100, verbose=False, logger=None
|
||||||
|
)
|
||||||
|
|
||||||
|
for i in range(0, idy + 1):
|
||||||
|
# print(f"Cleaning up {self.path}/{idx}-{i}.part.mp3")
|
||||||
|
Path(f"{self.path}/{idx}-{i}.part.mp3").unlink()
|
||||||
|
|
||||||
|
def call_tts(self, filename: str, text: str):
|
||||||
|
self.tts_module.run(
|
||||||
|
text=process_text(text), filepath=f"{self.path}/{filename}.mp3"
|
||||||
|
)
|
||||||
|
self.length += MP3(f"{self.path}/{filename}.mp3").info.length
|
||||||
|
|
||||||
|
|
||||||
|
def process_text(text: str):
|
||||||
|
lang = getenv("POSTLANG", "")
|
||||||
|
new_text = sanitize_text(text)
|
||||||
|
if lang:
|
||||||
|
print_substep("Translating Text...")
|
||||||
|
new_text = ts.google(text, to_language=lang)
|
||||||
|
return new_text
|
@ -0,0 +1,53 @@
|
|||||||
|
import random
|
||||||
|
import os
|
||||||
|
import requests
|
||||||
|
from requests.exceptions import JSONDecodeError
|
||||||
|
|
||||||
|
voices = [
|
||||||
|
"Brian",
|
||||||
|
"Emma",
|
||||||
|
"Russell",
|
||||||
|
"Joey",
|
||||||
|
"Matthew",
|
||||||
|
"Joanna",
|
||||||
|
"Kimberly",
|
||||||
|
"Amy",
|
||||||
|
"Geraint",
|
||||||
|
"Nicole",
|
||||||
|
"Justin",
|
||||||
|
"Ivy",
|
||||||
|
"Kendra",
|
||||||
|
"Salli",
|
||||||
|
"Raveena",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
# valid voices https://lazypy.ro/tts/
|
||||||
|
|
||||||
|
|
||||||
|
class StreamlabsPolly:
|
||||||
|
def __init__(self):
|
||||||
|
self.url = "https://streamlabs.com/polly/speak"
|
||||||
|
self.max_chars = 550
|
||||||
|
self.voices = voices
|
||||||
|
|
||||||
|
def run(self, text, filepath, random_voice: bool = False):
|
||||||
|
if random_voice:
|
||||||
|
voice = self.randomvoice()
|
||||||
|
else:
|
||||||
|
if not os.getenv("VOICE"):
|
||||||
|
return ValueError(
|
||||||
|
f"Please set the environment variable VOICE to a valid voice. options are: {voices}"
|
||||||
|
)
|
||||||
|
voice = str(os.getenv("STREAMLABS_VOICE")).capitalize()
|
||||||
|
body = {"voice": voice, "text": text, "service": "polly"}
|
||||||
|
response = requests.post(self.url, data=body)
|
||||||
|
try:
|
||||||
|
voice_data = requests.get(response.json()["speak_url"])
|
||||||
|
with open(filepath, "wb") as f:
|
||||||
|
f.write(voice_data.content)
|
||||||
|
except (KeyError, JSONDecodeError):
|
||||||
|
print("Error occured calling Streamlabs Polly")
|
||||||
|
|
||||||
|
def randomvoice(self):
|
||||||
|
return random.choice(self.voices)
|
@ -1,24 +0,0 @@
|
|||||||
from os import getenv
|
|
||||||
|
|
||||||
from dotenv import load_dotenv
|
|
||||||
|
|
||||||
from TTS.GTTS import GTTS
|
|
||||||
from TTS.POLLY import POLLY
|
|
||||||
from TTS.TikTok import TikTok
|
|
||||||
from utils.console import print_substep
|
|
||||||
|
|
||||||
CHOICE_DIR = {"tiktok": TikTok, "gtts": GTTS, "polly": POLLY}
|
|
||||||
|
|
||||||
|
|
||||||
class TTS:
|
|
||||||
def __new__(cls):
|
|
||||||
load_dotenv()
|
|
||||||
try:
|
|
||||||
CHOICE = getenv("TTsChoice").casefold()
|
|
||||||
except AttributeError:
|
|
||||||
print_substep("None defined. Defaulting to 'polly.'")
|
|
||||||
CHOICE = "polly"
|
|
||||||
valid_keys = [key.lower() for key in CHOICE_DIR.keys()]
|
|
||||||
if CHOICE not in valid_keys:
|
|
||||||
raise ValueError(f"{CHOICE} is not valid. Please use one of these {valid_keys} options")
|
|
||||||
return CHOICE_DIR.get(CHOICE)()
|
|
Loading…
Reference in new issue