From fee2d936e2a5d29c9efd3cc6da5c2b6b1d6d888c Mon Sep 17 00:00:00 2001 From: Drugsosos <44712637+Drugsosos@users.noreply.github.com> Date: Wed, 13 Jul 2022 00:09:49 +0300 Subject: [PATCH] cherry-picked split text from async-tts-api --- TTS/GTTS.py | 16 ++-- TTS/TikTok.py | 158 +++++++++++++++++++++------------------- TTS/aws_polly.py | 81 ++++++++++---------- TTS/common.py | 58 +++++++++++++++ TTS/engine_wrapper.py | 29 +------- TTS/streamlabs_polly.py | 95 +++++++++++++----------- utils/voice.py | 4 +- 7 files changed, 245 insertions(+), 196 deletions(-) diff --git a/TTS/GTTS.py b/TTS/GTTS.py index 31e29df..c8d6ae8 100644 --- a/TTS/GTTS.py +++ b/TTS/GTTS.py @@ -1,23 +1,19 @@ #!/usr/bin/env python3 -import random from utils import settings from gtts import gTTS -max_chars = 0 - class GTTS: - def __init__(self): - self.max_chars = 0 - self.voices = [] + max_chars = 0 - def run(self, text, filepath): + @staticmethod + async def run( + text, + filepath + ) -> None: tts = gTTS( text=text, lang=settings.config["reddit"]["thread"]["post_lang"] or "en", slow=False, ) tts.save(filepath) - - def randomvoice(self): - return random.choice(self.voices) diff --git a/TTS/TikTok.py b/TTS/TikTok.py index 9fa83b7..6a23bb8 100644 --- a/TTS/TikTok.py +++ b/TTS/TikTok.py @@ -1,102 +1,108 @@ import base64 from utils import settings -import random import requests from requests.adapters import HTTPAdapter, Retry -# from profanity_filter import ProfanityFilter -# pf = ProfanityFilter() -# Code by @JasonLovesDoggo -# https://twitter.com/scanlime/status/1512598559769702406 +from attr import attrs, attrib +from attr.validators import instance_of -nonhuman = [ # DISNEY VOICES - "en_us_ghostface", # Ghost Face - "en_us_chewbacca", # Chewbacca - "en_us_c3po", # C3PO - "en_us_stitch", # Stitch - "en_us_stormtrooper", # Stormtrooper - "en_us_rocket", # Rocket +from TTS.common import BaseApiTTS, get_random_voice + +# TTS examples: https://twitter.com/scanlime/status/1512598559769702406 + +voices = dict() + +voices['nonhuman'] = [ # DISNEY VOICES + 'en_us_ghostface', # Ghost Face + 'en_us_chewbacca', # Chewbacca + 'en_us_c3po', # C3PO + 'en_us_stitch', # Stitch + 'en_us_stormtrooper', # Stormtrooper + 'en_us_rocket', # Rocket # ENGLISH VOICES ] -human = [ - "en_au_001", # English AU - Female - "en_au_002", # English AU - Male - "en_uk_001", # English UK - Male 1 - "en_uk_003", # English UK - Male 2 - "en_us_001", # English US - Female (Int. 1) - "en_us_002", # English US - Female (Int. 2) - "en_us_006", # English US - Male 1 - "en_us_007", # English US - Male 2 - "en_us_009", # English US - Male 3 - "en_us_010", +voices['human'] = [ + 'en_au_001', # English AU - Female + 'en_au_002', # English AU - Male + 'en_uk_001', # English UK - Male 1 + 'en_uk_003', # English UK - Male 2 + 'en_us_001', # English US - Female (Int. 1) + 'en_us_002', # English US - Female (Int. 2) + 'en_us_006', # English US - Male 1 + 'en_us_007', # English US - Male 2 + 'en_us_009', # English US - Male 3 + 'en_us_010', ] -voices = nonhuman + human -noneng = [ - "fr_001", # French - Male 1 - "fr_002", # French - Male 2 - "de_001", # German - Female - "de_002", # German - Male - "es_002", # Spanish - Male +voices['non_eng'] = [ + 'fr_001', # French - Male 1 + 'fr_002', # French - Male 2 + 'de_001', # German - Female + 'de_002', # German - Male + 'es_002', # Spanish - Male # AMERICA VOICES - "es_mx_002", # Spanish MX - Male - "br_001", # Portuguese BR - Female 1 - "br_003", # Portuguese BR - Female 2 - "br_004", # Portuguese BR - Female 3 - "br_005", # Portuguese BR - Male + 'es_mx_002', # Spanish MX - Male + 'br_001', # Portuguese BR - Female 1 + 'br_003', # Portuguese BR - Female 2 + 'br_004', # Portuguese BR - Female 3 + 'br_005', # Portuguese BR - Male # ASIA VOICES - "id_001", # Indonesian - Female - "jp_001", # Japanese - Female 1 - "jp_003", # Japanese - Female 2 - "jp_005", # Japanese - Female 3 - "jp_006", # Japanese - Male - "kr_002", # Korean - Male 1 - "kr_003", # Korean - Female - "kr_004", # Korean - Male 2 + 'id_001', # Indonesian - Female + 'jp_001', # Japanese - Female 1 + 'jp_003', # Japanese - Female 2 + 'jp_005', # Japanese - Female 3 + 'jp_006', # Japanese - Male + 'kr_002', # Korean - Male 1 + 'kr_003', # Korean - Female + 'kr_004', # Korean - Male 2 ] -# good_voices = {'good': ['en_us_002', 'en_us_006'], -# 'ok': ['en_au_002', 'en_uk_001']} # less en_us_stormtrooper more less en_us_rocket en_us_ghostface +# good_voices: 'en_us_002', 'en_us_006' +# ok: 'en_au_002', 'en_uk_001' +# less: en_us_stormtrooper +# more or less: en_us_rocket, en_us_ghostface -class TikTok: # TikTok Text-to-Speech Wrapper - def __init__(self): - self.URI_BASE = ( - "https://api16-normal-useast5.us.tiktokv.com/media/api/text/speech/invoke/?text_speaker=" - ) - self.max_chars = 300 - self.voices = {"human": human, "nonhuman": nonhuman, "noneng": noneng} +@attrs(auto_attribs=True) +class TikTok(BaseApiTTS): # TikTok Text-to-Speech Wrapper + random_voice: bool = False + uri_base: str = attrib( + default='https://api16-normal-useast5.us.tiktokv.com/media/api/text/speech/invoke/', + kw_only=True, + ) + max_chars = 300 + decode_base64 = True - def run(self, text, filepath, random_voice: bool = False): - # if censor: - # req_text = pf.censor(req_text) - # pass - voice = ( - self.randomvoice() - if random_voice - else ( - settings.config["settings"]["tts"]["tiktok_voice"] - or random.choice(self.voices["human"]) - ) + def __attrs_post_init__(self): + self.voice = ( + get_random_voice(voices, 'human') + if self.random_voice + else str(settings.config['settings']['tts']['tiktok_voice']).lower() + if str(settings.config['settings']['tts']['tiktok_voice']).lower() in [ + voice.lower() for dict_title in voices for voice in voices[dict_title]] + else get_random_voice(voices, 'human') ) + + def make_request( + self, + text: str, + ): try: - r = requests.post(f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0") + r = requests.post( + self.uri_base, + params={ + 'text_speaker': self.voice, + 'req_text': text, + 'speaker_map_type': 0, + }) except requests.exceptions.SSLError: # https://stackoverflow.com/a/47475019/18516611 session = requests.Session() retry = Retry(connect=3, backoff_factor=0.5) adapter = HTTPAdapter(max_retries=retry) - session.mount("http://", adapter) - session.mount("https://", adapter) - r = session.post(f"{self.URI_BASE}{voice}&req_text={text}&speaker_map_type=0") + session.mount('http://', adapter) + session.mount('https://', adapter) + r = session.post(f'{self.uri_base}{self.voice}&req_text={text}&speaker_map_type=0') # print(r.text) - vstr = [r.json()["data"]["v_str"]][0] - b64d = base64.b64decode(vstr) - - - with open(filepath, "wb") as out: - out.write(b64d) - - def randomvoice(self): - return random.choice(self.voices["human"]) + return r.json()['data']['v_str'] diff --git a/TTS/aws_polly.py b/TTS/aws_polly.py index efd762b..9d52f6f 100644 --- a/TTS/aws_polly.py +++ b/TTS/aws_polly.py @@ -1,50 +1,58 @@ #!/usr/bin/env python3 from boto3 import Session from botocore.exceptions import BotoCoreError, ClientError, ProfileNotFound + import sys from utils import settings -import random +from attr import attrs + +from TTS.common import get_random_voice + voices = [ - "Brian", - "Emma", - "Russell", - "Joey", - "Matthew", - "Joanna", - "Kimberly", - "Amy", - "Geraint", - "Nicole", - "Justin", - "Ivy", - "Kendra", - "Salli", - "Raveena", + 'Brian', + 'Emma', + 'Russell', + 'Joey', + 'Matthew', + 'Joanna', + 'Kimberly', + 'Amy', + 'Geraint', + 'Nicole', + 'Justin', + 'Ivy', + 'Kendra', + 'Salli', + 'Raveena', ] +@attrs(auto_attribs=True) class AWSPolly: - def __init__(self): - self.max_chars = 0 - self.voices = voices + random_voice: bool = False + max_chars: int = 0 - def run(self, text, filepath, random_voice: bool = False): + def run( + self, + text, + filepath, + ): try: - session = Session(profile_name="polly") - polly = session.client("polly") - if random_voice: - voice = self.randomvoice() - else: - if not settings.config["settings"]["tts"]["aws_polly_voice"]: - raise ValueError( - f"Please set the TOML variable AWS_VOICE to a valid voice. options are: {voices}" - ) - voice = str(settings.config["settings"]["tts"]["aws_polly_voice"]).capitalize() + session = Session(profile_name='polly') + polly = session.client('polly') + voice = ( + get_random_voice(voices) + if self.random_voice + else str(settings.config['settings']['tts']['aws_polly_voice']).capitalize() + if str(settings.config['settings']['tts']['aws_polly_voice']).lower() in [voice.lower() for voice in + voices] + else get_random_voice(voices) + ) try: # Request speech synthesis response = polly.synthesize_speech( - Text=text, OutputFormat="mp3", VoiceId=voice, Engine="neural" + Text=text, OutputFormat='mp3', VoiceId=voice, Engine='neural' ) except (BotoCoreError, ClientError) as error: # The service returned an error, exit gracefully @@ -52,15 +60,15 @@ class AWSPolly: sys.exit(-1) # Access the audio stream from the response - if "AudioStream" in response: - file = open(filepath, "wb") - file.write(response["AudioStream"].read()) + if 'AudioStream' in response: + file = open(filepath, 'wb') + file.write(response['AudioStream'].read()) file.close() # print_substep(f"Saved Text {idx} to MP3 files successfully.", style="bold green") else: # The response didn't contain audio data, exit gracefully - print("Could not stream audio") + print('Could not stream audio') sys.exit(-1) except ProfileNotFound: print("You need to install the AWS CLI and configure your profile") @@ -71,6 +79,3 @@ class AWSPolly: """ ) sys.exit(-1) - - def randomvoice(self): - return random.choice(self.voices) diff --git a/TTS/common.py b/TTS/common.py index a56444e..73884f4 100644 --- a/TTS/common.py +++ b/TTS/common.py @@ -1,3 +1,61 @@ +import base64 +from random import choice +from typing import Union, Optional + + +class BaseApiTTS: + max_chars: int + decode_base64: bool = False + + @staticmethod + def text_len_sanitize( + text: str, + max_length: int, + ) -> list: + # Split by comma or dot (else you can lose intonations), if there is non, split by groups of 299 chars + if '.' in text and all([split_text.__len__() < max_length for split_text in text.split('.')]): + return text.split('.') + + if ',' in text and all([split_text.__len__() < max_length for split_text in text.split(',')]): + return text.split(',') + + return [text[i:i + max_length] for i in range(0, len(text), max_length)] + + def write_file( + self, + output_text: str, + filepath: str, + ) -> None: + decoded_text = base64.b64decode(output_text) if self.decode_base64 else output_text + + with open(filepath, 'wb') as out: + out.write(decoded_text) + + def run( + self, + text: str, + filepath: str, + ) -> None: + output_text = '' + if len(text) > self.max_chars: + for part in self.text_len_sanitize(text, self.max_chars): + if part: + output_text += self.make_request(part) + else: + output_text = self.make_request(text) + self.write_file(output_text, filepath) + + +def get_random_voice( + voices: Union[list, dict], + key: Optional[str] = None, +) -> str: + if isinstance(voices, list): + return choice(voices) + else: + return choice(voices[key]) + + def audio_length( path: str, ) -> float | int: diff --git a/TTS/engine_wrapper.py b/TTS/engine_wrapper.py index 762aa47..b968015 100644 --- a/TTS/engine_wrapper.py +++ b/TTS/engine_wrapper.py @@ -74,33 +74,6 @@ class TTSEngine: if condition ] - def split_post(self, text: str, idx: int): - split_files = [] - split_text = [ - x.group().strip() - for x in re.finditer( - r" *(((.|\n){0," + str(self.tts_module().max_chars) + "})(\.|.$))", text - ) - ] - offset = 0 - for idy, text_cut in enumerate(split_text): - # print(f"{idx}-{idy}: {text_cut}\n") - if not text_cut or text_cut.isspace(): - offset += 1 - continue - - self.call_tts(f"{idx}-{idy - offset}.part", text_cut) - split_files.append(AudioFileClip(f"{self.path}/{idx}-{idy - offset}.part.mp3")) - - CompositeAudioClip([concatenate_audioclips(split_files)]).write_audiofile( - f"{self.path}/{idx}.mp3", fps=44100, verbose=False, logger=None - ) - - for i in split_files: - name = i.filename - i.close() - Path(name).unlink() - def call_tts( self, filename: str, @@ -114,7 +87,7 @@ class TTSEngine: filepath=f'{self.path}/{filename}.mp3' ) - clip_length = audio_length(f'assets/temp/mp3/{filename}.mp3') + clip_length = audio_length(f'{self.path}/{filename}.mp3') if clip_length and self.__total_length + clip_length <= self.max_length: self.__total_length += clip_length diff --git a/TTS/streamlabs_polly.py b/TTS/streamlabs_polly.py index 75c4f49..d2b765a 100644 --- a/TTS/streamlabs_polly.py +++ b/TTS/streamlabs_polly.py @@ -1,62 +1,71 @@ -import random import requests from requests.exceptions import JSONDecodeError from utils import settings +from attr import attrs, attrib + +from TTS.common import BaseApiTTS, get_random_voice from utils.voice import check_ratelimit voices = [ - "Brian", - "Emma", - "Russell", - "Joey", - "Matthew", - "Joanna", - "Kimberly", - "Amy", - "Geraint", - "Nicole", - "Justin", - "Ivy", - "Kendra", - "Salli", - "Raveena", + 'Brian', + 'Emma', + 'Russell', + 'Joey', + 'Matthew', + 'Joanna', + 'Kimberly', + 'Amy', + 'Geraint', + 'Nicole', + 'Justin', + 'Ivy', + 'Kendra', + 'Salli', + 'Raveena', ] # valid voices https://lazypy.ro/tts/ -class StreamlabsPolly: - def __init__(self): - self.url = "https://streamlabs.com/polly/speak" - self.max_chars = 550 - self.voices = voices +@attrs(auto_attribs=True) +class StreamlabsPolly(BaseApiTTS): + random_voice: bool = False + url: str = attrib( + default='https://streamlabs.com/polly/speak', + kw_only=True, + ) - def run(self, text, filepath, random_voice: bool = False): - if random_voice: - voice = self.randomvoice() - else: - if not settings.config["settings"]["tts"]["streamlabs_polly_voice"]: - raise ValueError( - f"Please set the config variable STREAMLABS_POLLY_VOICE to a valid voice. options are: {voices}" - ) - voice = str(settings.config["settings"]["tts"]["streamlabs_polly_voice"]).capitalize() - body = {"voice": voice, "text": text, "service": "polly"} - response = requests.post(self.url, data=body) - if not check_ratelimit(response): - self.run(text, filepath, random_voice) + max_chars = 550 + def make_request( + self, + text, + ): + voice = ( + get_random_voice(voices) + if self.random_voice + else str(settings.config['settings']['tts']['streamlabs_polly_voice']).capitalize() + if str(settings.config['settings']['tts']['streamlabs_polly_voice']).lower() in [ + voice.lower() for voice in voices] + else get_random_voice(voices) + ) + response = requests.post( + self.url, + data={ + 'voice': voice, + 'text': text, + 'service': 'polly', + }) + if not check_ratelimit(response): + return self.make_request(text) else: try: - voice_data = requests.get(response.json()["speak_url"]) - with open(filepath, "wb") as f: - f.write(voice_data.content) + results = requests.get(response.json()['speak_url']) + return results except (KeyError, JSONDecodeError): try: - if response.json()["error"] == "No text specified!": - raise ValueError("Please specify a text to convert to speech.") + if response.json()['error'] == 'No text specified!': + raise ValueError('Please specify a text to convert to speech.') except (KeyError, JSONDecodeError): - print("Error occurred calling Streamlabs Polly") - - def randomvoice(self): - return random.choice(self.voices) + print('Error occurred calling Streamlabs Polly') diff --git a/utils/voice.py b/utils/voice.py index a0709fa..7d20b1b 100644 --- a/utils/voice.py +++ b/utils/voice.py @@ -10,7 +10,9 @@ if sys.version_info[0] >= 3: from datetime import timezone -def check_ratelimit(response: Response): +def check_ratelimit( + response: Response +): """ Checks if the response is a ratelimit response. If it is, it sleeps for the time specified in the response.