# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import argparse import logging from pathlib import Path import jsonlines import numpy as np import paddle import soundfile as sf import yaml from yacs.config import CfgNode from paddlespeech.t2s.datasets.data_table import DataTable from paddlespeech.t2s.models.fastspeech2 import FastSpeech2 from paddlespeech.t2s.models.fastspeech2 import FastSpeech2Inference from paddlespeech.t2s.models.parallel_wavegan import PWGGenerator from paddlespeech.t2s.models.parallel_wavegan import PWGInference from paddlespeech.t2s.modules.normalizer import ZScore def evaluate(args, fastspeech2_config, pwg_config): # dataloader has been too verbose logging.getLogger("DataLoader").disabled = True # construct dataset for evaluation with jsonlines.open(args.test_metadata, 'r') as reader: test_metadata = list(reader) fields = ["utt_id", "text"] spk_num = None if args.speaker_dict is not None: print("multiple speaker fastspeech2!") with open(args.speaker_dict, 'rt') as f: spk_id = [line.strip().split() for line in f.readlines()] spk_num = len(spk_id) fields += ["spk_id"] elif args.voice_cloning: print("voice cloning!") fields += ["spk_emb"] else: print("single speaker fastspeech2!") print("spk_num:", spk_num) test_dataset = DataTable(data=test_metadata, fields=fields) odim = fastspeech2_config.n_mels with open(args.phones_dict, "r") as f: phn_id = [line.strip().split() for line in f.readlines()] vocab_size = len(phn_id) print("vocab_size:", vocab_size) model = FastSpeech2( idim=vocab_size, odim=odim, spk_num=spk_num, **fastspeech2_config["model"]) model.set_state_dict( paddle.load(args.fastspeech2_checkpoint)["main_params"]) model.eval() vocoder = PWGGenerator(**pwg_config["generator_params"]) vocoder.set_state_dict(paddle.load(args.pwg_checkpoint)["generator_params"]) vocoder.remove_weight_norm() vocoder.eval() print("model done!") stat = np.load(args.fastspeech2_stat) mu, std = stat mu = paddle.to_tensor(mu) std = paddle.to_tensor(std) fastspeech2_normalizer = ZScore(mu, std) stat = np.load(args.pwg_stat) mu, std = stat mu = paddle.to_tensor(mu) std = paddle.to_tensor(std) pwg_normalizer = ZScore(mu, std) fastspeech2_inference = FastSpeech2Inference(fastspeech2_normalizer, model) pwg_inference = PWGInference(pwg_normalizer, vocoder) output_dir = Path(args.output_dir) output_dir.mkdir(parents=True, exist_ok=True) for datum in test_dataset: utt_id = datum["utt_id"] text = paddle.to_tensor(datum["text"]) spk_emb = None spk_id = None if args.voice_cloning and "spk_emb" in datum: spk_emb = paddle.to_tensor(np.load(datum["spk_emb"])) elif "spk_id" in datum: spk_id = paddle.to_tensor(datum["spk_id"]) with paddle.no_grad(): wav = pwg_inference( fastspeech2_inference(text, spk_id=spk_id, spk_emb=spk_emb)) sf.write( str(output_dir / (utt_id + ".wav")), wav.numpy(), samplerate=fastspeech2_config.fs) print(f"{utt_id} done!") def main(): # parse args and config and redirect to train_sp parser = argparse.ArgumentParser( description="Synthesize with fastspeech2 & parallel wavegan.") parser.add_argument( "--fastspeech2-config", type=str, help="fastspeech2 config file.") parser.add_argument( "--fastspeech2-checkpoint", type=str, help="fastspeech2 checkpoint to load.") parser.add_argument( "--fastspeech2-stat", type=str, help="mean and standard deviation used to normalize spectrogram when training fastspeech2." ) parser.add_argument( "--pwg-config", type=str, help="parallel wavegan config file.") parser.add_argument( "--pwg-checkpoint", type=str, help="parallel wavegan generator parameters to load.") parser.add_argument( "--pwg-stat", type=str, help="mean and standard deviation used to normalize spectrogram when training parallel wavegan." ) parser.add_argument( "--phones-dict", type=str, default=None, help="phone vocabulary file.") parser.add_argument( "--speaker-dict", type=str, default=None, help="speaker id map file for multiple speaker model.") def str2bool(str): return True if str.lower() == 'true' else False parser.add_argument( "--voice-cloning", type=str2bool, default=False, help="whether training voice cloning model.") parser.add_argument("--test-metadata", type=str, help="test metadata.") parser.add_argument("--output-dir", type=str, help="output dir.") parser.add_argument( "--ngpu", type=int, default=1, help="if ngpu == 0, use cpu.") parser.add_argument("--verbose", type=int, default=1, help="verbose.") args = parser.parse_args() if args.ngpu == 0: paddle.set_device("cpu") elif args.ngpu > 0: paddle.set_device("gpu") else: print("ngpu should >= 0 !") with open(args.fastspeech2_config) as f: fastspeech2_config = CfgNode(yaml.safe_load(f)) with open(args.pwg_config) as f: pwg_config = CfgNode(yaml.safe_load(f)) print("========Args========") print(yaml.safe_dump(vars(args))) print("========Config========") print(fastspeech2_config) print(pwg_config) evaluate(args, fastspeech2_config, pwg_config) if __name__ == "__main__": main()