cherry-picked split text from async-tts-api

pull/963/head
Drugsosos 2 years ago
parent 62ac8fe0b0
commit fee2d936e2
No known key found for this signature in database
GPG Key ID: 8E35176FE617E28D

@ -1,23 +1,19 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import random
from utils import settings from utils import settings
from gtts import gTTS from gtts import gTTS
max_chars = 0
class GTTS: class GTTS:
def __init__(self): max_chars = 0
self.max_chars = 0
self.voices = []
def run(self, text, filepath): @staticmethod
async def run(
text,
filepath
) -> None:
tts = gTTS( tts = gTTS(
text=text, text=text,
lang=settings.config["reddit"]["thread"]["post_lang"] or "en", lang=settings.config["reddit"]["thread"]["post_lang"] or "en",
slow=False, slow=False,
) )
tts.save(filepath) tts.save(filepath)
def randomvoice(self):
return random.choice(self.voices)

@ -1,102 +1,108 @@
import base64 import base64
from utils import settings from utils import settings
import random
import requests import requests
from requests.adapters import HTTPAdapter, Retry from requests.adapters import HTTPAdapter, Retry
# from profanity_filter import ProfanityFilter from attr import attrs, attrib
# pf = ProfanityFilter() from attr.validators import instance_of
# Code by @JasonLovesDoggo
# https://twitter.com/scanlime/status/1512598559769702406
nonhuman = [ # DISNEY VOICES from TTS.common import BaseApiTTS, get_random_voice
"en_us_ghostface", # Ghost Face
"en_us_chewbacca", # Chewbacca # TTS examples: https://twitter.com/scanlime/status/1512598559769702406
"en_us_c3po", # C3PO
"en_us_stitch", # Stitch voices = dict()
"en_us_stormtrooper", # Stormtrooper
"en_us_rocket", # Rocket voices['nonhuman'] = [ # DISNEY VOICES
'en_us_ghostface', # Ghost Face
'en_us_chewbacca', # Chewbacca
'en_us_c3po', # C3PO
'en_us_stitch', # Stitch
'en_us_stormtrooper', # Stormtrooper
'en_us_rocket', # Rocket
# ENGLISH VOICES # ENGLISH VOICES
] ]
human = [ voices['human'] = [
"en_au_001", # English AU - Female 'en_au_001', # English AU - Female
"en_au_002", # English AU - Male 'en_au_002', # English AU - Male
"en_uk_001", # English UK - Male 1 'en_uk_001', # English UK - Male 1
"en_uk_003", # English UK - Male 2 'en_uk_003', # English UK - Male 2
"en_us_001", # English US - Female (Int. 1) 'en_us_001', # English US - Female (Int. 1)
"en_us_002", # English US - Female (Int. 2) 'en_us_002', # English US - Female (Int. 2)
"en_us_006", # English US - Male 1 'en_us_006', # English US - Male 1
"en_us_007", # English US - Male 2 'en_us_007', # English US - Male 2
"en_us_009", # English US - Male 3 'en_us_009', # English US - Male 3
"en_us_010", 'en_us_010',
] ]
voices = nonhuman + human
noneng = [ voices['non_eng'] = [
"fr_001", # French - Male 1 'fr_001', # French - Male 1
"fr_002", # French - Male 2 'fr_002', # French - Male 2
"de_001", # German - Female 'de_001', # German - Female
"de_002", # German - Male 'de_002', # German - Male
"es_002", # Spanish - Male 'es_002', # Spanish - Male
# AMERICA VOICES # AMERICA VOICES
"es_mx_002", # Spanish MX - Male 'es_mx_002', # Spanish MX - Male
"br_001", # Portuguese BR - Female 1 'br_001', # Portuguese BR - Female 1
"br_003", # Portuguese BR - Female 2 'br_003', # Portuguese BR - Female 2
"br_004", # Portuguese BR - Female 3 'br_004', # Portuguese BR - Female 3
"br_005", # Portuguese BR - Male 'br_005', # Portuguese BR - Male
# ASIA VOICES # ASIA VOICES
"id_001", # Indonesian - Female 'id_001', # Indonesian - Female
"jp_001", # Japanese - Female 1 'jp_001', # Japanese - Female 1
"jp_003", # Japanese - Female 2 'jp_003', # Japanese - Female 2
"jp_005", # Japanese - Female 3 'jp_005', # Japanese - Female 3
"jp_006", # Japanese - Male 'jp_006', # Japanese - Male
"kr_002", # Korean - Male 1 'kr_002', # Korean - Male 1
"kr_003", # Korean - Female 'kr_003', # Korean - Female
"kr_004", # Korean - Male 2 'kr_004', # Korean - Male 2
] ]
# good_voices = {'good': ['en_us_002', 'en_us_006'], # good_voices: 'en_us_002', 'en_us_006'
# 'ok': ['en_au_002', 'en_uk_001']} # less en_us_stormtrooper more less en_us_rocket en_us_ghostface # ok: 'en_au_002', 'en_uk_001'
# less: en_us_stormtrooper
# more or less: en_us_rocket, en_us_ghostface
class TikTok: # TikTok Text-to-Speech Wrapper @attrs(auto_attribs=True)
def __init__(self): class TikTok(BaseApiTTS): # TikTok Text-to-Speech Wrapper
self.URI_BASE = ( random_voice: bool = False
"https://api16-normal-useast5.us.tiktokv.com/media/api/text/speech/invoke/?text_speaker=" uri_base: str = attrib(
) default='https://api16-normal-useast5.us.tiktokv.com/media/api/text/speech/invoke/',
self.max_chars = 300 kw_only=True,
self.voices = {"human": human, "nonhuman": nonhuman, "noneng": noneng} )
max_chars = 300
decode_base64 = True
def run(self, text, filepath, random_voice: bool = False): def __attrs_post_init__(self):
# if censor: self.voice = (
# req_text = pf.censor(req_text) get_random_voice(voices, 'human')
# pass if self.random_voice
voice = ( else str(settings.config['settings']['tts']['tiktok_voice']).lower()
self.randomvoice() if str(settings.config['settings']['tts']['tiktok_voice']).lower() in [
if random_voice voice.lower() for dict_title in voices for voice in voices[dict_title]]
else ( else get_random_voice(voices, 'human')
settings.config["settings"]["tts"]["tiktok_voice"]
or random.choice(self.voices["human"])
)
) )
def make_request(
self,
text: str,
):
try: try:
r = requests.post(f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0") r = requests.post(
self.uri_base,
params={
'text_speaker': self.voice,
'req_text': text,
'speaker_map_type': 0,
})
except requests.exceptions.SSLError: except requests.exceptions.SSLError:
# https://stackoverflow.com/a/47475019/18516611 # https://stackoverflow.com/a/47475019/18516611
session = requests.Session() session = requests.Session()
retry = Retry(connect=3, backoff_factor=0.5) retry = Retry(connect=3, backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retry) adapter = HTTPAdapter(max_retries=retry)
session.mount("http://", adapter) session.mount('http://', adapter)
session.mount("https://", adapter) session.mount('https://', adapter)
r = session.post(f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0") r = session.post(f'{self.uri_base}{self.voice}&req_text={text}&speaker_map_type=0')
# print(r.text) # print(r.text)
vstr = [r.json()["data"]["v_str"]][0] return r.json()['data']['v_str']
b64d = base64.b64decode(vstr)
with open(filepath, "wb") as out:
out.write(b64d)
def randomvoice(self):
return random.choice(self.voices["human"])

@ -1,50 +1,58 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from boto3 import Session from boto3 import Session
from botocore.exceptions import BotoCoreError, ClientError, ProfileNotFound from botocore.exceptions import BotoCoreError, ClientError, ProfileNotFound
import sys import sys
from utils import settings from utils import settings
import random from attr import attrs
from TTS.common import get_random_voice
voices = [ voices = [
"Brian", 'Brian',
"Emma", 'Emma',
"Russell", 'Russell',
"Joey", 'Joey',
"Matthew", 'Matthew',
"Joanna", 'Joanna',
"Kimberly", 'Kimberly',
"Amy", 'Amy',
"Geraint", 'Geraint',
"Nicole", 'Nicole',
"Justin", 'Justin',
"Ivy", 'Ivy',
"Kendra", 'Kendra',
"Salli", 'Salli',
"Raveena", 'Raveena',
] ]
@attrs(auto_attribs=True)
class AWSPolly: class AWSPolly:
def __init__(self): random_voice: bool = False
self.max_chars = 0 max_chars: int = 0
self.voices = voices
def run(self, text, filepath, random_voice: bool = False): def run(
self,
text,
filepath,
):
try: try:
session = Session(profile_name="polly") session = Session(profile_name='polly')
polly = session.client("polly") polly = session.client('polly')
if random_voice: voice = (
voice = self.randomvoice() get_random_voice(voices)
else: if self.random_voice
if not settings.config["settings"]["tts"]["aws_polly_voice"]: else str(settings.config['settings']['tts']['aws_polly_voice']).capitalize()
raise ValueError( if str(settings.config['settings']['tts']['aws_polly_voice']).lower() in [voice.lower() for voice in
f"Please set the TOML variable AWS_VOICE to a valid voice. options are: {voices}" voices]
) else get_random_voice(voices)
voice = str(settings.config["settings"]["tts"]["aws_polly_voice"]).capitalize() )
try: try:
# Request speech synthesis # Request speech synthesis
response = polly.synthesize_speech( response = polly.synthesize_speech(
Text=text, OutputFormat="mp3", VoiceId=voice, Engine="neural" Text=text, OutputFormat='mp3', VoiceId=voice, Engine='neural'
) )
except (BotoCoreError, ClientError) as error: except (BotoCoreError, ClientError) as error:
# The service returned an error, exit gracefully # The service returned an error, exit gracefully
@ -52,15 +60,15 @@ class AWSPolly:
sys.exit(-1) sys.exit(-1)
# Access the audio stream from the response # Access the audio stream from the response
if "AudioStream" in response: if 'AudioStream' in response:
file = open(filepath, "wb") file = open(filepath, 'wb')
file.write(response["AudioStream"].read()) file.write(response['AudioStream'].read())
file.close() file.close()
# print_substep(f"Saved Text {idx} to MP3 files successfully.", style="bold green") # print_substep(f"Saved Text {idx} to MP3 files successfully.", style="bold green")
else: else:
# The response didn't contain audio data, exit gracefully # The response didn't contain audio data, exit gracefully
print("Could not stream audio") print('Could not stream audio')
sys.exit(-1) sys.exit(-1)
except ProfileNotFound: except ProfileNotFound:
print("You need to install the AWS CLI and configure your profile") print("You need to install the AWS CLI and configure your profile")
@ -71,6 +79,3 @@ class AWSPolly:
""" """
) )
sys.exit(-1) sys.exit(-1)
def randomvoice(self):
return random.choice(self.voices)

@ -1,3 +1,61 @@
import base64
from random import choice
from typing import Union, Optional
class BaseApiTTS:
    """Base class for API-backed TTS engines that split long texts.

    Subclasses provide ``max_chars`` (the longest text sent in a single
    request) and a ``make_request(text)`` method returning the audio for one
    piece of text.  ``decode_base64`` must be true when ``make_request``
    returns base64-encoded audio.
    """

    max_chars: int
    decode_base64: bool = False

    @staticmethod
    def text_len_sanitize(
            text: str,
            max_length: int,
    ) -> list:
        """Split ``text`` into pieces suitable for separate requests.

        Splitting on dots, then on commas, is preferred so that pieces end
        on natural pauses; a delimiter is only used when every resulting
        piece is shorter than ``max_length``.  Otherwise the text is cut into
        consecutive slices of ``max_length`` characters.

        The delimiters themselves are not part of the returned pieces, and
        pieces may be empty (e.g. for a trailing dot).

        Raises:
            ValueError: if ``max_length`` is not positive.
        """
        if max_length <= 0:
            # A zero step would fail inside range() and a negative one would
            # silently return no pieces at all, losing the text.
            raise ValueError(f'max_length must be positive, got {max_length}')
        for delimiter in ('.', ','):
            if delimiter in text:
                pieces = text.split(delimiter)
                if all(len(piece) < max_length for piece in pieces):
                    return pieces
        return [text[i:i + max_length] for i in range(0, len(text), max_length)]

    def _to_bytes(
            self,
            chunk,
    ):
        """Turn one ``make_request`` result into raw audio data.

        Objects exposing a ``content`` attribute (e.g. an HTTP response) are
        unwrapped first; base64 payloads are decoded when ``decode_base64``
        is set.
        """
        payload = getattr(chunk, 'content', chunk)
        return base64.b64decode(payload) if self.decode_base64 else payload

    def write_file(
            self,
            output_text: Union[str, bytes],
            filepath: str,
            decode: Optional[bool] = None,
    ) -> None:
        """Write audio data to ``filepath`` in binary mode.

        Args:
            output_text: audio payload, base64-encoded or raw.
            filepath: destination file, overwritten if it exists.
            decode: whether to base64-decode the payload before writing;
                defaults to the class-level ``decode_base64`` flag.
        """
        if decode is None:
            decode = self.decode_base64
        audio = base64.b64decode(output_text) if decode else output_text
        with open(filepath, 'wb') as out:
            out.write(audio)

    def run(
            self,
            text: str,
            filepath: str,
    ) -> None:
        """Synthesize ``text`` and save the resulting audio to ``filepath``.

        Texts longer than ``max_chars`` are split with ``text_len_sanitize``
        and requested piece by piece; empty and whitespace-only pieces are
        skipped.
        """
        if len(text) > self.max_chars:
            parts = [
                part
                for part in self.text_len_sanitize(text, self.max_chars)
                if part.strip()
            ]
        else:
            parts = [text]
        # Decode every piece on its own: base64 decoding stops at the first
        # padding, so decoding the concatenated strings would keep only the
        # first piece's audio.
        audio = b''.join(self._to_bytes(self.make_request(part)) for part in parts)
        self.write_file(audio, filepath, decode=False)
def get_random_voice(
        voices: Union[list, dict],
        key: Optional[str] = None,
) -> str:
    """Return a randomly chosen voice.

    Args:
        voices: either a flat sequence of voice names or a dict mapping a
            group name to a list of voice names.
        key: group to pick from when ``voices`` is a dict.  When omitted,
            the voice is picked from all groups together.  Ignored for
            sequences.

    Raises:
        KeyError: if ``key`` is given but is not a group in ``voices``.
        IndexError: if there is no voice to choose from.
    """
    if isinstance(voices, dict):
        if key is None:
            # No group requested: pool every group instead of failing on
            # the lookup voices[None].
            pool = [voice for group in voices.values() for voice in group]
        else:
            pool = voices[key]
    else:
        pool = voices
    return choice(pool)
def audio_length( def audio_length(
path: str, path: str,
) -> float | int: ) -> float | int:

@ -74,33 +74,6 @@ class TTSEngine:
if condition if condition
] ]
def split_post(self, text: str, idx: int):
    """Synthesize a long text in pieces and merge them into one mp3.

    The text is cut into pieces of at most ``max_chars`` characters (as
    reported by the TTS module), each ending on a dot or at the end of the
    text.  Every non-blank piece is synthesized through ``call_tts`` as
    ``{idx}-{n}.part``; the parts are then concatenated into
    ``{self.path}/{idx}.mp3`` and the part files are deleted.

    Args:
        text: full text to synthesize.
        idx: index used to name the part files and the merged file.
    """
    max_chars = self.tts_module().max_chars
    split_files = []
    # Optional leading spaces, then up to max_chars characters (newlines
    # included) terminated by a dot or by the last character of the text.
    # Both fragments are raw strings so that "\." is not an invalid escape.
    split_text = [
        x.group().strip()
        for x in re.finditer(
            r" *(((.|\n){0," + str(max_chars) + r"})(\.|.$))", text
        )
    ]
    offset = 0  # number of blank pieces skipped, keeps part numbers gapless
    for idy, text_cut in enumerate(split_text):
        if not text_cut or text_cut.isspace():
            offset += 1
            continue
        part_name = f"{idx}-{idy - offset}.part"
        self.call_tts(part_name, text_cut)
        split_files.append(AudioFileClip(f"{self.path}/{part_name}.mp3"))
    CompositeAudioClip([concatenate_audioclips(split_files)]).write_audiofile(
        f"{self.path}/{idx}.mp3", fps=44100, verbose=False, logger=None
    )
    # Release each clip before deleting the file it was read from.
    for clip in split_files:
        name = clip.filename
        clip.close()
        Path(name).unlink()
def call_tts( def call_tts(
self, self,
filename: str, filename: str,
@ -114,7 +87,7 @@ class TTSEngine:
filepath=f'{self.path}/{filename}.mp3' filepath=f'{self.path}/{filename}.mp3'
) )
clip_length = audio_length(f'assets/temp/mp3/{filename}.mp3') clip_length = audio_length(f'{self.path}/{filename}.mp3')
if clip_length and self.__total_length + clip_length <= self.max_length: if clip_length and self.__total_length + clip_length <= self.max_length:
self.__total_length += clip_length self.__total_length += clip_length

@ -1,62 +1,71 @@
import random
import requests import requests
from requests.exceptions import JSONDecodeError from requests.exceptions import JSONDecodeError
from utils import settings from utils import settings
from attr import attrs, attrib
from TTS.common import BaseApiTTS, get_random_voice
from utils.voice import check_ratelimit from utils.voice import check_ratelimit
voices = [ voices = [
"Brian", 'Brian',
"Emma", 'Emma',
"Russell", 'Russell',
"Joey", 'Joey',
"Matthew", 'Matthew',
"Joanna", 'Joanna',
"Kimberly", 'Kimberly',
"Amy", 'Amy',
"Geraint", 'Geraint',
"Nicole", 'Nicole',
"Justin", 'Justin',
"Ivy", 'Ivy',
"Kendra", 'Kendra',
"Salli", 'Salli',
"Raveena", 'Raveena',
] ]
# valid voices https://lazypy.ro/tts/ # valid voices https://lazypy.ro/tts/
class StreamlabsPolly: @attrs(auto_attribs=True)
def __init__(self): class StreamlabsPolly(BaseApiTTS):
self.url = "https://streamlabs.com/polly/speak" random_voice: bool = False
self.max_chars = 550 url: str = attrib(
self.voices = voices default='https://streamlabs.com/polly/speak',
kw_only=True,
)
def run(self, text, filepath, random_voice: bool = False): max_chars = 550
if random_voice:
voice = self.randomvoice()
else:
if not settings.config["settings"]["tts"]["streamlabs_polly_voice"]:
raise ValueError(
f"Please set the config variable STREAMLABS_POLLY_VOICE to a valid voice. options are: {voices}"
)
voice = str(settings.config["settings"]["tts"]["streamlabs_polly_voice"]).capitalize()
body = {"voice": voice, "text": text, "service": "polly"}
response = requests.post(self.url, data=body)
if not check_ratelimit(response):
self.run(text, filepath, random_voice)
def make_request(
self,
text,
):
voice = (
get_random_voice(voices)
if self.random_voice
else str(settings.config['settings']['tts']['streamlabs_polly_voice']).capitalize()
if str(settings.config['settings']['tts']['streamlabs_polly_voice']).lower() in [
voice.lower() for voice in voices]
else get_random_voice(voices)
)
response = requests.post(
self.url,
data={
'voice': voice,
'text': text,
'service': 'polly',
})
if not check_ratelimit(response):
return self.make_request(text)
else: else:
try: try:
voice_data = requests.get(response.json()["speak_url"]) results = requests.get(response.json()['speak_url'])
with open(filepath, "wb") as f: return results
f.write(voice_data.content)
except (KeyError, JSONDecodeError): except (KeyError, JSONDecodeError):
try: try:
if response.json()["error"] == "No text specified!": if response.json()['error'] == 'No text specified!':
raise ValueError("Please specify a text to convert to speech.") raise ValueError('Please specify a text to convert to speech.')
except (KeyError, JSONDecodeError): except (KeyError, JSONDecodeError):
print("Error occurred calling Streamlabs Polly") print('Error occurred calling Streamlabs Polly')
def randomvoice(self):
return random.choice(self.voices)

@ -10,7 +10,9 @@ if sys.version_info[0] >= 3:
from datetime import timezone from datetime import timezone
def check_ratelimit(response: Response): def check_ratelimit(
response: Response
):
""" """
Checks if the response is a ratelimit response. Checks if the response is a ratelimit response.
If it is, it sleeps for the time specified in the response. If it is, it sleeps for the time specified in the response.

Loading…
Cancel
Save