parent 82c97138fc
commit b5f376e63b

@@ -1,13 +1,11 @@
aiofiles
faiss-cpu
fastapi
librosa
numpy
paddlenlp
paddlepaddle
paddlespeech
pydantic
python-multipartscikit_learn
SoundFile
python-multipart
scikit_learn
starlette
uvicorn
numpy==1.20.0
librosa==0.8.1
praatio==5.0.0
pyworld==0.3.0
@@ -0,0 +1,247 @@
from .ernie_sat_tool import ernie_sat_web
import os


class SAT:
    def __init__(self, mfa_version='v1'):
        self.mfa_version = mfa_version

    def zh_synthesize_edit(self,
                           old_str: str,
                           new_str: str,
                           input_name: os.PathLike,
                           output_name: os.PathLike,
                           task_name: str="synthesize"):

        if task_name not in ['synthesize', 'edit']:
            print("task_name must be one of ['synthesize', 'edit']")
            return None

        # erniesat model
        erniesat_config = "source/model/erniesat_aishell3_ckpt_1.2.0/default.yaml"
        erniesat_ckpt = "source/model/erniesat_aishell3_ckpt_1.2.0/snapshot_iter_289500.pdz"
        erniesat_stat = "source/model/erniesat_aishell3_ckpt_1.2.0/speech_stats.npy"
        phones_dict = "source/model/erniesat_aishell3_ckpt_1.2.0/phone_id_map.txt"
        duration_adjust = True
        # vocoder
        voc = "hifigan_aishell3"
        voc_config = "source/model/hifigan_aishell3_ckpt_0.2.0/default.yaml"
        voc_ckpt = "source/model/hifigan_aishell3_ckpt_0.2.0/snapshot_iter_2500000.pdz"
        voc_stat = "source/model/hifigan_aishell3_ckpt_0.2.0/feats_stats.npy"

        source_lang = "zh"
        target_lang = "zh"
        wav_path = input_name

        output_name = ernie_sat_web(erniesat_config,
                                    old_str,
                                    new_str,
                                    source_lang,
                                    target_lang,
                                    task_name,
                                    erniesat_ckpt,
                                    erniesat_stat,
                                    phones_dict,
                                    voc_config,
                                    voc,
                                    voc_ckpt,
                                    voc_stat,
                                    duration_adjust,
                                    wav_path,
                                    output_name,
                                    mfa_version=self.mfa_version)
        return output_name

    def crossclone(self,
                   old_str: str,
                   new_str: str,
                   input_name: os.PathLike,
                   output_name: os.PathLike,
                   source_lang: str,
                   target_lang: str):
        # erniesat model
        erniesat_config = "source/model/erniesat_aishell3_vctk_ckpt_1.2.0/default.yaml"
        erniesat_ckpt = "source/model/erniesat_aishell3_vctk_ckpt_1.2.0/snapshot_iter_489000.pdz"
        erniesat_stat = "source/model/erniesat_aishell3_vctk_ckpt_1.2.0/speech_stats.npy"
        phones_dict = "source/model/erniesat_aishell3_vctk_ckpt_1.2.0/phone_id_map.txt"
        duration_adjust = True
        # vocoder
        voc = "hifigan_aishell3"
        voc_config = "source/model/hifigan_aishell3_ckpt_0.2.0/default.yaml"
        voc_ckpt = "source/model/hifigan_aishell3_ckpt_0.2.0/snapshot_iter_2500000.pdz"
        voc_stat = "source/model/hifigan_aishell3_ckpt_0.2.0/feats_stats.npy"

        task_name = 'synthesize'
        wav_path = input_name

        output_name = ernie_sat_web(erniesat_config,
                                    old_str,
                                    new_str,
                                    source_lang,
                                    target_lang,
                                    task_name,
                                    erniesat_ckpt,
                                    erniesat_stat,
                                    phones_dict,
                                    voc_config,
                                    voc,
                                    voc_ckpt,
                                    voc_stat,
                                    duration_adjust,
                                    wav_path,
                                    output_name,
                                    mfa_version=self.mfa_version)
        return output_name

    def en_synthesize_edit(self,
                           old_str: str,
                           new_str: str,
                           input_name: os.PathLike,
                           output_name: os.PathLike,
                           task_name: str="synthesize"):
        # erniesat model
        erniesat_config = "source/model/erniesat_vctk_ckpt_1.2.0/default.yaml"
        erniesat_ckpt = "source/model/erniesat_vctk_ckpt_1.2.0/snapshot_iter_199500.pdz"
        erniesat_stat = "source/model/erniesat_vctk_ckpt_1.2.0/speech_stats.npy"
        phones_dict = "source/model/erniesat_vctk_ckpt_1.2.0/phone_id_map.txt"
        duration_adjust = True
        # vocoder (the configs below are the VCTK HiFiGAN checkpoint)
        voc = "hifigan_vctk"
        voc_config = "source/model/hifigan_vctk_ckpt_0.2.0/default.yaml"
        voc_ckpt = "source/model/hifigan_vctk_ckpt_0.2.0/snapshot_iter_2500000.pdz"
        voc_stat = "source/model/hifigan_vctk_ckpt_0.2.0/feats_stats.npy"

        source_lang = "en"
        target_lang = "en"
        wav_path = input_name

        output_name = ernie_sat_web(erniesat_config,
                                    old_str,
                                    new_str,
                                    source_lang,
                                    target_lang,
                                    task_name,
                                    erniesat_ckpt,
                                    erniesat_stat,
                                    phones_dict,
                                    voc_config,
                                    voc,
                                    voc_ckpt,
                                    voc_stat,
                                    duration_adjust,
                                    wav_path,
                                    output_name,
                                    mfa_version=self.mfa_version)
        return output_name


if __name__ == '__main__':

    sat = SAT(mfa_version='v2')

    # Chinese voice cloning
    print("######## Chinese voice cloning #######")
    old_str = "请播放歌曲小苹果。"
    new_str = "歌曲真好听。"
    input_name = "source/wav/SAT/upload/SSB03540307.wav"
    output_name = "source/wav/SAT/out/sat_syn.wav"
    output_name = os.path.realpath(output_name)
    sat.zh_synthesize_edit(
        old_str=old_str,
        new_str=new_str,
        input_name=input_name,
        output_name=output_name,
        task_name="synthesize")

    # Chinese speech editing
    print("######## Chinese speech editing #######")
    old_str = "今天天气很好"
    new_str = "今天心情很好"
    input_name = "source/wav/SAT/upload/SSB03540428.wav"
    output_name = "source/wav/SAT/out/sat_edit.wav"
    output_name = os.path.realpath(output_name)
    print(os.path.realpath(output_name))
    sat.zh_synthesize_edit(
        old_str=old_str,
        new_str=new_str,
        input_name=input_name,
        output_name=output_name,
        task_name="edit")

    # Chinese-to-English cross-lingual cloning
    print("######## Chinese cross-lingual voice cloning #######")
    old_str = "请播放歌曲小苹果。"
    new_str = "Thank you very much! what can i do for you"
    source_lang = 'zh'
    target_lang = 'en'
    input_name = "source/wav/SAT/upload/SSB03540307.wav"
    output_name = "source/wav/SAT/out/sat_cross_zh2en.wav"
    output_name = os.path.realpath(output_name)
    print(os.path.realpath(output_name))
    sat.crossclone(
        old_str=old_str,
        new_str=new_str,
        input_name=input_name,
        output_name=output_name,
        source_lang=source_lang,
        target_lang=target_lang)

    # English-to-Chinese cross-lingual cloning
    print("######## English cross-lingual voice cloning #######")
    old_str = "For that reason cover should not be given."
    new_str = "今天天气很好"
    source_lang = 'en'
    target_lang = 'zh'
    input_name = "source/wav/SAT/upload/p243_313.wav"
    output_name = "source/wav/SAT/out/sat_cross_en2zh.wav"
    output_name = os.path.realpath(output_name)
    print(os.path.realpath(output_name))
    sat.crossclone(
        old_str=old_str,
        new_str=new_str,
        input_name=input_name,
        output_name=output_name,
        source_lang=source_lang,
        target_lang=target_lang)

    # English voice cloning
    print("######## English voice cloning #######")
    old_str = "For that reason cover should not be given."
    new_str = "I love you very much do you love me"
    input_name = "source/wav/SAT/upload/p243_313.wav"
    output_name = "source/wav/SAT/out/sat_syn_en.wav"
    output_name = os.path.realpath(output_name)
    sat.en_synthesize_edit(
        old_str=old_str,
        new_str=new_str,
        input_name=input_name,
        output_name=output_name,
        task_name="synthesize")

    # English speech editing
    print("######## English speech editing #######")
    old_str = "For that reason cover should not be given."
    new_str = "For that reason cover is not impossible to be given."
    input_name = "source/wav/SAT/upload/p243_313.wav"
    output_name = "source/wav/SAT/out/sat_edit_en.wav"
    output_name = os.path.realpath(output_name)
    sat.en_synthesize_edit(
        old_str=old_str,
        new_str=new_str,
        input_name=input_name,
        output_name=output_name,
        task_name="edit")

@@ -0,0 +1,542 @@
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from pathlib import Path
from typing import List

import librosa
import numpy as np
import paddle
import pypinyin
import soundfile as sf
import yaml
from pypinyin_dict.phrase_pinyin_data import large_pinyin
from yacs.config import CfgNode

from paddlespeech.t2s.datasets.am_batch_fn import build_erniesat_collate_fn
from paddlespeech.t2s.datasets.get_feats import LogMelFBank
# from paddlespeech.t2s.exps.ernie_sat.align import get_phns_spans
from paddlespeech.t2s.exps.ernie_sat.utils import get_dur_adj_factor
from paddlespeech.t2s.exps.ernie_sat.utils import get_span_bdy
from paddlespeech.t2s.exps.ernie_sat.utils import get_tmp_name
from paddlespeech.t2s.exps.syn_utils import get_am_inference
from paddlespeech.t2s.exps.syn_utils import get_voc_inference
from paddlespeech.t2s.exps.syn_utils import norm
from paddlespeech.t2s.utils import str2bool
large_pinyin.load()

from .align import get_phns_spans


def eval_durs(phns, target_lang: str='zh', fs: int=24000, n_shift: int=300):

    if target_lang == 'en':
        am = "fastspeech2_ljspeech"
        am_config = "source/model/fastspeech2_nosil_ljspeech_ckpt_0.5/default.yaml"
        am_ckpt = "source/model/fastspeech2_nosil_ljspeech_ckpt_0.5/snapshot_iter_100000.pdz"
        am_stat = "source/model/fastspeech2_nosil_ljspeech_ckpt_0.5/speech_stats.npy"
        phones_dict = "source/model/fastspeech2_nosil_ljspeech_ckpt_0.5/phone_id_map.txt"

    elif target_lang == 'zh':
        am = "fastspeech2_csmsc"
        am_config = "source/model/fastspeech2_conformer_baker_ckpt_0.5/conformer.yaml"
        am_ckpt = "source/model/fastspeech2_conformer_baker_ckpt_0.5/snapshot_iter_76000.pdz"
        am_stat = "source/model/fastspeech2_conformer_baker_ckpt_0.5/speech_stats.npy"
        phones_dict = "source/model/fastspeech2_conformer_baker_ckpt_0.5/phone_id_map.txt"

    # Init body.
    with open(am_config) as f:
        am_config = CfgNode(yaml.safe_load(f))

    am_inference, am = get_am_inference(
        am=am,
        am_config=am_config,
        am_ckpt=am_ckpt,
        am_stat=am_stat,
        phones_dict=phones_dict,
        return_am=True)

    vocab_phones = {}
    with open(phones_dict, "r") as f:
        phn_id = [line.strip().split() for line in f.readlines()]
    for phn, id in phn_id:
        vocab_phones[phn] = int(id)
    vocab_size = len(vocab_phones)
    # replace unknown phones with sp
    phonemes = [phn if phn in vocab_phones else "sp" for phn in phns]

    phone_ids = [vocab_phones[item] for item in phonemes]
    phone_ids = paddle.to_tensor(np.array(phone_ids, np.int64))
    _, d_outs, _, _ = am.inference(phone_ids)
    d_outs = d_outs.tolist()
    return d_outs


def _p2id(phonemes: List[str], vocab_phones) -> np.ndarray:
    # replace unk phone with sp
    phonemes = [phn if phn in vocab_phones else "sp" for phn in phonemes]
    phone_ids = [vocab_phones[item] for item in phonemes]
    return np.array(phone_ids, np.int64)
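# A minimal sketch of how _p2id is meant to be used (illustrative values, not
# taken from a real phone_id_map.txt). Each line of phone_id_map.txt is
# "<phone> <integer id>", which is what the `line.strip().split()` readers in
# this file assume:
#
#     vocab_phones = {'sp': 0, 'a1': 1, 'i1': 2}     # hypothetical map
#     _p2id(['a1', 'i1', 'zz'], vocab_phones)        # 'zz' is OOV -> 'sp'
#     # -> array([1, 2, 0])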


def prep_feats_with_dur(wav_path: str,
                        old_str: str='',
                        new_str: str='',
                        source_lang: str='en',
                        target_lang: str='en',
                        duration_adjust: bool=True,
                        fs: int=24000,
                        n_shift: int=300,
                        mfa_version='v1'):
    '''
    Returns:
        np.ndarray: new wav, with the part of the original wav to be edited replaced by zeros
        List[str]: new phones
        List[float]: mfa start of new wav
        List[float]: mfa end of new wav
        List[int]: masked mel boundary of original wav
        List[int]: masked mel boundary of new wav
    '''
    wav_org, _ = librosa.load(wav_path, sr=fs)
    phns_spans_outs = get_phns_spans(
        wav_path=wav_path,
        old_str=old_str,
        new_str=new_str,
        source_lang=source_lang,
        target_lang=target_lang,
        fs=fs,
        n_shift=n_shift,
        mfa_version=mfa_version)

    mfa_start = phns_spans_outs['mfa_start']
    mfa_end = phns_spans_outs['mfa_end']
    old_phns = phns_spans_outs['old_phns']
    new_phns = phns_spans_outs['new_phns']
    span_to_repl = phns_spans_outs['span_to_repl']
    span_to_add = phns_spans_outs['span_to_add']

    # Chinese phones are not necessarily all in the fastspeech2 dictionary; use sp instead
    if source_lang in {'en', 'zh'}:
        old_durs = eval_durs(old_phns, target_lang=source_lang)
    else:
        assert source_lang in {'en', 'zh'}, \
            "duration prediction is not supported for this language..."

    orig_old_durs = [e - s for e, s in zip(mfa_end, mfa_start)]

    if duration_adjust:
        d_factor = get_dur_adj_factor(
            orig_dur=orig_old_durs, pred_dur=old_durs, phns=old_phns)
        d_factor = d_factor * 1.25
    else:
        d_factor = 1

    if target_lang in {'en', 'zh'}:
        new_durs = eval_durs(new_phns, target_lang=target_lang)
    else:
        assert target_lang in {'en', 'zh'}, \
            "duration prediction is not supported for this language..."

    # durations must be integers
    new_durs_adjusted = [int(np.ceil(d_factor * i)) for i in new_durs]

    new_span_dur_sum = sum(new_durs_adjusted[span_to_add[0]:span_to_add[1]])
    old_span_dur_sum = sum(orig_old_durs[span_to_repl[0]:span_to_repl[1]])
    dur_offset = new_span_dur_sum - old_span_dur_sum
    new_mfa_start = mfa_start[:span_to_repl[0]]
    new_mfa_end = mfa_end[:span_to_repl[0]]

    for dur in new_durs_adjusted[span_to_add[0]:span_to_add[1]]:
        if len(new_mfa_end) == 0:
            new_mfa_start.append(0)
            new_mfa_end.append(dur)
        else:
            new_mfa_start.append(new_mfa_end[-1])
            new_mfa_end.append(new_mfa_end[-1] + dur)

    new_mfa_start += [i + dur_offset for i in mfa_start[span_to_repl[1]:]]
    new_mfa_end += [i + dur_offset for i in mfa_end[span_to_repl[1]:]]

    # 3. get new wav
    # case 1: the new text is appended after the original sentence
    if span_to_repl[0] >= len(mfa_start):
        wav_left_idx = len(wav_org)
        wav_right_idx = wav_left_idx
    # case 2: a span inside the original sentence is replaced
    else:
        wav_left_idx = int(np.floor(mfa_start[span_to_repl[0]] * n_shift))
        wav_right_idx = int(np.ceil(mfa_end[span_to_repl[1] - 1] * n_shift))
    blank_wav = np.zeros(
        (int(np.ceil(new_span_dur_sum * n_shift)), ), dtype=wav_org.dtype)
    # in the original audio, the span to be edited is replaced with silence
    # whose length is decided by the fastspeech2 duration predictor
    new_wav = np.concatenate(
        [wav_org[:wav_left_idx], blank_wav, wav_org[wav_right_idx:]])

    # 4. get the old and new mel spans to be masked
    old_span_bdy = get_span_bdy(
        mfa_start=mfa_start, mfa_end=mfa_end, span_to_repl=span_to_repl)

    new_span_bdy = get_span_bdy(
        mfa_start=new_mfa_start, mfa_end=new_mfa_end, span_to_repl=span_to_add)

    # old_span_bdy and new_span_bdy are frame-level ranges
    outs = {}
    outs['new_wav'] = new_wav
    outs['new_phns'] = new_phns
    outs['new_mfa_start'] = new_mfa_start
    outs['new_mfa_end'] = new_mfa_end
    outs['old_span_bdy'] = old_span_bdy
    outs['new_span_bdy'] = new_span_bdy
    return outs
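# A worked toy example of the bookkeeping above (numbers invented for
# illustration). Suppose the original utterance has 4 phones with frame
# durations orig_old_durs = [3, 5, 4, 2] and span_to_repl = (1, 3), and the
# new phone sequence gets new_durs_adjusted = [3, 6, 6, 2] with
# span_to_add = (1, 3). Then old_span_dur_sum = 5 + 4 = 9,
# new_span_dur_sum = 6 + 6 = 12, and dur_offset = 3: every boundary after the
# edited span is shifted right by 3 frames, and 12 * n_shift zero samples are
# spliced into the waveform in place of the 9-frame span.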


def prep_feats(wav_path: str,
               mel_extractor,
               vocab_phones,
               erniesat_stat,
               collate_fn,
               old_str: str='',
               new_str: str='',
               source_lang: str='en',
               target_lang: str='en',
               duration_adjust: bool=True,
               fs: int=24000,
               n_shift: int=300,
               mfa_version: str='v1'):

    with_dur_outs = prep_feats_with_dur(
        wav_path=wav_path,
        old_str=old_str,
        new_str=new_str,
        source_lang=source_lang,
        target_lang=target_lang,
        duration_adjust=duration_adjust,
        fs=fs,
        n_shift=n_shift,
        mfa_version=mfa_version)

    wav_name = os.path.basename(wav_path)
    utt_id = wav_name.split('.')[0]

    wav = with_dur_outs['new_wav']
    phns = with_dur_outs['new_phns']
    mfa_start = with_dur_outs['new_mfa_start']
    mfa_end = with_dur_outs['new_mfa_end']
    old_span_bdy = with_dur_outs['old_span_bdy']
    new_span_bdy = with_dur_outs['new_span_bdy']
    span_bdy = np.array(new_span_bdy)

    mel = mel_extractor.get_log_mel_fbank(wav)
    erniesat_mean, erniesat_std = np.load(erniesat_stat)
    normed_mel = norm(mel, erniesat_mean, erniesat_std)
    tmp_name = 'ernie_sat/' + get_tmp_name(text=old_str)
    tmpbase = './tmp_dir/' + tmp_name
    tmpbase = Path(tmpbase)
    tmpbase.mkdir(parents=True, exist_ok=True)

    mel_path = tmpbase / 'mel.npy'
    np.save(mel_path, normed_mel)
    durations = [e - s for e, s in zip(mfa_end, mfa_start)]
    text = _p2id(phns, vocab_phones)

    datum = {
        "utt_id": utt_id,
        "spk_id": 0,
        "text": text,
        "text_lengths": len(text),
        "speech_lengths": len(normed_mel),
        "durations": durations,
        "speech": np.load(mel_path),
        "align_start": mfa_start,
        "align_end": mfa_end,
        "span_bdy": span_bdy
    }

    batch = collate_fn([datum])
    outs = dict()
    outs['batch'] = batch
    outs['old_span_bdy'] = old_span_bdy
    outs['new_span_bdy'] = new_span_bdy
    return outs


def get_mlm_output(wav_path: str,
                   erniesat_inference,
                   mel_extractor,
                   vocab_phones,
                   erniesat_stat,
                   collate_fn,
                   old_str: str='',
                   new_str: str='',
                   source_lang: str='en',
                   target_lang: str='en',
                   duration_adjust: bool=True,
                   fs: int=24000,
                   n_shift: int=300,
                   mfa_version: str='v1'):

    prep_feats_outs = prep_feats(
        wav_path=wav_path,
        mel_extractor=mel_extractor,
        vocab_phones=vocab_phones,
        erniesat_stat=erniesat_stat,
        collate_fn=collate_fn,
        old_str=old_str,
        new_str=new_str,
        source_lang=source_lang,
        target_lang=target_lang,
        duration_adjust=duration_adjust,
        fs=fs,
        n_shift=n_shift,
        mfa_version=mfa_version)

    batch = prep_feats_outs['batch']
    new_span_bdy = prep_feats_outs['new_span_bdy']
    old_span_bdy = prep_feats_outs['old_span_bdy']

    out_mels = erniesat_inference(
        speech=batch['speech'],
        text=batch['text'],
        masked_pos=batch['masked_pos'],
        speech_mask=batch['speech_mask'],
        text_mask=batch['text_mask'],
        speech_seg_pos=batch['speech_seg_pos'],
        text_seg_pos=batch['text_seg_pos'],
        span_bdy=new_span_bdy)

    # concatenate the output mels
    output_feat = paddle.concat(x=out_mels, axis=0)
    wav_org, _ = librosa.load(wav_path, sr=fs)
    outs = dict()
    outs['wav_org'] = wav_org
    outs['output_feat'] = output_feat
    outs['old_span_bdy'] = old_span_bdy
    outs['new_span_bdy'] = new_span_bdy

    return outs


def get_wav(wav_path: str,
            task_name,
            voc_inference,
            erniesat_inference,
            mel_extractor,
            vocab_phones,
            erniesat_stat,
            collate_fn,
            source_lang: str='en',
            target_lang: str='en',
            old_str: str='',
            new_str: str='',
            duration_adjust: bool=True,
            fs: int=24000,
            n_shift: int=300,
            mfa_version: str='v1'):

    outs = get_mlm_output(
        wav_path=wav_path,
        erniesat_inference=erniesat_inference,
        mel_extractor=mel_extractor,
        vocab_phones=vocab_phones,
        erniesat_stat=erniesat_stat,
        collate_fn=collate_fn,
        old_str=old_str,
        new_str=new_str,
        source_lang=source_lang,
        target_lang=target_lang,
        duration_adjust=duration_adjust,
        fs=fs,
        n_shift=n_shift,
        mfa_version=mfa_version)

    wav_org = outs['wav_org']
    output_feat = outs['output_feat']
    old_span_bdy = outs['old_span_bdy']
    new_span_bdy = outs['new_span_bdy']

    masked_feat = output_feat[new_span_bdy[0]:new_span_bdy[1]]

    with paddle.no_grad():
        alt_wav = voc_inference(masked_feat)
    alt_wav = np.squeeze(alt_wav)

    old_time_bdy = [n_shift * x for x in old_span_bdy]
    if task_name == 'edit':
        wav_replaced = np.concatenate(
            [wav_org[:old_time_bdy[0]], alt_wav, wav_org[old_time_bdy[1]:]])
    else:
        wav_replaced = alt_wav

    wav_dict = {"origin": wav_org, "output": wav_replaced}
    return wav_dict


def ernie_sat_web(erniesat_config,
                  old_str,
                  new_str,
                  source_lang,
                  target_lang,
                  task_name,
                  erniesat_ckpt,
                  erniesat_stat,
                  phones_dict,
                  voc_config,
                  voc,
                  voc_ckpt,
                  voc_stat,
                  duration_adjust,
                  wav_path,
                  output_name,
                  mfa_version='v1'):
    with open(erniesat_config) as f:
        erniesat_config = CfgNode(yaml.safe_load(f))

    # convert Chinese characters to pinyin
    if source_lang == 'zh':
        old_str = pypinyin.lazy_pinyin(
            old_str,
            neutral_tone_with_five=True,
            style=pypinyin.Style.TONE3,
            tone_sandhi=True)
        old_str = ' '.join(old_str)
    if target_lang == 'zh':
        new_str = pypinyin.lazy_pinyin(
            new_str,
            neutral_tone_with_five=True,
            style=pypinyin.Style.TONE3,
            tone_sandhi=True)
        new_str = ' '.join(new_str)

    # for editing, the new text replaces the old as-is; for synthesis (and any
    # other task) the new text is appended after the old text
    if task_name != 'edit':
        new_str = old_str + ' ' + new_str
    print("new_str:", new_str)

    # Extractor
    mel_extractor = LogMelFBank(
        sr=erniesat_config.fs,
        n_fft=erniesat_config.n_fft,
        hop_length=erniesat_config.n_shift,
        win_length=erniesat_config.win_length,
        window=erniesat_config.window,
        n_mels=erniesat_config.n_mels,
        fmin=erniesat_config.fmin,
        fmax=erniesat_config.fmax)

    collate_fn = build_erniesat_collate_fn(
        mlm_prob=erniesat_config.mlm_prob,
        mean_phn_span=erniesat_config.mean_phn_span,
        seg_emb=erniesat_config.model['enc_input_layer'] == 'sega_mlm',
        text_masking=False)

    vocab_phones = {}

    with open(phones_dict, 'rt') as f:
        phn_id = [line.strip().split() for line in f.readlines()]
    for phn, id in phn_id:
        vocab_phones[phn] = int(id)

    # ernie sat model
    erniesat_inference = get_am_inference(
        am='erniesat_dataset',
        am_config=erniesat_config,
        am_ckpt=erniesat_ckpt,
        am_stat=erniesat_stat,
        phones_dict=phones_dict)

    with open(voc_config) as f:
        voc_config = CfgNode(yaml.safe_load(f))

    # vocoder
    voc_inference = get_voc_inference(
        voc=voc,
        voc_config=voc_config,
        voc_ckpt=voc_ckpt,
        voc_stat=voc_stat)

    wav_dict = get_wav(
        wav_path=wav_path,
        task_name=task_name,
        voc_inference=voc_inference,
        erniesat_inference=erniesat_inference,
        mel_extractor=mel_extractor,
        vocab_phones=vocab_phones,
        erniesat_stat=erniesat_stat,
        collate_fn=collate_fn,
        source_lang=source_lang,
        target_lang=target_lang,
        old_str=old_str,
        new_str=new_str,
        duration_adjust=duration_adjust,
        fs=erniesat_config.fs,
        n_shift=erniesat_config.n_shift,
        mfa_version=mfa_version)

    sf.write(output_name, wav_dict['output'], samplerate=erniesat_config.fs)
    return output_name


if __name__ == '__main__':

    erniesat_config = "source/model/erniesat_aishell3_ckpt_1.2.0/default.yaml"
    erniesat_ckpt = "source/model/erniesat_aishell3_ckpt_1.2.0/snapshot_iter_289500.pdz"
    erniesat_stat = "source/model/erniesat_aishell3_ckpt_1.2.0/speech_stats.npy"
    phones_dict = "source/model/erniesat_aishell3_ckpt_1.2.0/phone_id_map.txt"
    duration_adjust = True

    voc = "hifigan_aishell3"
    voc_config = "source/model/hifigan_aishell3_ckpt_0.2.0/default.yaml"
    voc_ckpt = "source/model/hifigan_aishell3_ckpt_0.2.0/snapshot_iter_2500000.pdz"
    voc_stat = "source/model/hifigan_aishell3_ckpt_0.2.0/feats_stats.npy"

    old_str = "今天天气很好"
    new_str = "今天心情很好"
    source_lang = "zh"
    target_lang = "zh"
    task_name = "edit"
    wav_path = "source/wav/SAT/upload/SSB03540428.wav"
    output_name = "source/wav/SAT/out/demo_edit.wav"

    mfa_version = 'v2'

    ernie_sat_web(erniesat_config,
                  old_str,
                  new_str,
                  source_lang,
                  target_lang,
                  task_name,
                  erniesat_ckpt,
                  erniesat_stat,
                  phones_dict,
                  voc_config,
                  voc,
                  voc_ckpt,
                  voc_stat,
                  duration_adjust,
                  wav_path,
                  output_name,
                  mfa_version=mfa_version)

@@ -0,0 +1,142 @@
#
# The functions in GE2E interfere with the training process here and cause
# errors, so run the finetune step in this file as a separate process.
#
import argparse
import os
import subprocess
# from src.ft.finetune_tool import finetune_model
# from ft.finetune_tool import finetune_model, synthesize


def find_max_ckpt(model_path):
    # find the largest iteration number among snapshot_iter_*.pdz checkpoints
    max_ckpt = 0
    for filename in os.listdir(model_path):
        if filename.endswith('.pdz'):
            files = filename[:-4]
            a1, a2, it = files.split("_")
            if int(it) > max_ckpt:
                max_ckpt = int(it)
    return max_ckpt
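# For example (hypothetical checkpoint directory): with files
# ["snapshot_iter_96408.pdz", "snapshot_iter_96608.pdz", "config.yaml"],
# find_max_ckpt returns 96608. The parse assumes the fixed
# "snapshot_iter_<n>.pdz" naming used by PaddleSpeech checkpoints.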


class FineTune:
    def __init__(self, mfa_version='v1', pretrained_model_dir="source/model/fastspeech2_aishell3_ckpt_1.1.0"):
        self.mfa_version = mfa_version
        self.pretrained_model_dir = pretrained_model_dir

    def finetune(self, input_dir, exp_dir='temp', epoch=10, batch_size=2):

        mfa_dir = os.path.join(exp_dir, 'mfa_result')
        dump_dir = os.path.join(exp_dir, 'dump')
        output_dir = os.path.join(exp_dir, 'exp')
        lang = "zh"
        ngpu = 0

        cmd = f"""
            python src/ft/finetune_tool.py --input_dir {input_dir} \
                --pretrained_model_dir {self.pretrained_model_dir} \
                --mfa_dir {mfa_dir} \
                --dump_dir {dump_dir} \
                --output_dir {output_dir} \
                --lang {lang} \
                --ngpu {ngpu} \
                --epoch {epoch} \
                --batch_size {batch_size} \
                --mfa_version {self.mfa_version}
        """

        return self.run_cmd(cmd=cmd, output_name=exp_dir)

    def synthesize(self, text, wav_name, out_wav_dir, exp_dir='tmp_dir'):

        # synthesis test
        pretrained_model_dir = self.pretrained_model_dir
        print("exp_dir: ", exp_dir)
        dump_dir = os.path.join(exp_dir, 'dump')
        output_dir = os.path.join(exp_dir, 'exp')
        text_path = os.path.join(exp_dir, 'sentences.txt')
        lang = "zh"

        model_path = f"{output_dir}/checkpoints"
        ckpt = find_max_ckpt(model_path)

        # write the sentence to be synthesized
        with open(text_path, "w", encoding='utf8') as f:
            f.write(wav_name + " " + text)

        lang = "zh"
        spk_id = 0
        ngpu = 0
        am = "fastspeech2_aishell3"
        am_config = f"{pretrained_model_dir}/default.yaml"
        am_ckpt = f"{output_dir}/checkpoints/snapshot_iter_{ckpt}.pdz"
        am_stat = f"{pretrained_model_dir}/speech_stats.npy"
        speaker_dict = f"{dump_dir}/speaker_id_map.txt"
        phones_dict = f"{dump_dir}/phone_id_map.txt"
        tones_dict = None
        voc = "hifigan_aishell3"
        voc_config = "source/model/hifigan_aishell3_ckpt_0.2.0/default.yaml"
        voc_ckpt = "source/model/hifigan_aishell3_ckpt_0.2.0/snapshot_iter_2500000.pdz"
        voc_stat = "source/model/hifigan_aishell3_ckpt_0.2.0/feats_stats.npy"

        cmd = f"""
            python src/ft/synthesize.py \
                --am={am} \
                --am_config={am_config} \
                --am_ckpt={am_ckpt} \
                --am_stat={am_stat} \
                --voc={voc} \
                --voc_config={voc_config} \
                --voc_ckpt={voc_ckpt} \
                --voc_stat={voc_stat} \
                --lang={lang} \
                --text={text_path} \
                --output_dir={out_wav_dir} \
                --phones_dict={phones_dict} \
                --speaker_dict={speaker_dict} \
                --ngpu {ngpu} \
                --spk_id={spk_id}
        """
        out_wav_path = os.path.join(out_wav_dir, wav_name)
        return self.run_cmd(cmd, out_wav_path + '.wav')

    def run_cmd(self, cmd, output_name):
        p = subprocess.Popen(cmd, shell=True)
        res = p.wait()
        print(cmd)
        print("return code:", res)
        if res == 0:
            # the command succeeded
            print(f"cmd synthesis result: {output_name}")
            if os.path.exists(output_name):
                return output_name
            else:
                # the synthesized file does not exist
                return None
        else:
            # the command failed
            return None


if __name__ == '__main__':
    ft_model = FineTune(mfa_version='v2')

    exp_dir = os.path.realpath("tmp_dir/finetune")
    input_dir = os.path.realpath("source/wav/finetune/default")
    output_dir = os.path.realpath("source/wav/finetune/out")

    #################################
    ###### finetune-epoch trial #####
    #################################
    lab = 1
    # remove data from previous runs first
    cmd = f"rm -rf {exp_dir}"
    os.system(cmd)
    ft_model.finetune(input_dir=input_dir, exp_dir=exp_dir, epoch=10, batch_size=2)

    # synthesize
    text = "今天的天气真不错"
    wav_name = "demo" + str(lab) + "_a"
    out_wav_dir = os.path.realpath("source/wav/finetune/out")
    ft_model.synthesize(text, wav_name, out_wav_dir, exp_dir=exp_dir)

@@ -0,0 +1,125 @@
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pathlib import Path
from typing import Dict
from typing import List
from typing import Union


def check_phone(label_file: Union[str, Path],
                pinyin_phones: Dict[str, str],
                mfa_phones: List[str],
                am_phones: List[str],
                oov_record: str="./oov_info.txt"):
    """Check whether the phonemes corresponding to the audio text content
    are in the phoneme list of the pretrained MFA model, to ensure that alignment works.
    Check whether the phonemes corresponding to the audio text content
    are in the phoneme list of the pretrained AM model, to ensure that finetuning (normalization) works.

    Args:
        label_file (Union[str, Path]): label file, format: utt_id|phone seq
        pinyin_phones (dict): pinyin-to-phones map dict
        mfa_phones (list): the phone list of the pretrained MFA model
        am_phones (list): the phone list of the pretrained AM model

    Returns:
        oov_words (list): oov words
        oov_files (list): list of utt_ids that contain oov words
        oov_file_words (dict): maps each oov file to the oov words in that file
    """
    oov_words = []
    oov_files = []
    oov_file_words = {}

    with open(label_file, "r") as f:
        for line in f.readlines():
            utt_id = line.split("|")[0]
            transcription = line.strip().split("|")[1]
            flag = 0
            temp_oov_words = []
            for word in transcription.split(" "):
                if word not in pinyin_phones.keys():
                    temp_oov_words.append(word)
                    flag = 1
                    if word not in oov_words:
                        oov_words.append(word)
                else:
                    for p in pinyin_phones[word]:
                        if p not in mfa_phones or p not in am_phones:
                            temp_oov_words.append(word)
                            flag = 1
                            if word not in oov_words:
                                oov_words.append(word)
            if flag == 1:
                oov_files.append(utt_id)
                oov_file_words[utt_id] = temp_oov_words

    if oov_record is not None:
        with open(oov_record, "w") as fw:
            fw.write("oov_words: " + str(oov_words) + "\n")
            fw.write("oov_files: " + str(oov_files) + "\n")
            fw.write("oov_file_words: " + str(oov_file_words) + "\n")

    return oov_words, oov_files, oov_file_words
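# A minimal sketch of the two inputs (contents invented for illustration).
# One line of the label file, "utt_id|pinyin transcription":
#     000001|jin1 tian1 tian1 qi4 hen3 hao3
# One entry of pinyin_phones built from the lexicon:
#     pinyin_phones["jin1"] = ["j", "in1"]
# "jin1" is OOV if it is missing from pinyin_phones, or if any of its phones
# is missing from mfa_phones or am_phones.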


def get_pinyin_phones(lexicon_file: Union[str, Path]):
    # pinyin to phones
    pinyin_phones = {}
    with open(lexicon_file, "r") as f2:
        for line in f2.readlines():
            line_list = line.strip().split(" ")
            pinyin = line_list[0]
            if line_list[1] == '':
                phones = line_list[2:]
            else:
                phones = line_list[1:]
            pinyin_phones[pinyin] = phones

    return pinyin_phones


def get_mfa_phone(mfa_phone_file: Union[str, Path]):
    # get phones from the pretrained mfa model (meta.yaml)
    mfa_phones = []
    with open(mfa_phone_file, "r") as f:
        for line in f.readlines():
            if line.startswith("-"):
                phone = line.strip().split(" ")[-1]
                mfa_phones.append(phone)

    return mfa_phones


def get_am_phone(am_phone_file: Union[str, Path]):
    # get phones from the pretrained am model (phone_id_map.txt)
    am_phones = []
    with open(am_phone_file, "r") as f:
        for line in f.readlines():
            phone = line.strip().split(" ")[0]
            am_phones.append(phone)

    return am_phones


def get_check_result(label_file: Union[str, Path],
                     lexicon_file: Union[str, Path],
                     mfa_phone_file: Union[str, Path],
                     am_phone_file: Union[str, Path]):
    pinyin_phones = get_pinyin_phones(lexicon_file)
    mfa_phones = get_mfa_phone(mfa_phone_file)
    am_phones = get_am_phone(am_phone_file)
    oov_words, oov_files, oov_file_words = check_phone(
        label_file, pinyin_phones, mfa_phones, am_phones)
    return oov_words, oov_files, oov_file_words

@@ -0,0 +1,287 @@
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import math
import os
from operator import itemgetter
from pathlib import Path
from typing import Dict
from typing import Union

import jsonlines
import numpy as np
from sklearn.preprocessing import StandardScaler
from tqdm import tqdm

from paddlespeech.t2s.datasets.data_table import DataTable
from paddlespeech.t2s.datasets.get_feats import Energy
from paddlespeech.t2s.datasets.get_feats import LogMelFBank
from paddlespeech.t2s.datasets.get_feats import Pitch
from paddlespeech.t2s.datasets.preprocess_utils import get_phn_dur
from paddlespeech.t2s.datasets.preprocess_utils import merge_silence
from paddlespeech.t2s.exps.fastspeech2.preprocess import process_sentences


def read_stats(stats_file: Union[str, Path]):
    # rebuild a StandardScaler from a saved [mean, scale] stats file
    scaler = StandardScaler()
    scaler.mean_ = np.load(stats_file)[0]
    scaler.scale_ = np.load(stats_file)[1]
    scaler.n_features_in_ = scaler.mean_.shape[0]
    return scaler
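# The stats files loaded above are assumed to be .npy arrays whose first row
# is the feature mean and whose second row is the feature scale (std), e.g. a
# (2, n_mels) array for speech_stats.npy. A hypothetical round-trip sketch:
#
#     stats = np.stack([train_feats.mean(axis=0), train_feats.std(axis=0)])
#     np.save("speech_stats.npy", stats)
#     scaler = read_stats("speech_stats.npy")
#     normed = scaler.transform(feats)   # per-dimension normalization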


def get_stats(pretrained_model_dir: Path):
    speech_stats_file = pretrained_model_dir / "speech_stats.npy"
    pitch_stats_file = pretrained_model_dir / "pitch_stats.npy"
    energy_stats_file = pretrained_model_dir / "energy_stats.npy"
    speech_scaler = read_stats(speech_stats_file)
    pitch_scaler = read_stats(pitch_stats_file)
    energy_scaler = read_stats(energy_stats_file)

    return speech_scaler, pitch_scaler, energy_scaler


def get_map(duration_file: Union[str, Path],
            dump_dir: Path,
            pretrained_model_dir: Path):
    """get the phone map and the speaker map, saved in dump_dir

    Args:
        duration_file (str): durations.txt
        dump_dir (Path): dump dir
        pretrained_model_dir (Path): pretrained model dir
    """
    # copy the phone map file from the pretrained model path
    phones_dict = dump_dir / "phone_id_map.txt"
    os.system("cp %s %s" %
              (pretrained_model_dir / "phone_id_map.txt", phones_dict))

    # create a new speaker map file, replacing the previous speakers.
    sentences, speaker_set = get_phn_dur(duration_file)
    merge_silence(sentences)
    speakers = sorted(list(speaker_set))
    num = len(speakers)
    speaker_dict = dump_dir / "speaker_id_map.txt"
    with open(speaker_dict, 'w') as f, open(pretrained_model_dir /
                                            "speaker_id_map.txt", 'r') as fr:
        for i, spk in enumerate(speakers):
            f.write(spk + ' ' + str(i) + '\n')
        for line in fr.readlines():
            spk_id = line.strip().split(" ")[-1]
            if int(spk_id) >= num:
                f.write(line)

    vocab_phones = {}
    with open(phones_dict, 'rt') as f:
        phn_id = [line.strip().split() for line in f.readlines()]
    for phn, id in phn_id:
        vocab_phones[phn] = int(id)

    vocab_speaker = {}
    with open(speaker_dict, 'rt') as f:
        spk_id = [line.strip().split() for line in f.readlines()]
    for spk, id in spk_id:
        vocab_speaker[spk] = int(id)

    return sentences, vocab_phones, vocab_speaker


def get_extractor(config):
    # Extractor
    mel_extractor = LogMelFBank(
        sr=config.fs,
        n_fft=config.n_fft,
        hop_length=config.n_shift,
        win_length=config.win_length,
        window=config.window,
        n_mels=config.n_mels,
        fmin=config.fmin,
        fmax=config.fmax)
    pitch_extractor = Pitch(
        sr=config.fs,
        hop_length=config.n_shift,
        f0min=config.f0min,
        f0max=config.f0max)
    energy_extractor = Energy(
        n_fft=config.n_fft,
        hop_length=config.n_shift,
        win_length=config.win_length,
        window=config.window)

    return mel_extractor, pitch_extractor, energy_extractor


def normalize(speech_scaler,
              pitch_scaler,
              energy_scaler,
              vocab_phones: Dict,
              vocab_speaker: Dict,
              raw_dump_dir: Path,
              type: str):

    dumpdir = raw_dump_dir / type / "norm"
    dumpdir = Path(dumpdir).expanduser()
    dumpdir.mkdir(parents=True, exist_ok=True)

    # get dataset
    metadata_file = raw_dump_dir / type / "raw" / "metadata.jsonl"
    with jsonlines.open(metadata_file, 'r') as reader:
        metadata = list(reader)
    dataset = DataTable(
        metadata,
        converters={
            "speech": np.load,
            "pitch": np.load,
            "energy": np.load,
        })
    logging.info(f"The number of files = {len(dataset)}.")

    # process each file
    output_metadata = []

    for item in tqdm(dataset):
        utt_id = item['utt_id']
        speech = item['speech']
        pitch = item['pitch']
        energy = item['energy']
        # normalize
        speech = speech_scaler.transform(speech)
        speech_dir = dumpdir / "data_speech"
        speech_dir.mkdir(parents=True, exist_ok=True)
        speech_path = speech_dir / f"{utt_id}_speech.npy"
        np.save(speech_path, speech.astype(np.float32), allow_pickle=False)

        pitch = pitch_scaler.transform(pitch)
        pitch_dir = dumpdir / "data_pitch"
        pitch_dir.mkdir(parents=True, exist_ok=True)
        pitch_path = pitch_dir / f"{utt_id}_pitch.npy"
        np.save(pitch_path, pitch.astype(np.float32), allow_pickle=False)

        energy = energy_scaler.transform(energy)
        energy_dir = dumpdir / "data_energy"
        energy_dir.mkdir(parents=True, exist_ok=True)
        energy_path = energy_dir / f"{utt_id}_energy.npy"
        np.save(energy_path, energy.astype(np.float32), allow_pickle=False)

        phone_ids = [vocab_phones[p] for p in item['phones']]
        spk_id = vocab_speaker[item["speaker"]]
        record = {
            "utt_id": item['utt_id'],
            "spk_id": spk_id,
            "text": phone_ids,
            "text_lengths": item['text_lengths'],
            "speech_lengths": item['speech_lengths'],
            "durations": item['durations'],
            "speech": str(speech_path),
            "pitch": str(pitch_path),
            "energy": str(energy_path)
        }
        # add spk_emb for voice cloning
        if "spk_emb" in item:
            record["spk_emb"] = str(item["spk_emb"])

        output_metadata.append(record)
    output_metadata.sort(key=itemgetter('utt_id'))
    output_metadata_path = Path(dumpdir) / "metadata.jsonl"
    with jsonlines.open(output_metadata_path, 'w') as writer:
        for item in output_metadata:
            writer.write(item)
    logging.info(f"metadata dumped into {output_metadata_path}")


def extract_feature(duration_file: str,
                    config,
                    input_dir: Path,
                    dump_dir: Path,
                    pretrained_model_dir: Path):

    sentences, vocab_phones, vocab_speaker = get_map(duration_file, dump_dir,
                                                     pretrained_model_dir)
    mel_extractor, pitch_extractor, energy_extractor = get_extractor(config)

    wav_files = sorted(list((input_dir).rglob("*.wav")))
    # split data into 3 sections, train: 80%, dev: 10%, test: 10%
    num_train = math.ceil(len(wav_files) * 0.8)
    num_dev = math.ceil(len(wav_files) * 0.1)
    print(num_train, num_dev)

    train_wav_files = wav_files[:num_train]
    dev_wav_files = wav_files[num_train:num_train + num_dev]
    test_wav_files = wav_files[num_train + num_dev:]

    train_dump_dir = dump_dir / "train" / "raw"
    train_dump_dir.mkdir(parents=True, exist_ok=True)
    dev_dump_dir = dump_dir / "dev" / "raw"
    dev_dump_dir.mkdir(parents=True, exist_ok=True)
    test_dump_dir = dump_dir / "test" / "raw"
    test_dump_dir.mkdir(parents=True, exist_ok=True)

    # process the 3 sections
    num_cpu = 4
    cut_sil = True
    spk_emb_dir = None
    write_metadata_method = "w"
    speech_scaler, pitch_scaler, energy_scaler = get_stats(pretrained_model_dir)

    if train_wav_files:
        process_sentences(
            config=config,
            fps=train_wav_files,
            sentences=sentences,
            output_dir=train_dump_dir,
            mel_extractor=mel_extractor,
            pitch_extractor=pitch_extractor,
            energy_extractor=energy_extractor,
            nprocs=num_cpu,
            cut_sil=cut_sil,
            spk_emb_dir=spk_emb_dir,
            write_metadata_method=write_metadata_method)
        # norm
        normalize(speech_scaler, pitch_scaler, energy_scaler, vocab_phones,
                  vocab_speaker, dump_dir, "train")

    if dev_wav_files:
        process_sentences(
            config=config,
            fps=dev_wav_files,
            sentences=sentences,
            output_dir=dev_dump_dir,
            mel_extractor=mel_extractor,
            pitch_extractor=pitch_extractor,
            energy_extractor=energy_extractor,
            nprocs=num_cpu,
            cut_sil=cut_sil,
            spk_emb_dir=spk_emb_dir,
            write_metadata_method=write_metadata_method)
        # norm
        normalize(speech_scaler, pitch_scaler, energy_scaler, vocab_phones,
                  vocab_speaker, dump_dir, "dev")

    if test_wav_files:
        process_sentences(
            config=config,
            fps=test_wav_files,
            sentences=sentences,
            output_dir=test_dump_dir,
            mel_extractor=mel_extractor,
            pitch_extractor=pitch_extractor,
            energy_extractor=energy_extractor,
            nprocs=num_cpu,
            cut_sil=cut_sil,
            spk_emb_dir=spk_emb_dir,
            write_metadata_method=write_metadata_method)
        # norm
        normalize(speech_scaler, pitch_scaler, energy_scaler, vocab_phones,
                  vocab_speaker, dump_dir, "test")

@@ -0,0 +1,316 @@
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import os
from pathlib import Path
from typing import Union

import paddle
import soundfile as sf
import yaml
from paddle import distributed as dist
from timer import timer
from yacs.config import CfgNode

from paddlespeech.t2s.exps.fastspeech2.train import train_sp

# from .check_oov import get_check_result
# from .extract import extract_feature
# from .label_process import get_single_label
# from .prepare_env import generate_finetune_env

from check_oov import get_check_result
from extract import extract_feature
from label_process import get_single_label
from prepare_env import generate_finetune_env

from utils.gen_duration_from_textgrid import gen_duration_from_textgrid

DICT_EN = 'source/tools/aligner/cmudict-0.7b'
DICT_EN_v2 = 'source/tools/aligner/cmudict-0.7b.dict'
DICT_ZH = 'source/tools/aligner/simple.lexicon'
DICT_ZH_v2 = 'source/tools/aligner/simple.dict'
MODEL_DIR_EN = 'source/tools/aligner/vctk_model.zip'
MODEL_DIR_ZH = 'source/tools/aligner/aishell3_model.zip'
MFA_PHONE_EN = 'source/tools/aligner/vctk_model/meta.yaml'
MFA_PHONE_ZH = 'source/tools/aligner/aishell3_model/meta.yaml'
MFA_PATH = 'source/tools/montreal-forced-aligner/bin'
os.environ['PATH'] = MFA_PATH + '/:' + os.environ['PATH']


class TrainArgs():
    def __init__(self, ngpu, config_file, dump_dir: Path, output_dir: Path):
        self.config = str(config_file)
        self.train_metadata = str(dump_dir / "train/norm/metadata.jsonl")
        self.dev_metadata = str(dump_dir / "dev/norm/metadata.jsonl")
        self.output_dir = str(output_dir)
        self.ngpu = ngpu
        self.phones_dict = str(dump_dir / "phone_id_map.txt")
        self.speaker_dict = str(dump_dir / "speaker_id_map.txt")
        self.voice_cloning = False


def get_mfa_result(
        input_dir: Union[str, Path],
        mfa_dir: Union[str, Path],
        lang: str='en',
        mfa_version='v1'):
    """get mfa result

    Args:
        input_dir (Union[str, Path]): input dir including wav files and labels
        mfa_dir (Union[str, Path]): mfa result dir
        lang (str, optional): input audio language. Defaults to 'en'.
    """
    input_dir = str(input_dir).replace("/newdir", "")
    # MFA
    if mfa_version == 'v1':
        if lang == 'en':
            DICT = DICT_EN
            MODEL_DIR = MODEL_DIR_EN

        elif lang == 'zh':
            DICT = DICT_ZH
            MODEL_DIR = MODEL_DIR_ZH
        else:
            print('please input a valid lang!')

        CMD = 'mfa_align' + ' ' + str(
            input_dir) + ' ' + DICT + ' ' + MODEL_DIR + ' ' + str(mfa_dir)
        os.system(CMD)
    else:
        if lang == 'en':
            DICT = DICT_EN_v2
            MODEL_DIR = MODEL_DIR_EN

        elif lang == 'zh':
            DICT = DICT_ZH_v2
            MODEL_DIR = MODEL_DIR_ZH
        else:
            print('please input a valid lang!')

        CMD = 'mfa align' + ' ' + str(
            input_dir) + ' ' + DICT + ' ' + MODEL_DIR + ' ' + str(mfa_dir)
        os.system(CMD)
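# For illustration, with lang='zh' the two branches shell out to commands of
# roughly this shape (MFA v1 ships the mfa_align binary, MFA v2 the
# `mfa align` subcommand; <input_dir> and <mfa_dir> are caller-supplied):
#   v1: mfa_align <input_dir> source/tools/aligner/simple.lexicon source/tools/aligner/aishell3_model.zip <mfa_dir>
#   v2: mfa align <input_dir> source/tools/aligner/simple.dict source/tools/aligner/aishell3_model.zip <mfa_dir>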


def finetune_model(input_dir,
                   pretrained_model_dir,
                   mfa_dir,
                   dump_dir,
                   lang,
                   output_dir,
                   ngpu,
                   epoch,
                   batch_size,
                   mfa_version='v1'):
    fs = 24000
    n_shift = 300
    input_dir = Path(input_dir).expanduser()
    mfa_dir = Path(mfa_dir).expanduser()
    mfa_dir.mkdir(parents=True, exist_ok=True)
    dump_dir = Path(dump_dir).expanduser()
    dump_dir.mkdir(parents=True, exist_ok=True)
    output_dir = Path(output_dir).expanduser()
    output_dir.mkdir(parents=True, exist_ok=True)
    pretrained_model_dir = Path(pretrained_model_dir).expanduser()

    # read config
    config_file = pretrained_model_dir / "default.yaml"
    print("config_path: ")
    print(f"########### { config_file } ###########")
    with open(config_file) as f:
        config = CfgNode(yaml.safe_load(f))
    config.max_epoch = config.max_epoch + epoch
    if batch_size > 0:
        config.batch_size = batch_size

    if lang == 'en':
        lexicon_file = DICT_EN
        mfa_phone_file = MFA_PHONE_EN
    elif lang == 'zh':
        lexicon_file = DICT_ZH
        mfa_phone_file = MFA_PHONE_ZH
    else:
        print('please input a valid lang!')
    am_phone_file = pretrained_model_dir / "phone_id_map.txt"
    label_file = input_dir / "labels.txt"

    # check phones for mfa and am finetune
    oov_words, oov_files, oov_file_words = get_check_result(
        label_file, lexicon_file, mfa_phone_file, am_phone_file)
    input_dir = get_single_label(label_file, oov_files, input_dir)

    # get mfa result
    print("input_dir: ", input_dir)
    get_mfa_result(input_dir, mfa_dir, lang, mfa_version=mfa_version)

    # generate durations.txt
    duration_file = "./durations.txt"
    print("mfa_dir: ", mfa_dir)
    gen_duration_from_textgrid(mfa_dir, duration_file, fs, n_shift)

    # generate phone and speaker map files
    extract_feature(duration_file, config, input_dir, dump_dir,
                    pretrained_model_dir)

    # create finetune env
    generate_finetune_env(output_dir, pretrained_model_dir)

    # create new args for training
    train_args = TrainArgs(ngpu, config_file, dump_dir, output_dir)

    # finetune the model
    # dispatch
    if ngpu > 1:
        dist.spawn(train_sp, (train_args, config), nprocs=ngpu)
    else:
        train_sp(train_args, config)
    return output_dir

# synthesis


if __name__ == '__main__':
    # parse config and args
    parser = argparse.ArgumentParser(
        description="Preprocess audio and then extract features.")

    parser.add_argument(
        "--input_dir",
        type=str,
        help="directory containing audio and label file")

    parser.add_argument(
        "--pretrained_model_dir",
        type=str,
        help="path to the pretrained model")

    parser.add_argument(
        "--mfa_dir",
        type=str,
        default="./mfa_result",
        help="directory to save aligned files")

    parser.add_argument(
        "--dump_dir",
        type=str,
        default="./dump",
        help="directory to save feature files and metadata.")

    parser.add_argument(
        "--output_dir",
        type=str,
        default="./exp/default/",
        help="directory to save the finetuned model.")

    parser.add_argument(
        '--lang',
        type=str,
        default='zh',
        choices=['zh', 'en'],
        help='Choose input audio language. zh or en')

    parser.add_argument(
        "--ngpu", type=int, default=1, help="if ngpu=0, use cpu.")

    parser.add_argument("--epoch", type=int, default=100, help="finetune epochs")

    parser.add_argument(
        "--batch_size",
        type=int,
        default=-1,
        help="batch size, default -1 means same as pretrained model")

    parser.add_argument(
        "--mfa_version",
        type=str,
        default='v1',
        help="mfa version, you can choose v1 or v2")

    args = parser.parse_args()

    finetune_model(input_dir=args.input_dir,
                   pretrained_model_dir=args.pretrained_model_dir,
                   mfa_dir=args.mfa_dir,
                   dump_dir=args.dump_dir,
                   lang=args.lang,
                   output_dir=args.output_dir,
                   ngpu=args.ngpu,
                   epoch=args.epoch,
                   batch_size=args.batch_size,
                   mfa_version=args.mfa_version)


    # 10-sentence finetune test
    # input_dir = "source/wav/finetune/default"
    # pretrained_model_dir = "source/model/fastspeech2_aishell3_ckpt_1.1.0"
    # mfa_dir = "tmp_dir/finetune/mfa"
    # dump_dir = "tmp_dir/finetune/dump"
    # lang = "zh"
    # output_dir = "tmp_dir/finetune/out"
    # ngpu = 0
    # epoch = 2
    # batch_size = 2
    # mfa_version = 'v2'
    # finetune
    # finetune_model(input_dir,
    #                pretrained_model_dir,
    #                mfa_dir,
    #                dump_dir,
    #                lang,
    #                output_dir,
    #                ngpu,
    #                epoch,
    #                batch_size,
    #                mfa_version=mfa_version)

    # # synthesis test
    # text = "source/wav/finetune/test.txt"

    # lang = "zh"
    # spk_id = 0
    # am = "fastspeech2_aishell3"
    # am_config = f"{pretrained_model_dir}/default.yaml"
    # am_ckpt = f"{output_dir}/checkpoints/snapshot_iter_96408.pdz"
    # am_stat = f"{pretrained_model_dir}/speech_stats.npy"
    # speaker_dict = f"{dump_dir}/speaker_id_map.txt"
    # phones_dict = f"{dump_dir}/phone_id_map.txt"
    # tones_dict = None
    # voc = "hifigan_aishell3"
    # voc_config = "source/model/hifigan_aishell3_ckpt_0.2.0/default.yaml"
    # voc_ckpt = "source/model/hifigan_aishell3_ckpt_0.2.0/snapshot_iter_2500000.pdz"
    # voc_stat = "source/model/hifigan_aishell3_ckpt_0.2.0/feats_stats.npy"

    # wav_output_dir = "source/wav/finetune/out"

    # synthesize(text,
    #            wav_output_dir,
    #            lang,
    #            spk_id,
    #            am,
    #            am_config,
    #            am_ckpt,
    #            am_stat,
    #            speaker_dict,
    #            phones_dict,
    #            tones_dict,
    #            voc,
    #            voc_config,
    #            voc_ckpt,
    #            voc_stat
    #            )
@ -0,0 +1,63 @@
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from pathlib import Path
from typing import List
from typing import Union


def change_baker_label(baker_label_file: Union[str, Path],
                       out_label_file: Union[str, Path]):
    """Change a Baker-style label file into a regular label file.

    Args:
        baker_label_file (Union[str, Path]): original Baker label file
        out_label_file (Union[str, Path]): regular label file
    """
    with open(baker_label_file) as f:
        lines = f.readlines()

    with open(out_label_file, "w") as fw:
        for i in range(0, len(lines), 2):
            utt_id = lines[i].split()[0]
            transcription = lines[i + 1].strip()
            fw.write(utt_id + "|" + transcription + "\n")
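
# A minimal usage sketch (hypothetical file names): Baker-style labels put the
# utt_id line and its transcription on alternating lines; the rewrite pairs
# them into one "utt_id|transcription" line per utterance:
# change_baker_label("000001-010000.txt", "labels.txt")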


def get_single_label(label_file: Union[str, Path],
                     oov_files: List[Union[str, Path]],
                     input_dir: Union[str, Path]):
    """Divide the label file into individual per-utterance files.

    Args:
        label_file (str or Path): label file, format: utt_id|phones id
        oov_files (List[str or Path]): utt_ids to skip (e.g. utterances with OOV words)
        input_dir (Path): input dir including audios
    """
    input_dir = Path(input_dir).expanduser()
    new_dir = input_dir / "newdir"
    new_dir.mkdir(parents=True, exist_ok=True)

    with open(label_file, "r") as f:
        for line in f.readlines():
            utt_id = line.split("|")[0]
            if utt_id not in oov_files:
                transcription = line.split("|")[1].strip()
                wav_file = str(input_dir) + "/" + utt_id + ".wav"
                new_wav_file = str(new_dir) + "/" + utt_id + ".wav"
                os.system("cp %s %s" % (wav_file, new_wav_file))
                single_file = str(new_dir) + "/" + utt_id + ".txt"
                with open(single_file, "w") as fw:
                    fw.write(transcription)

    return new_dir
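
# A minimal usage sketch (hypothetical paths): for a "labels.txt" line such as
# "000001|...", this copies input_dir/000001.wav into input_dir/newdir/ and
# writes the transcription next to it as 000001.txt:
# new_dir = get_single_label("labels.txt", oov_files=[], input_dir="source/wav/finetune/default")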
@ -0,0 +1,35 @@
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from pathlib import Path


def generate_finetune_env(output_dir: Path, pretrained_model_dir: Path):

    output_dir = output_dir / "checkpoints/"
    output_dir = output_dir.resolve()
    output_dir.mkdir(parents=True, exist_ok=True)

    # pick the first pretrained checkpoint (*.pdz) found in the model dir
    model_path = sorted(list(pretrained_model_dir.rglob("*.pdz")))[0]
    model_path = model_path.resolve()
    iteration = int(str(model_path).split("_")[-1].split(".")[0])
    model_file = str(model_path).split("/")[-1]

    os.system("cp %s %s" % (model_path, output_dir))

    # write a snapshot record so the trainer treats the copied checkpoint
    # as the latest snapshot and resumes from it
    records_file = output_dir / "records.jsonl"
    with open(records_file, "w") as f:
        line = "\"time\": \"2022-08-06 07:51:53.463650\", \"path\": \"%s\", \"iteration\": %d" % (
            str(output_dir / model_file), iteration)
        f.write("{" + line + "}" + "\n")
@ -0,0 +1,262 @@
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
from pathlib import Path

import paddle
import soundfile as sf
import yaml
from timer import timer
from yacs.config import CfgNode

from paddlespeech.t2s.exps.syn_utils import am_to_static
from paddlespeech.t2s.exps.syn_utils import get_am_inference
from paddlespeech.t2s.exps.syn_utils import get_frontend
from paddlespeech.t2s.exps.syn_utils import get_sentences
from paddlespeech.t2s.exps.syn_utils import get_voc_inference
from paddlespeech.t2s.exps.syn_utils import run_frontend
from paddlespeech.t2s.exps.syn_utils import voc_to_static


def evaluate(args):

    # Init body.
    with open(args.am_config) as f:
        am_config = CfgNode(yaml.safe_load(f))
    with open(args.voc_config) as f:
        voc_config = CfgNode(yaml.safe_load(f))

    print("========Args========")
    print(yaml.safe_dump(vars(args)))
    print("========Config========")
    print(am_config)
    print(voc_config)

    sentences = get_sentences(text_file=args.text, lang=args.lang)

    # frontend
    frontend = get_frontend(
        lang=args.lang,
        phones_dict=args.phones_dict,
        tones_dict=args.tones_dict)
    print("frontend done!")

    # acoustic model
    am_name = args.am[:args.am.rindex('_')]
    am_dataset = args.am[args.am.rindex('_') + 1:]

    am_inference = get_am_inference(
        am=args.am,
        am_config=am_config,
        am_ckpt=args.am_ckpt,
        am_stat=args.am_stat,
        phones_dict=args.phones_dict,
        tones_dict=args.tones_dict,
        speaker_dict=args.speaker_dict)
    print("acoustic model done!")
    # vocoder
    voc_inference = get_voc_inference(
        voc=args.voc,
        voc_config=voc_config,
        voc_ckpt=args.voc_ckpt,
        voc_stat=args.voc_stat)
    print("voc done!")

    # whether dygraph to static
    if args.inference_dir:
        # acoustic model
        am_inference = am_to_static(
            am_inference=am_inference,
            am=args.am,
            inference_dir=args.inference_dir,
            speaker_dict=args.speaker_dict)
        # vocoder
        voc_inference = voc_to_static(
            voc_inference=voc_inference,
            voc=args.voc,
            inference_dir=args.inference_dir)

    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    merge_sentences = False
    # Avoid tacotron2_ljspeech failing to stop at the end of a sub-sentence
    # after dygraph-to-static conversion; it may still fail to stop at the
    # very end (NOTE by yuantian01 Feb 9 2022)
    if am_name == 'tacotron2':
        merge_sentences = True

    get_tone_ids = False
    if am_name == 'speedyspeech':
        get_tone_ids = True

    N = 0
    T = 0
    for utt_id, sentence in sentences:
        with timer() as t:
            frontend_dict = run_frontend(
                frontend=frontend,
                text=sentence,
                merge_sentences=merge_sentences,
                get_tone_ids=get_tone_ids,
                lang=args.lang)
            phone_ids = frontend_dict['phone_ids']
            with paddle.no_grad():
                flags = 0
                for i in range(len(phone_ids)):
                    part_phone_ids = phone_ids[i]
                    # acoustic model
                    if am_name == 'fastspeech2':
                        # multi speaker
                        if am_dataset in {"aishell3", "vctk", "mix"}:
                            spk_id = paddle.to_tensor(args.spk_id)
                            mel = am_inference(part_phone_ids, spk_id)
                        else:
                            mel = am_inference(part_phone_ids)
                    elif am_name == 'speedyspeech':
                        part_tone_ids = frontend_dict['tone_ids'][i]
                        if am_dataset in {"aishell3", "vctk", "mix"}:
                            spk_id = paddle.to_tensor(args.spk_id)
                            mel = am_inference(part_phone_ids, part_tone_ids,
                                               spk_id)
                        else:
                            mel = am_inference(part_phone_ids, part_tone_ids)
                    elif am_name == 'tacotron2':
                        mel = am_inference(part_phone_ids)
                    # vocoder
                    wav = voc_inference(mel)
                    if flags == 0:
                        wav_all = wav
                        flags = 1
                    else:
                        wav_all = paddle.concat([wav_all, wav])
        wav = wav_all.numpy()
        N += wav.size
        T += t.elapse
        speed = wav.size / t.elapse
        rtf = am_config.fs / speed
        print(
            f"{utt_id}, mel: {mel.shape}, wave: {wav.shape}, time: {t.elapse}s, Hz: {speed}, RTF: {rtf}."
        )
        sf.write(
            str(output_dir / (utt_id + ".wav")), wav, samplerate=am_config.fs)
        print(f"{utt_id} done!")
    print(f"generation speed: {N / T}Hz, RTF: {am_config.fs / (N / T)}")


def parse_args():
    # parse args and config
    parser = argparse.ArgumentParser(
        description="Synthesize with acoustic model & vocoder")
    # acoustic model
    parser.add_argument(
        '--am',
        type=str,
        default='fastspeech2_csmsc',
        choices=[
            'speedyspeech_csmsc', 'speedyspeech_aishell3', 'fastspeech2_csmsc',
            'fastspeech2_ljspeech', 'fastspeech2_aishell3', 'fastspeech2_vctk',
            'tacotron2_csmsc', 'tacotron2_ljspeech', 'fastspeech2_mix'
        ],
        help='Choose acoustic model type of tts task.')
    parser.add_argument(
        '--am_config', type=str, default=None, help='Config of acoustic model.')
    parser.add_argument(
        '--am_ckpt',
        type=str,
        default=None,
        help='Checkpoint file of acoustic model.')
    parser.add_argument(
        "--am_stat",
        type=str,
        default=None,
        help="mean and standard deviation used to normalize spectrogram when training acoustic model."
    )
    parser.add_argument(
        "--phones_dict", type=str, default=None, help="phone vocabulary file.")
    parser.add_argument(
        "--tones_dict", type=str, default=None, help="tone vocabulary file.")
    parser.add_argument(
        "--speaker_dict", type=str, default=None, help="speaker id map file.")
    parser.add_argument(
        '--spk_id',
        type=int,
        default=0,
        help='spk id for multi speaker acoustic model')
    # vocoder
    parser.add_argument(
        '--voc',
        type=str,
        default='pwgan_csmsc',
        choices=[
            'pwgan_csmsc',
            'pwgan_ljspeech',
            'pwgan_aishell3',
            'pwgan_vctk',
            'mb_melgan_csmsc',
            'style_melgan_csmsc',
            'hifigan_csmsc',
            'hifigan_ljspeech',
            'hifigan_aishell3',
            'hifigan_vctk',
            'wavernn_csmsc',
        ],
        help='Choose vocoder type of tts task.')
    parser.add_argument(
        '--voc_config', type=str, default=None, help='Config of voc.')
    parser.add_argument(
        '--voc_ckpt', type=str, default=None, help='Checkpoint file of voc.')
    parser.add_argument(
        "--voc_stat",
        type=str,
        default=None,
        help="mean and standard deviation used to normalize spectrogram when training voc."
    )
    # other
    parser.add_argument(
        '--lang',
        type=str,
        default='zh',
        help='Choose model language. zh or en or mix')

    parser.add_argument(
        "--inference_dir",
        type=str,
        default=None,
        help="dir to save inference models")
    parser.add_argument(
        "--ngpu", type=int, default=1, help="if ngpu == 0, use cpu.")
    parser.add_argument(
        "--text",
        type=str,
        help="text to synthesize, a 'utt_id sentence' pair per line.")
    parser.add_argument("--output_dir", type=str, help="output dir.")

    args = parser.parse_args()
    return args


def main():
    args = parse_args()

    if args.ngpu == 0:
        paddle.set_device("cpu")
    elif args.ngpu > 0:
        paddle.set_device("gpu")
    else:
        print("ngpu should be >= 0!")

    evaluate(args)


if __name__ == "__main__":
    main()
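
# A sketch of a typical invocation (script name and paths are illustrative,
# mirroring the finetune example above):
# python synthesize.py \
#     --am fastspeech2_aishell3 \
#     --am_config source/model/fastspeech2_aishell3_ckpt_1.1.0/default.yaml \
#     --am_ckpt tmp_dir/finetune/out/checkpoints/snapshot_iter_96408.pdz \
#     --am_stat source/model/fastspeech2_aishell3_ckpt_1.1.0/speech_stats.npy \
#     --phones_dict tmp_dir/finetune/dump/phone_id_map.txt \
#     --speaker_dict tmp_dir/finetune/dump/speaker_id_map.txt \
#     --voc hifigan_aishell3 \
#     --voc_config source/model/hifigan_aishell3_ckpt_0.2.0/default.yaml \
#     --voc_ckpt source/model/hifigan_aishell3_ckpt_0.2.0/snapshot_iter_2500000.pdz \
#     --voc_stat source/model/hifigan_aishell3_ckpt_0.2.0/feats_stats.npy \
#     --lang zh --spk_id 0 \
#     --text source/wav/finetune/test.txt --output_dir source/wav/finetune/out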
@ -0,0 +1,130 @@
"""
GE2E Voice Clone
"""

import argparse
import os
from pathlib import Path

import numpy as np
import paddle
import soundfile as sf
import yaml
from yacs.config import CfgNode

from paddlespeech.t2s.frontend.zh_frontend import Frontend
from paddlespeech.t2s.modules.normalizer import ZScore
from paddlespeech.utils.dynamic_import import dynamic_import
from paddlespeech.vector.exps.ge2e.audio_processor import SpeakerVerificationPreprocessor
from paddlespeech.vector.models.lstm_speaker_encoder import LSTMSpeakerEncoder


class VoiceCloneGE2E():
    def __init__(self):
        # paths of the pretrained models and other settings
        self.model_alias = {
            # acoustic model
            "fastspeech2":
            "paddlespeech.t2s.models.fastspeech2:FastSpeech2",
            "fastspeech2_inference":
            "paddlespeech.t2s.models.fastspeech2:FastSpeech2Inference",
            # voc
            "pwgan":
            "paddlespeech.t2s.models.parallel_wavegan:PWGGenerator",
            "pwgan_inference":
            "paddlespeech.t2s.models.parallel_wavegan:PWGInference",
        }
        # am
        self.am = "fastspeech2_aishell3"
        self.am_config = "source/model/fastspeech2_nosil_aishell3_vc1_ckpt_0.5/default.yaml"
        self.am_ckpt = "source/model/fastspeech2_nosil_aishell3_vc1_ckpt_0.5/snapshot_iter_96400.pdz"
        self.am_stat = "source/model/fastspeech2_nosil_aishell3_vc1_ckpt_0.5/speech_stats.npy"
        self.phones_dict = "source/model/fastspeech2_nosil_aishell3_vc1_ckpt_0.5/phone_id_map.txt"
        # voc
        self.voc = "pwgan_aishell3"
        self.voc_config = "source/model/pwg_aishell3_ckpt_0.5/default.yaml"
        self.voc_ckpt = "source/model/pwg_aishell3_ckpt_0.5/snapshot_iter_1000000.pdz"
        self.voc_stat = "source/model/pwg_aishell3_ckpt_0.5/feats_stats.npy"
        # ge2e
        self.ge2e_params_path = "source/model/ge2e_ckpt_0.3/step-3000000.pdparams"
        with open(self.am_config) as f:
            self.am_config = CfgNode(yaml.safe_load(f))
        with open(self.voc_config) as f:
            self.voc_config = CfgNode(yaml.safe_load(f))

        self.p = SpeakerVerificationPreprocessor(
            sampling_rate=16000,
            audio_norm_target_dBFS=-30,
            vad_window_length=30,
            vad_moving_average_width=8,
            vad_max_silence_length=6,
            mel_window_length=25,
            mel_window_step=10,
            n_mels=40,
            partial_n_frames=160,
            min_pad_coverage=0.75,
            partial_overlap_ratio=0.5)
        self.speaker_encoder = LSTMSpeakerEncoder(
            n_mels=40, num_layers=3, hidden_size=256, output_size=256)
        self.speaker_encoder.set_state_dict(paddle.load(self.ge2e_params_path))
        self.speaker_encoder.eval()

        with open(self.phones_dict, "r") as f:
            self.phn_id = [line.strip().split() for line in f.readlines()]
        self.vocab_size = len(self.phn_id)

        self.frontend = Frontend(phone_vocab_path=self.phones_dict)

        # am
        am_name = "fastspeech2"
        am_class = dynamic_import(am_name, self.model_alias)
        print(self.am_config.n_mels)
        self.am = am_class(
            idim=self.vocab_size,
            odim=self.am_config.n_mels,
            spk_num=None,
            **self.am_config["model"])
        self.am_inference_class = dynamic_import(am_name + '_inference',
                                                 self.model_alias)
        self.am.set_state_dict(paddle.load(self.am_ckpt)["main_params"])
        self.am.eval()

        am_mu, am_std = np.load(self.am_stat)
        am_mu = paddle.to_tensor(am_mu)
        am_std = paddle.to_tensor(am_std)
        self.am_normalizer = ZScore(am_mu, am_std)
        self.am_inference = self.am_inference_class(self.am_normalizer, self.am)
        self.am_inference.eval()

        # voc
        voc_name = "pwgan"
        voc_class = dynamic_import(voc_name, self.model_alias)
        voc_inference_class = dynamic_import(voc_name + '_inference',
                                             self.model_alias)
        self.voc = voc_class(**self.voc_config["generator_params"])
        self.voc.set_state_dict(paddle.load(self.voc_ckpt)["generator_params"])
        self.voc.remove_weight_norm()
        self.voc.eval()
        voc_mu, voc_std = np.load(self.voc_stat)
        voc_mu = paddle.to_tensor(voc_mu)
        voc_std = paddle.to_tensor(voc_std)
        voc_normalizer = ZScore(voc_mu, voc_std)
        self.voc_inference = voc_inference_class(voc_normalizer, self.voc)
        self.voc_inference.eval()

    def vc(self, text, input_wav, out_wav):
        # text -> phone ids
        input_ids = self.frontend.get_input_ids(text, merge_sentences=True)
        phone_ids = input_ids["phone_ids"][0]
        # reference wav -> GE2E speaker embedding
        mel_sequences = self.p.extract_mel_partials(
            self.p.preprocess_wav(input_wav))
        with paddle.no_grad():
            spk_emb = self.speaker_encoder.embed_utterance(
                paddle.to_tensor(mel_sequences))

        with paddle.no_grad():
            wav = self.voc_inference(
                self.am_inference(phone_ids, spk_emb=spk_emb))
        sf.write(out_wav, wav.numpy(), samplerate=self.am_config.fs)
        return True


if __name__ == '__main__':
    voiceclone = VoiceCloneGE2E()
    text = "测试一下你的合成效果"  # "Test your synthesis quality"
    input_wav = "wav/009901.wav"
    out_wav = "wav/9901_clone.wav"
    voiceclone.vc(text, input_wav, out_wav)
@ -0,0 +1,113 @@
"""
TDNN Voice Clone
"""

import os

import numpy as np
import paddle
import soundfile as sf
import yaml
from yacs.config import CfgNode

from paddlespeech.cli.vector import VectorExecutor
from paddlespeech.t2s.frontend.zh_frontend import Frontend
from paddlespeech.t2s.modules.normalizer import ZScore
from paddlespeech.utils.dynamic_import import dynamic_import

model_alias = {
    # acoustic model
    "fastspeech2":
    "paddlespeech.t2s.models.fastspeech2:FastSpeech2",
    "fastspeech2_inference":
    "paddlespeech.t2s.models.fastspeech2:FastSpeech2Inference",
    # voc
    "pwgan":
    "paddlespeech.t2s.models.parallel_wavegan:PWGGenerator",
    "pwgan_inference":
    "paddlespeech.t2s.models.parallel_wavegan:PWGInference",
}


class VoiceCloneTDNN():
    def __init__(self):
        # paths of the pretrained models and other settings
        # am
        self.am = "fastspeech2_aishell3"
        self.am_config = "source/model/fastspeech2_aishell3_ckpt_vc2_1.2.0/default.yaml"
        self.am_ckpt = "source/model/fastspeech2_aishell3_ckpt_vc2_1.2.0/snapshot_iter_96400.pdz"
        self.am_stat = "source/model/fastspeech2_aishell3_ckpt_vc2_1.2.0/speech_stats.npy"
        self.phones_dict = "source/model/fastspeech2_aishell3_ckpt_vc2_1.2.0/phone_id_map.txt"
        # voc
        self.voc = "pwgan_aishell3"
        self.voc_config = "source/model/pwg_aishell3_ckpt_0.5/default.yaml"
        self.voc_ckpt = "source/model/pwg_aishell3_ckpt_0.5/snapshot_iter_1000000.pdz"
        self.voc_stat = "source/model/pwg_aishell3_ckpt_0.5/feats_stats.npy"

        with open(self.am_config) as f:
            self.am_config = CfgNode(yaml.safe_load(f))
        with open(self.voc_config) as f:
            self.voc_config = CfgNode(yaml.safe_load(f))
        # speaker verification model used to extract the speaker embedding
        self.vec_executor = VectorExecutor()

        with open(self.phones_dict, "r") as f:
            phn_id = [line.strip().split() for line in f.readlines()]
        vocab_size = len(phn_id)

        self.frontend = Frontend(phone_vocab_path=self.phones_dict)

        # am
        am_name = "fastspeech2"
        am_class = dynamic_import(am_name, model_alias)
        print(self.am_config.n_mels)
        self.am = am_class(
            idim=vocab_size,
            odim=self.am_config.n_mels,
            spk_num=None,
            **self.am_config["model"])
        self.am_inference_class = dynamic_import(am_name + '_inference',
                                                 model_alias)
        self.am.set_state_dict(paddle.load(self.am_ckpt)["main_params"])
        self.am.eval()

        am_mu, am_std = np.load(self.am_stat)
        am_mu = paddle.to_tensor(am_mu)
        am_std = paddle.to_tensor(am_std)
        self.am_normalizer = ZScore(am_mu, am_std)
        self.am_inference = self.am_inference_class(self.am_normalizer, self.am)
        self.am_inference.eval()

        # voc
        voc_name = "pwgan"
        voc_class = dynamic_import(voc_name, model_alias)
        voc_inference_class = dynamic_import(voc_name + '_inference',
                                             model_alias)
        self.voc = voc_class(**self.voc_config["generator_params"])
        self.voc.set_state_dict(paddle.load(self.voc_ckpt)["generator_params"])
        self.voc.remove_weight_norm()
        self.voc.eval()
        voc_mu, voc_std = np.load(self.voc_stat)
        voc_mu = paddle.to_tensor(voc_mu)
        voc_std = paddle.to_tensor(voc_std)
        voc_normalizer = ZScore(voc_mu, voc_std)
        self.voc_inference = voc_inference_class(voc_normalizer, self.voc)
        self.voc_inference.eval()

    def vc(self, text, input_wav, out_wav):
        input_ids = self.frontend.get_input_ids(text, merge_sentences=True)
        phone_ids = input_ids["phone_ids"][0]
        # reference wav -> TDNN speaker embedding
        spk_emb = self.vec_executor(audio_file=input_wav, force_yes=True)
        spk_emb = paddle.to_tensor(spk_emb)

        with paddle.no_grad():
            wav = self.voc_inference(
                self.am_inference(phone_ids, spk_emb=spk_emb))
        sf.write(out_wav, wav.numpy(), samplerate=self.am_config.fs)
        return True


if __name__ == '__main__':
    voiceclone = VoiceCloneTDNN()
    text = "测试一下你的合成效果"  # "Test your synthesis quality"
    input_wav = os.path.realpath("source/wav/test/009901.wav")
    out_wav = os.path.realpath("source/wav/test/9901_clone.wav")
    voiceclone.vc(text, input_wav, out_wav)
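
# Note: VoiceCloneTDNN exposes the same vc(text, input_wav, out_wav) interface
# as VoiceCloneGE2E above; the two differ only in how the speaker embedding is
# extracted (VectorExecutor's speaker verification model here vs. the GE2E
# LSTM speaker encoder), so callers can swap backends freely.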
@ -0,0 +1,88 @@
import axios from 'axios'
import {apiURL} from "./API.js"

// upload audio - vc
export async function vcUpload(params){
    const result = await axios.post(apiURL.VC_Upload, params);
    return result
}

// upload audio - sat
export async function satUpload(params){
    const result = await axios.post(apiURL.SAT_Upload, params);
    return result
}

// upload audio - finetune
export async function fineTuneUpload(params){
    const result = await axios.post(apiURL.FineTune_Upload, params);
    return result
}

// delete audio
export async function vcDel(params){
    const result = await axios.post(apiURL.VC_Del, params);
    return result
}

// get audio list - vc
export async function vcList(){
    const result = await axios.get(apiURL.VC_List);
    return result
}

// get audio list - sat
export async function satList(){
    const result = await axios.get(apiURL.SAT_List);
    return result
}

// get audio list - fineTune
export async function fineTuneList(params){
    const result = await axios.post(apiURL.FineTune_List, params);
    return result
}

// fineTune one-click reset: get a fresh working directory
export async function fineTuneNewDir(){
    const result = await axios.get(apiURL.FineTune_NewDir);
    return result
}

// fetch audio data
export async function vcDownload(params){
    const result = await axios.post(apiURL.VC_Download, params);
    return result
}

// fetch audio data as Base64
export async function vcDownloadBase64(params){
    const result = await axios.post(apiURL.VC_Download_Base64, params);
    return result
}

// clone synthesis - G2P
export async function vcCloneG2P(params){
    const result = await axios.post(apiURL.VC_CloneG2p, params);
    return result
}

// clone synthesis - SAT
export async function vcCloneSAT(params){
    const result = await axios.post(apiURL.VC_CloneSAT, params);
    return result
}

// clone synthesis - finetune (train)
export async function vcCloneFineTune(params){
    const result = await axios.post(apiURL.VC_CloneFineTune, params);
    return result
}

// clone synthesis - finetune (synthesize)
export async function vcCloneFineTuneSyn(params){
    const result = await axios.post(apiURL.VC_CloneFineTuneSyn, params);
    return result
}
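
// A minimal usage sketch (hypothetical component code; the request/response
// field names are assumptions, since the backend contract is not shown here):
// const res = await vcCloneG2P({ wav: selectedWav, text: inputText });
// const audio = await vcDownloadBase64({ wav_path: res.data.result });
// new Audio("data:audio/wav;base64," + audio.data.result).play();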