|
|
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
|
|
|
#
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
# you may not use this file except in compliance with the License.
|
|
|
# You may obtain a copy of the License at
|
|
|
#
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
|
#
|
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
# See the License for the specific language governing permissions and
|
|
|
# limitations under the License.
|
|
|
import os
|
|
|
import shutil
|
|
|
from pathlib import Path
|
|
|
|
|
|
import librosa
|
|
|
import numpy as np
|
|
|
import pypinyin
|
|
|
from praatio import textgrid
|
|
|
|
|
|
from paddlespeech.t2s.exps.ernie_sat.utils import get_dict
|
|
|
from paddlespeech.t2s.exps.ernie_sat.utils import get_tmp_name
|
|
|
|
|
|
DICT_EN = 'tools/aligner/cmudict-0.7b'
|
|
|
DICT_ZH = 'tools/aligner/simple.lexicon'
|
|
|
MODEL_DIR_EN = 'tools/aligner/vctk_model.zip'
|
|
|
MODEL_DIR_ZH = 'tools/aligner/aishell3_model.zip'
|
|
|
MFA_PATH = 'tools/montreal-forced-aligner/bin'
|
|
|
os.environ['PATH'] = MFA_PATH + '/:' + os.environ['PATH']
|
|
|
|
|
|
|
|
|
def _get_max_idx(dic):
|
|
|
return sorted([int(key.split('_')[0]) for key in dic.keys()])[-1]
|
|
|
|
|
|
|
|
|
def _readtg(tg_path: str, lang: str='en', fs: int=24000, n_shift: int=300):
|
|
|
alignment = textgrid.openTextgrid(tg_path, includeEmptyIntervals=True)
|
|
|
phones = []
|
|
|
ends = []
|
|
|
words = []
|
|
|
|
|
|
for interval in alignment.tierDict['words'].entryList:
|
|
|
word = interval.label
|
|
|
if word:
|
|
|
words.append(word)
|
|
|
for interval in alignment.tierDict['phones'].entryList:
|
|
|
phone = interval.label
|
|
|
phones.append(phone)
|
|
|
ends.append(interval.end)
|
|
|
frame_pos = librosa.time_to_frames(ends, sr=fs, hop_length=n_shift)
|
|
|
durations = np.diff(frame_pos, prepend=0)
|
|
|
assert len(durations) == len(phones)
|
|
|
# merge '' and sp in the end
|
|
|
if phones[-1] == '' and len(phones) > 1 and phones[-2] == 'sp':
|
|
|
phones = phones[:-1]
|
|
|
durations[-2] += durations[-1]
|
|
|
durations = durations[:-1]
|
|
|
|
|
|
# replace ' and 'sil' with 'sp'
|
|
|
phones = ['sp' if (phn == '' or phn == 'sil') else phn for phn in phones]
|
|
|
|
|
|
if lang == 'en':
|
|
|
DICT = DICT_EN
|
|
|
|
|
|
elif lang == 'zh':
|
|
|
DICT = DICT_ZH
|
|
|
|
|
|
word2phns_dict = get_dict(DICT)
|
|
|
|
|
|
phn2word_dict = []
|
|
|
for word in words:
|
|
|
if lang == 'en':
|
|
|
word = word.upper()
|
|
|
phn2word_dict.append([word2phns_dict[word].split(), word])
|
|
|
|
|
|
non_sp_idx = 0
|
|
|
word_idx = 0
|
|
|
i = 0
|
|
|
word2phns = {}
|
|
|
while i < len(phones):
|
|
|
phn = phones[i]
|
|
|
if phn == 'sp':
|
|
|
word2phns[str(word_idx) + '_sp'] = ['sp']
|
|
|
i += 1
|
|
|
else:
|
|
|
phns, word = phn2word_dict[non_sp_idx]
|
|
|
word2phns[str(word_idx) + '_' + word] = phns
|
|
|
non_sp_idx += 1
|
|
|
i += len(phns)
|
|
|
word_idx += 1
|
|
|
sum_phn = sum(len(word2phns[k]) for k in word2phns)
|
|
|
assert sum_phn == len(phones)
|
|
|
|
|
|
results = ''
|
|
|
for (p, d) in zip(phones, durations):
|
|
|
results += p + ' ' + str(d) + ' '
|
|
|
return results.strip(), word2phns
|
|
|
|
|
|
|
|
|
def alignment(wav_path: str,
|
|
|
text: str,
|
|
|
fs: int=24000,
|
|
|
lang='en',
|
|
|
n_shift: int=300):
|
|
|
wav_name = os.path.basename(wav_path)
|
|
|
utt = wav_name.split('.')[0]
|
|
|
# prepare data for MFA
|
|
|
tmp_name = get_tmp_name(text=text)
|
|
|
tmpbase = './tmp_dir/' + tmp_name
|
|
|
tmpbase = Path(tmpbase)
|
|
|
tmpbase.mkdir(parents=True, exist_ok=True)
|
|
|
print("tmp_name in alignment:", tmp_name)
|
|
|
|
|
|
shutil.copyfile(wav_path, tmpbase / wav_name)
|
|
|
txt_name = utt + '.txt'
|
|
|
txt_path = tmpbase / txt_name
|
|
|
with open(txt_path, 'w') as wf:
|
|
|
wf.write(text + '\n')
|
|
|
# MFA
|
|
|
if lang == 'en':
|
|
|
DICT = DICT_EN
|
|
|
MODEL_DIR = MODEL_DIR_EN
|
|
|
|
|
|
elif lang == 'zh':
|
|
|
DICT = DICT_ZH
|
|
|
MODEL_DIR = MODEL_DIR_ZH
|
|
|
else:
|
|
|
print('please input right lang!!')
|
|
|
|
|
|
CMD = 'mfa_align' + ' ' + str(
|
|
|
tmpbase) + ' ' + DICT + ' ' + MODEL_DIR + ' ' + str(tmpbase)
|
|
|
os.system(CMD)
|
|
|
tg_path = str(tmpbase) + '/' + tmp_name + '/' + utt + '.TextGrid'
|
|
|
phn_dur, word2phns = _readtg(tg_path, lang=lang)
|
|
|
phn_dur = phn_dur.split()
|
|
|
phns = phn_dur[::2]
|
|
|
durs = phn_dur[1::2]
|
|
|
durs = [int(d) for d in durs]
|
|
|
assert len(phns) == len(durs)
|
|
|
return phns, durs, word2phns
|
|
|
|
|
|
|
|
|
def words2phns(text: str, lang='en'):
|
|
|
'''
|
|
|
Args:
|
|
|
text (str):
|
|
|
input text.
|
|
|
eg: for that reason cover is impossible to be given.
|
|
|
lang (str):
|
|
|
'en' or 'zh'
|
|
|
Returns:
|
|
|
List[str]: phones of input text.
|
|
|
eg:
|
|
|
['F', 'AO1', 'R', 'DH', 'AE1', 'T', 'R', 'IY1', 'Z', 'AH0', 'N', 'K', 'AH1', 'V', 'ER0',
|
|
|
'IH1', 'Z', 'IH2', 'M', 'P', 'AA1', 'S', 'AH0', 'B', 'AH0', 'L', 'T', 'UW1', 'B', 'IY1',
|
|
|
'G', 'IH1', 'V', 'AH0', 'N']
|
|
|
|
|
|
Dict(str, str): key - idx_word
|
|
|
value - phones
|
|
|
eg:
|
|
|
{'0_FOR': ['F', 'AO1', 'R'], '1_THAT': ['DH', 'AE1', 'T'],
|
|
|
'2_REASON': ['R', 'IY1', 'Z', 'AH0', 'N'],'3_COVER': ['K', 'AH1', 'V', 'ER0'], '4_IS': ['IH1', 'Z'],
|
|
|
'5_IMPOSSIBLE': ['IH2', 'M', 'P', 'AA1', 'S', 'AH0', 'B', 'AH0', 'L'],
|
|
|
'6_TO': ['T', 'UW1'], '7_BE': ['B', 'IY1'], '8_GIVEN': ['G', 'IH1', 'V', 'AH0', 'N']}
|
|
|
'''
|
|
|
text = text.strip()
|
|
|
words = []
|
|
|
for pun in [
|
|
|
',', '.', ':', ';', '!', '?', '"', '(', ')', '--', '---', u',',
|
|
|
u'。', u':', u';', u'!', u'?', u'(', u')'
|
|
|
]:
|
|
|
text = text.replace(pun, ' ')
|
|
|
for wrd in text.split():
|
|
|
if (wrd[-1] == '-'):
|
|
|
wrd = wrd[:-1]
|
|
|
if (wrd[0] == "'"):
|
|
|
wrd = wrd[1:]
|
|
|
if wrd:
|
|
|
words.append(wrd)
|
|
|
if lang == 'en':
|
|
|
dictfile = DICT_EN
|
|
|
elif lang == 'zh':
|
|
|
dictfile = DICT_ZH
|
|
|
else:
|
|
|
print('please input right lang!!')
|
|
|
|
|
|
word2phns_dict = get_dict(dictfile)
|
|
|
ds = word2phns_dict.keys()
|
|
|
phns = []
|
|
|
wrd2phns = {}
|
|
|
for index, wrd in enumerate(words):
|
|
|
if lang == 'en':
|
|
|
wrd = wrd.upper()
|
|
|
if (wrd not in ds):
|
|
|
wrd2phns[str(index) + '_' + wrd] = 'spn'
|
|
|
phns.extend('spn')
|
|
|
else:
|
|
|
wrd2phns[str(index) + '_' + wrd] = word2phns_dict[wrd].split()
|
|
|
phns.extend(word2phns_dict[wrd].split())
|
|
|
return phns, wrd2phns
|
|
|
|
|
|
|
|
|
def get_phns_spans(wav_path: str,
|
|
|
old_str: str='',
|
|
|
new_str: str='',
|
|
|
source_lang: str='en',
|
|
|
target_lang: str='en',
|
|
|
fs: int=24000,
|
|
|
n_shift: int=300):
|
|
|
is_append = (old_str == new_str[:len(old_str)])
|
|
|
old_phns, mfa_start, mfa_end = [], [], []
|
|
|
# source
|
|
|
lang = source_lang
|
|
|
phn, dur, w2p = alignment(
|
|
|
wav_path=wav_path, text=old_str, lang=lang, fs=fs, n_shift=n_shift)
|
|
|
|
|
|
new_d_cumsum = np.pad(np.array(dur).cumsum(0), (1, 0), 'constant').tolist()
|
|
|
mfa_start = new_d_cumsum[:-1]
|
|
|
mfa_end = new_d_cumsum[1:]
|
|
|
old_phns = phn
|
|
|
|
|
|
# target
|
|
|
if is_append and (source_lang != target_lang):
|
|
|
cross_lingual_clone = True
|
|
|
else:
|
|
|
cross_lingual_clone = False
|
|
|
|
|
|
if cross_lingual_clone:
|
|
|
str_origin = new_str[:len(old_str)]
|
|
|
str_append = new_str[len(old_str):]
|
|
|
|
|
|
if target_lang == 'zh':
|
|
|
phns_origin, origin_w2p = words2phns(str_origin, lang='en')
|
|
|
phns_append, append_w2p_tmp = words2phns(str_append, lang='zh')
|
|
|
elif target_lang == 'en':
|
|
|
# 原始句子
|
|
|
phns_origin, origin_w2p = words2phns(str_origin, lang='zh')
|
|
|
# clone 句子
|
|
|
phns_append, append_w2p_tmp = words2phns(str_append, lang='en')
|
|
|
else:
|
|
|
assert target_lang == 'zh' or target_lang == 'en', \
|
|
|
'cloning is not support for this language, please check it.'
|
|
|
|
|
|
new_phns = phns_origin + phns_append
|
|
|
|
|
|
append_w2p = {}
|
|
|
length = len(origin_w2p)
|
|
|
for key, value in append_w2p_tmp.items():
|
|
|
idx, wrd = key.split('_')
|
|
|
append_w2p[str(int(idx) + length) + '_' + wrd] = value
|
|
|
new_w2p = origin_w2p.copy()
|
|
|
new_w2p.update(append_w2p)
|
|
|
|
|
|
else:
|
|
|
if source_lang == target_lang:
|
|
|
new_phns, new_w2p = words2phns(new_str, lang=source_lang)
|
|
|
else:
|
|
|
assert source_lang == target_lang, \
|
|
|
'source language is not same with target language...'
|
|
|
|
|
|
span_to_repl = [0, len(old_phns) - 1]
|
|
|
span_to_add = [0, len(new_phns) - 1]
|
|
|
left_idx = 0
|
|
|
new_phns_left = []
|
|
|
sp_count = 0
|
|
|
# find the left different index
|
|
|
# 因为可能 align 时候的 words2phns 和直接 words2phns, 前者会有 sp?
|
|
|
for key in w2p.keys():
|
|
|
idx, wrd = key.split('_')
|
|
|
if wrd == 'sp':
|
|
|
sp_count += 1
|
|
|
new_phns_left.append('sp')
|
|
|
else:
|
|
|
idx = str(int(idx) - sp_count)
|
|
|
if idx + '_' + wrd in new_w2p:
|
|
|
# 是 new_str phn 序列的 index
|
|
|
left_idx += len(new_w2p[idx + '_' + wrd])
|
|
|
# old phn 序列
|
|
|
new_phns_left.extend(w2p[key])
|
|
|
else:
|
|
|
span_to_repl[0] = len(new_phns_left)
|
|
|
span_to_add[0] = len(new_phns_left)
|
|
|
break
|
|
|
|
|
|
# reverse w2p and new_w2p
|
|
|
right_idx = 0
|
|
|
new_phns_right = []
|
|
|
sp_count = 0
|
|
|
w2p_max_idx = _get_max_idx(w2p)
|
|
|
new_w2p_max_idx = _get_max_idx(new_w2p)
|
|
|
new_phns_mid = []
|
|
|
if is_append:
|
|
|
new_phns_right = []
|
|
|
new_phns_mid = new_phns[left_idx:]
|
|
|
span_to_repl[0] = len(new_phns_left)
|
|
|
span_to_add[0] = len(new_phns_left)
|
|
|
span_to_add[1] = len(new_phns_left) + len(new_phns_mid)
|
|
|
span_to_repl[1] = len(old_phns) - len(new_phns_right)
|
|
|
# speech edit
|
|
|
else:
|
|
|
for key in list(w2p.keys())[::-1]:
|
|
|
idx, wrd = key.split('_')
|
|
|
if wrd == 'sp':
|
|
|
sp_count += 1
|
|
|
new_phns_right = ['sp'] + new_phns_right
|
|
|
else:
|
|
|
idx = str(new_w2p_max_idx - (w2p_max_idx - int(idx) - sp_count))
|
|
|
if idx + '_' + wrd in new_w2p:
|
|
|
right_idx -= len(new_w2p[idx + '_' + wrd])
|
|
|
new_phns_right = w2p[key] + new_phns_right
|
|
|
else:
|
|
|
span_to_repl[1] = len(old_phns) - len(new_phns_right)
|
|
|
new_phns_mid = new_phns[left_idx:right_idx]
|
|
|
span_to_add[1] = len(new_phns_left) + len(new_phns_mid)
|
|
|
if len(new_phns_mid) == 0:
|
|
|
span_to_add[1] = min(span_to_add[1] + 1, len(new_phns))
|
|
|
span_to_add[0] = max(0, span_to_add[0] - 1)
|
|
|
span_to_repl[0] = max(0, span_to_repl[0] - 1)
|
|
|
span_to_repl[1] = min(span_to_repl[1] + 1,
|
|
|
len(old_phns))
|
|
|
break
|
|
|
new_phns = new_phns_left + new_phns_mid + new_phns_right
|
|
|
'''
|
|
|
For that reason cover should not be given.
|
|
|
For that reason cover is impossible to be given.
|
|
|
span_to_repl: [17, 23] "should not"
|
|
|
span_to_add: [17, 30] "is impossible to"
|
|
|
'''
|
|
|
outs = {}
|
|
|
outs['mfa_start'] = mfa_start
|
|
|
outs['mfa_end'] = mfa_end
|
|
|
outs['old_phns'] = old_phns
|
|
|
outs['new_phns'] = new_phns
|
|
|
outs['span_to_repl'] = span_to_repl
|
|
|
outs['span_to_add'] = span_to_add
|
|
|
|
|
|
return outs
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
text = "For that reason cover should not be given."
|
|
|
phn, dur, word2phns = alignment("source/p243_313.wav", text, lang='en')
|
|
|
print(phn, dur)
|
|
|
print(word2phns)
|
|
|
print("---------------------------------")
|
|
|
# 这里可以用我们的中文前端得到 pinyin 序列
|
|
|
text_zh = "卡尔普陪外孙玩滑梯。"
|
|
|
text_zh = pypinyin.lazy_pinyin(
|
|
|
text_zh,
|
|
|
neutral_tone_with_five=True,
|
|
|
style=pypinyin.Style.TONE3,
|
|
|
tone_sandhi=True)
|
|
|
text_zh = " ".join(text_zh)
|
|
|
phn, dur, word2phns = alignment("source/000001.wav", text_zh, lang='zh')
|
|
|
print(phn, dur)
|
|
|
print(word2phns)
|
|
|
print("---------------------------------")
|
|
|
phns, wrd2phns = words2phns(text, lang='en')
|
|
|
print("phns:", phns)
|
|
|
print("wrd2phns:", wrd2phns)
|
|
|
print("---------------------------------")
|
|
|
|
|
|
phns, wrd2phns = words2phns(text_zh, lang='zh')
|
|
|
print("phns:", phns)
|
|
|
print("wrd2phns:", wrd2phns)
|
|
|
print("---------------------------------")
|
|
|
|
|
|
outs = get_phns_spans(
|
|
|
wav_path="source/p243_313.wav",
|
|
|
old_str="For that reason cover should not be given.",
|
|
|
new_str="for that reason cover is impossible to be given.")
|
|
|
|
|
|
mfa_start = outs["mfa_start"]
|
|
|
mfa_end = outs["mfa_end"]
|
|
|
old_phns = outs["old_phns"]
|
|
|
new_phns = outs["new_phns"]
|
|
|
span_to_repl = outs["span_to_repl"]
|
|
|
span_to_add = outs["span_to_add"]
|
|
|
print("mfa_start:", mfa_start)
|
|
|
print("mfa_end:", mfa_end)
|
|
|
print("old_phns:", old_phns)
|
|
|
print("new_phns:", new_phns)
|
|
|
print("span_to_repl:", span_to_repl)
|
|
|
print("span_to_add:", span_to_add)
|
|
|
print("---------------------------------")
|