@@ -33,13 +33,18 @@ logger = Log(__name__).getlog()
 _MODELS = ["large"]
 SAMPLE_RATE = 16000
 N_FFT = 400
 N_MELS = 80
 HOP_LENGTH = 160
 CHUNK_LENGTH = 30
 N_SAMPLES = CHUNK_LENGTH * SAMPLE_RATE  # 480000: number of samples in a chunk
 N_FRAMES = utils.exact_div(
     N_SAMPLES, HOP_LENGTH)  # 3000: number of frames in a mel spectrogram input
+N_SAMPLES_PER_TOKEN = HOP_LENGTH * 2  # the initial convolutions have stride 2
+FRAMES_PER_SECOND = utils.exact_div(SAMPLE_RATE,
+                                    HOP_LENGTH)  # 10ms per audio frame
+TOKENS_PER_SECOND = utils.exact_div(SAMPLE_RATE,
+                                    N_SAMPLES_PER_TOKEN)  # 20ms per audio token
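+# i.e. 30 s * 16000 Hz = 480000 samples per chunk; 480000 / 160 = 3000 mel
+# frames; 16000 / 160 = 100 frames per second (10 ms each) and
+# 16000 / (160 * 2) = 50 tokens per second (20 ms each).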
 @dataclass
 class ModelDimensions:
@@ -378,7 +383,9 @@ def detect_language(
     """
     if tokenizer is None:
         tokenizer = get_tokenizer(
-            model.is_multilingual, resource_path=resource_path)
+            multilingual=model.is_multilingual,
+            resource_path=resource_path,
+            num_languages=model.num_languages)
     if tokenizer.language is None or tokenizer.language_token not in tokenizer.sot_sequence:
         raise ValueError(
             "This model doesn't have language tokens so it can't perform lang id"
@@ -400,6 +407,7 @@ def detect_language(
     # collect detected languages; suppress all non-language tokens
     mask = paddle.ones(paddle.to_tensor(logits.shape[-1]), dtype=bool)
     mask[list(tokenizer.all_language_tokens)] = False
+    logits.contiguous()
     logits[:, mask] = -np.inf
     language_tokens = paddle.argmax(logits, axis=-1)
     language_token_probs = F.softmax(logits, axis=-1)
@@ -428,6 +436,13 @@ def transcribe(
         logprob_threshold: Optional[float]=-1.0,
         no_speech_threshold: Optional[float]=0.6,
         condition_on_previous_text: bool=True,
+        initial_prompt: Optional[str]=None,
+        carry_initial_prompt: bool=False,
+        word_timestamps: bool=False,
+        prepend_punctuations: str="\"'“¿([{-",
+        append_punctuations: str="\"'.。,，!!！?？:：”)]}、",
+        clip_timestamps: Union[str, List[float]]="0",
+        hallucination_silence_threshold: Optional[float]=None,
         **decode_options, ):
     """
     Transcribe an audio file using Whisper
@@ -476,8 +491,9 @@ def transcribe(
     if dtype == np.float32:
         decode_options["fp16"] = False
-    if decode_options.get("language") == 'None' or decode_options.get(
-            "language", None) is None:
+    content_frames = mel.shape[-1] - N_FRAMES
+    content_duration = float(content_frames * HOP_LENGTH / SAMPLE_RATE)
+    if decode_options.get("language", None) in {None, "None"}:
         if not model.is_multilingual:
             decode_options["language"] = "en"
         else:
@@ -485,25 +501,47 @@ def transcribe(
             print(
                 "Detecting language using up to the first 30 seconds. Use `--language` to specify the language"
             )
-            segment = pad_or_trim(mel, N_FRAMES)
-            _, probs = model.detect_language(segment, resource_path)
+            mel_segment = pad_or_trim(mel,
+                                      N_FRAMES).to(model.device).astype(dtype)
+            _, probs = model.detect_language(mel_segment, resource_path)
             decode_options["language"] = max(probs, key=probs.get)
             if verbose is not None:
                 print(
                     f"Detected language: {LANGUAGES[decode_options['language']].title()}"
                 )
-    language = decode_options["language"]
-    task = decode_options.get("task", "transcribe")
+    language: str = decode_options["language"]
+    task: str = decode_options.get("task", "transcribe")
     tokenizer = get_tokenizer(
-        model.is_multilingual,
+        multilingual=model.is_multilingual,
         resource_path=resource_path,
+        num_languages=model.num_languages,
         language=language,
-        task=task)
+        task=task, )
+    if isinstance(clip_timestamps, str):
+        clip_timestamps = [
+            float(ts)
+            for ts in (clip_timestamps.split(",") if clip_timestamps else [])
+        ]
+    seek_points: List[
+        int] = [round(ts * FRAMES_PER_SECOND) for ts in clip_timestamps]
+    if len(seek_points) == 0:
+        seek_points.append(0)
+    if len(seek_points) % 2 == 1:
+        seek_points.append(content_frames)
+    seek_clips: List[Tuple[int, int]] = list(
+        zip(seek_points[::2], seek_points[1::2]))
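+    # e.g. clip_timestamps="10,20,30" parses to [10.0, 20.0, 30.0]; at
+    # FRAMES_PER_SECOND = 100 that is seek_points = [1000, 2000, 3000], padded
+    # with content_frames to an even length, so
+    # seek_clips = [(1000, 2000), (3000, content_frames)].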
+    punctuation = "\"'“¿([{-\"'.。,，!!！?？:：”)]}、"
+    if word_timestamps and task == "translate":
+        warnings.warn(
+            "Word-level timestamps on translations may not be reliable.")
     def decode_with_fallback(segment: paddle.Tensor) -> DecodingResult:
-        temperatures = [temperature] if isinstance(temperature, (
-            int, float)) else temperature
+        temperatures = ([temperature] if isinstance(temperature, (int, float))
+                        else temperature)
         decode_result = None
         for t in temperatures:
@@ -517,20 +555,29 @@ def transcribe(
             kwargs.pop("best_of", None)
             options = DecodingOptions(**kwargs, temperature=t)
             decode_result = model.decode(segment, options, resource_path)
             needs_fallback = False
-            if compression_ratio_threshold is not None and decode_result.compression_ratio > compression_ratio_threshold:
+            if (compression_ratio_threshold is not None and
+                    decode_result.compression_ratio >
+                    compression_ratio_threshold):
                 needs_fallback = True  # too repetitive
-            if logprob_threshold is not None and decode_result.avg_logprob < logprob_threshold:
+            if (logprob_threshold is not None and
+                    decode_result.avg_logprob < logprob_threshold):
                 needs_fallback = True  # average log probability is too low
+            if (no_speech_threshold is not None and
+                    decode_result.no_speech_prob > no_speech_threshold and
+                    logprob_threshold is not None and
+                    decode_result.avg_logprob < logprob_threshold):
+                needs_fallback = False  # silence
             if not needs_fallback:
                 break
         return decode_result
-    seek = 0
+    clip_idx = 0
+    seek = seek_clips[clip_idx][0]
     input_stride = utils.exact_div(
         N_FRAMES, model.dims.n_audio_ctx)  # mel frames per output token: 2
     time_precision = (input_stride * HOP_LENGTH /
@@ -539,127 +586,210 @@ def transcribe(
     all_segments = []
     prompt_reset_since = 0
-    initial_prompt = decode_options.pop("initial_prompt", None) or []
-    if initial_prompt:
-        initial_prompt = tokenizer.encode(" " +
-                                          initial_prompt.strip()).input_ids
-        all_tokens.extend(initial_prompt)
+    remaining_prompt_length = model.dims.n_text_ctx // 2 - 1
+    if initial_prompt is not None:
+        initial_prompt_tokens = tokenizer.encode(" " + initial_prompt.strip())
+        all_tokens.extend(initial_prompt_tokens)
+        remaining_prompt_length -= len(initial_prompt_tokens)
+    else:
+        initial_prompt_tokens = []
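+    # n_text_ctx is 448 for the released Whisper checkpoints, so the prompt
+    # budget starts at 448 // 2 - 1 = 223 tokens, minus any initial prompt.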
-    def add_segment(*,
+    def new_segment(*,
                     start: float,
                     end: float,
-                    text_tokens: paddle.Tensor,
+                    tokens: paddle.Tensor,
                     result: DecodingResult):
-        text = tokenizer.decode(
-            [token for token in text_tokens if token < tokenizer.eot])
-        if len(text.strip()) == 0:  # skip empty text output
-            return
-        all_segments.append({
-            "id": len(all_segments),
+        tokens = tokens.tolist()
+        text_tokens = [token for token in tokens if token < tokenizer.eot]
+        return {
             "seek": seek,
             "start": start,
             "end": end,
-            "text": text,
-            "tokens": result.tokens,
+            "text": tokenizer.decode(text_tokens),
+            "tokens": tokens,
             "temperature": result.temperature,
             "avg_logprob": result.avg_logprob,
             "compression_ratio": result.compression_ratio,
             "no_speech_prob": result.no_speech_prob,
-        })
-        if verbose:
-            print(
-                f"[{utils.format_timestamp(start)} --> {utils.format_timestamp(end)}] {text}"
-            )
-    # show the progress bar when verbose is False (otherwise the transcribed text will be printed)
-    num_frames = mel.shape[-1]
-    previous_seek_value = seek
+        }
+    # show the progress bar when verbose is False (if True, transcribed text will be printed)
     with tqdm.tqdm(
-            total=num_frames, unit='frames',
+            total=content_frames, unit="frames",
             disable=verbose is not False) as pbar:
-        while seek < num_frames:
-            timestamp_offset = float(seek * HOP_LENGTH / SAMPLE_RATE)
-            segment = pad_or_trim(mel[:, seek:], N_FRAMES)
-            segment_duration = segment.shape[-1] * HOP_LENGTH / SAMPLE_RATE
-            decode_options["prompt"] = all_tokens[prompt_reset_since:]
-            result: DecodingResult = decode_with_fallback(segment)
+        last_speech_timestamp = 0.0
+        # NOTE: This loop is obscurely flattened to make the diff readable.
+        # A later commit should turn this into a simpler nested loop.
+        # for seek_clip_start, seek_clip_end in seek_clips:
+        #     while seek < seek_clip_end
+        while clip_idx < len(seek_clips):
+            seek_clip_start, seek_clip_end = seek_clips[clip_idx]
+            if seek < seek_clip_start:
+                seek = seek_clip_start
+            if seek >= seek_clip_end:
+                clip_idx += 1
+                if clip_idx < len(seek_clips):
+                    seek = seek_clips[clip_idx][0]
+                continue
+            time_offset = float(seek * HOP_LENGTH / SAMPLE_RATE)
+            window_end_time = float(
+                (seek + N_FRAMES) * HOP_LENGTH / SAMPLE_RATE)
+            segment_size = min(N_FRAMES, content_frames - seek,
+                               seek_clip_end - seek)
+            mel_segment = mel[:, seek:seek + segment_size]
+            segment_duration = segment_size * HOP_LENGTH / SAMPLE_RATE
+            mel_segment = pad_or_trim(mel_segment,
+                                      N_FRAMES).to(model.device).astype(dtype)
+            if carry_initial_prompt:
+                nignored = max(len(initial_prompt_tokens), prompt_reset_since)
+                remaining_prompt = all_tokens[nignored:][
+                    -remaining_prompt_length:]
+                decode_options[
+                    "prompt"] = initial_prompt_tokens + remaining_prompt
+            else:
+                decode_options["prompt"] = all_tokens[prompt_reset_since:]
+            result: DecodingResult = decode_with_fallback(mel_segment)
             tokens = paddle.to_tensor(result.tokens)
             if no_speech_threshold is not None:
                 # no voice activity check
                 should_skip = result.no_speech_prob > no_speech_threshold
-                if logprob_threshold is not None and result.avg_logprob > logprob_threshold:
+                if (logprob_threshold is not None and
+                        result.avg_logprob > logprob_threshold):
                     # don't skip if the logprob is high enough, despite the no_speech_prob
                     should_skip = False
                 if should_skip:
-                    seek += segment.shape[
-                        -1]  # fast-forward to the next segment boundary
+                    seek += segment_size  # fast-forward to the next segment boundary
                     continue
+            previous_seek = seek
+            current_segments = []
+            # anomalous words are very long/short/improbable
+            def word_anomaly_score(word: dict) -> float:
+                probability = word.get("probability", 0.0)
+                duration = word["end"] - word["start"]
+                score = 0.0
+                if probability < 0.15:
+                    score += 1.0
+                if duration < 0.133:
+                    score += (0.133 - duration) * 15
+                if duration > 2.0:
+                    score += duration - 2.0
+                return score
+            def is_segment_anomaly(segment: Optional[dict]) -> bool:
+                if segment is None or not segment["words"]:
+                    return False
+                words = [
+                    w for w in segment["words"] if w["word"] not in punctuation
+                ]
+                words = words[:8]
+                score = sum(word_anomaly_score(w) for w in words)
+                return score >= 3 or score + 0.01 >= len(words)
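+            # e.g. a 0.05 s word with probability 0.10 scores
+            # 1.0 + (0.133 - 0.05) * 15 ≈ 2.2, so a few such words push a
+            # segment past the `score >= 3` anomaly cutoff above.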
+            def next_words_segment(segments: List[dict]) -> Optional[dict]:
+                return next((s for s in segments if s["words"]), None)
             timestamp_tokens: paddle.Tensor = tokens.greater_equal(
                 paddle.to_tensor(tokenizer.timestamp_begin))
+            single_timestamp_ending = timestamp_tokens[
+                -2:].tolist() == [False, True]
             consecutive = paddle.where(timestamp_tokens[:-1] & timestamp_tokens[
                 1:])[0]
-            if len(
-                    consecutive
-            ) > 0:  # if the output contains two consecutive timestamp tokens
+            if consecutive.numel() != 0:
                 consecutive = paddle.add(consecutive, paddle.to_tensor(1))
+            if len(consecutive) > 0:
+                # if the output contains two consecutive timestamp tokens
+                slices = consecutive.tolist()
+                if single_timestamp_ending:
+                    slices.append(len(tokens))
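+                # e.g. a decoded window "... <|4.20|><|4.20|> ..." puts the
+                # doubled timestamp's index in `slices`: each entry marks where
+                # one segment ends and the next begins.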
                 last_slice = 0
-                for current_slice in consecutive:
+                for current_slice in slices:
                     sliced_tokens = tokens[last_slice:current_slice]
-                    start_timestamp_position = (
+                    start_timestamp_pos = (
                         sliced_tokens[0].item() - tokenizer.timestamp_begin)
-                    end_timestamp_position = (
+                    end_timestamp_pos = (
                         sliced_tokens[-1].item() - tokenizer.timestamp_begin)
-                    add_segment(
-                        start=timestamp_offset + start_timestamp_position *
-                        time_precision,
-                        end=timestamp_offset + end_timestamp_position *
-                        time_precision,
-                        text_tokens=sliced_tokens[1:-1],
-                        result=result, )
+                    current_segments.append(
+                        new_segment(
+                            start=time_offset + start_timestamp_pos *
+                            time_precision,
+                            end=time_offset + end_timestamp_pos *
+                            time_precision,
+                            tokens=sliced_tokens,
+                            result=result, ))
                     last_slice = current_slice
-                last_timestamp_position = (
-                    tokens[last_slice - 1].item() - tokenizer.timestamp_begin)
-                seek += last_timestamp_position * input_stride
-                all_tokens.extend(tokens[:last_slice + 1].tolist())
+                if single_timestamp_ending:
+                    # single timestamp at the end means no speech after the last timestamp.
+                    seek += segment_size
+                else:
+                    # otherwise, ignore the unfinished segment and seek to the last timestamp
+                    last_timestamp_pos = (tokens[last_slice - 1].item() -
+                                          tokenizer.timestamp_begin)
+                    seek += last_timestamp_pos * input_stride
             else:
                 duration = segment_duration
                 timestamps = tokens[timestamp_tokens.nonzero().flatten()]
-                if len(timestamps) > 0 and timestamps[
-                        -1].item() != tokenizer.timestamp_begin:
+                if (len(timestamps) > 0 and
+                        timestamps[-1].item() != tokenizer.timestamp_begin):
                     # no consecutive timestamps but it has a timestamp; use the last one.
-                    # single timestamp at the end means no speech after the last timestamp.
-                    last_timestamp_position = timestamps[
-                        -1].item() - tokenizer.timestamp_begin
-                    duration = last_timestamp_position * time_precision
+                    last_timestamp_pos = (
+                        timestamps[-1].item() - tokenizer.timestamp_begin)
+                    duration = last_timestamp_pos * time_precision
+                current_segments.append(
+                    new_segment(
+                        start=time_offset,
+                        end=time_offset + duration,
+                        tokens=tokens,
+                        result=result, ))
+                seek += segment_size
-                add_segment(
-                    start=timestamp_offset,
-                    end=timestamp_offset + duration,
-                    text_tokens=tokens,
-                    result=result, )
-                seek += segment.shape[-1]
-            all_tokens.extend(tokens.tolist())
+            if verbose:
+                for segment in current_segments:
+                    start, end, text = segment["start"], segment[
+                        "end"], segment["text"]
+                    line = f"[{utils.format_timestamp(start)} --> {utils.format_timestamp(end)}] {text}"
+                    print(line)
+            # if a segment is instantaneous or does not contain text, clear it
+            for i, segment in enumerate(current_segments):
+                if segment["start"] == segment["end"] or segment[
+                        "text"].strip() == "":
+                    segment["text"] = ""
+                    segment["tokens"] = []
+                    segment["words"] = []
+            all_segments.extend(
+                [{
+                    "id": i,
+                    **segment
+                }
+                 for i, segment in enumerate(
+                     current_segments, start=len(all_segments))])
+            all_tokens.extend([
+                token
+                for segment in current_segments for token in segment["tokens"]
+            ])
             if not condition_on_previous_text or result.temperature > 0.5:
                 # do not feed the prompt tokens if a high temperature was used
                 prompt_reset_since = len(all_tokens)
             # update progress bar
-            pbar.update(min(num_frames, seek) - previous_seek_value)
-            previous_seek_value = seek
+            pbar.update(min(content_frames, seek) - previous_seek)
     return dict(
-        text=tokenizer.decode(all_tokens[len(initial_prompt):]),
+        text=tokenizer.decode(all_tokens[len(initial_prompt_tokens):]),
         segments=all_segments,
-        language=language)
+        language=language, )
 class SequenceRanker:
@@ -776,11 +906,11 @@ class GreedyDecoder(TokenDecoder):
             next_tokens.shape[0] * next_tokens.shape[1],
         ])
-        logprobs = F.log_softmax(logits, axis=-1, dtype=paddle.float32)
+        logprobs = F.log_softmax(logits, axis=-1, dtype="float32")
         current_logprobs = logprobs[paddle.arange(logprobs.shape[0]),
                                     next_tokens]
         sum_logprobs += current_logprobs * paddle.to_tensor(
-            (tokens[:, -1] != self.eot), dtype=paddle.float32)
+            (tokens[:, -1] != self.eot), dtype="float32")
         next_tokens[tokens[:, -1] == self.eot] = self.eot
         tokens = paddle.concat([tokens, next_tokens[:, None]], axis=-1)
@@ -928,8 +1058,9 @@ class SuppressBlank(LogitFilter):
     def apply(self, logits: paddle.Tensor, tokens: paddle.Tensor):
         if tokens.shape[1] == self.sample_begin:
-            logits[:, self.tokenizer.encode(" ").input_ids +
-                   [self.tokenizer.eot]] = -np.inf
+            logits.contiguous()
+            logits[:, self.tokenizer.encode(" ") + [self.tokenizer.eot
+                                                    ]] = -np.inf
 class SuppressTokens(LogitFilter):
@@ -937,6 +1068,7 @@ class SuppressTokens(LogitFilter):
         self.suppress_tokens = list(suppress_tokens)
     def apply(self, logits: paddle.Tensor, tokens: paddle.Tensor):
+        logits.contiguous()
         logits[:, self.suppress_tokens] = -np.inf
@@ -952,6 +1084,7 @@ class ApplyTimestampRules(LogitFilter):
     def apply(self, logits: paddle.Tensor, tokens: paddle.Tensor):
         # suppress <|notimestamps|> which is handled by without_timestamps
         if self.tokenizer.no_timestamps is not None:
+            logits.contiguous()
             logits[:, self.tokenizer.no_timestamps] = -np.inf
         # timestamps have to appear in pairs, except directly before EOT; mask logits accordingly
@@ -972,6 +1105,7 @@ class ApplyTimestampRules(LogitFilter):
         if tokens.shape[
                 1] == self.sample_begin and self.max_initial_timestamp_index is not None:
             last_allowed = self.tokenizer.timestamp_begin + self.max_initial_timestamp_index
+            logits.contiguous()
             logits[:, last_allowed + 1:] = -np.inf
         # if sum of probability over timestamps is above any other token, sample timestamp
@@ -1005,15 +1139,16 @@ class DecodingTask:
         language = options.language or "en"
         tokenizer = get_tokenizer(
-            model.is_multilingual,
+            multilingual=model.is_multilingual,
             resource_path=resource_path,
             language=language,
-            task=options.task)
+            task=options.task,
+            num_languages=model.num_languages)
         self.tokenizer: Tokenizer = tokenizer
         self.options: DecodingOptions = self._verify_options(options)
         self.resource_path: str = resource_path
-        self.beam_size: int = options.beam_size or options.best_of or 1
+        self.n_group: int = options.beam_size or options.best_of or 1
         self.n_ctx: int = model.dims.n_text_ctx
         self.sample_len: int = options.sample_len or model.dims.n_text_ctx // 2
@@ -1158,10 +1293,10 @@ class DecodingTask:
         sum_logprobs: paddle.Tensor = paddle.zeros(
             paddle.to_tensor(n_batch), dtype=paddle.float32)
         no_speech_probs = [np.nan] * n_batch
         try:
             for i in range(self.sample_len):
                 logits = self.inference.logits(tokens, audio_features)
+                logits.contiguous()
                 if i == 0 and self.tokenizer.no_speech is not None:  # save no_speech_probs
                     probs_at_sot = F.softmax(
@@ -1197,17 +1332,8 @@ class DecodingTask:
         audio_features: paddle.Tensor = self._get_audio_features(
             mel)  # encoder forward pass
-        tokens: paddle.Tensor
-        if batch_size > 1:
-            for i in range(batch_size):
-                tokens = paddle.concat(
-                    x=[
-                        paddle.to_tensor([self.initial_tokens]),
-                        paddle.to_tensor([self.initial_tokens])
-                    ],
-                    axis=0)
-        elif batch_size == 1:
-            tokens = paddle.to_tensor([self.initial_tokens])
+        tokens: Tensor = paddle.to_tensor([self.initial_tokens]).repeat(
+            batch_size, 1)
         # detect language if requested, overwriting the language token
         languages, language_probs = self._detect_language(
@@ -1224,30 +1350,26 @@ class DecodingTask:
                 language_probs)
         ]
-        # repeat the audio & text tensors by the group size, for beam search or best-of-n sampling
-        audio_features = paddle.repeat_interleave(
-            audio_features, self.beam_size, axis=0)
-        tokens = paddle.repeat_interleave(tokens, self.beam_size, axis=0)
+        # repeat text tensors by the group size, for beam search or best-of-n sampling
+        tokens = tokens.repeat_interleave(self.n_group, axis=0)
         # call the main sampling loop
         tokens, sum_logprobs, no_speech_probs = self._main_loop(audio_features,
                                                                 tokens)
-        # reshape the tensors to have (batch_size, beam_size) as the first two dimensions
-        audio_features = audio_features[::self.beam_size]
-        no_speech_probs = no_speech_probs[::self.beam_size]
+        # reshape the tensors to have (batch_size, n_group) as the first two dimensions
+        audio_features = audio_features[::self.n_group]
+        no_speech_probs = no_speech_probs[::self.n_group]
         assert audio_features.shape[0] == len(no_speech_probs) == batch_size
-        tokens = tokens.reshape([batch_size, self.beam_size, -1])
-        sum_logprobs = sum_logprobs.reshape([batch_size, self.beam_size])
+        tokens = tokens.reshape([batch_size, self.n_group, -1])
+        sum_logprobs = sum_logprobs.reshape([batch_size, self.n_group])
         # get the final candidates for each group, and slice between the first sampled token and EOT
         tokens, sum_logprobs = self.decoder.finalize(tokens, sum_logprobs)
-        tokens: List[List[paddle.Tensor]] = [[
+        tokens: List[List[Tensor]] = [[
             t[self.sample_begin:(t == tokenizer.eot).nonzero()[0, 0]] for t in s
         ] for s in tokens]
         # select the top-ranked sample in each group
         selected = self.sequence_ranker.rank(tokens, sum_logprobs)
         tokens: List[List[
@@ -1256,11 +1378,12 @@ class DecodingTask:
         sum_logprobs: List[
             float] = [lp[i] for i, lp in zip(selected, sum_logprobs)]
-        avg_logprobs: List[
-            float] = [lp / (len(t) + 1) for t, lp in zip(tokens, sum_logprobs)]
+        avg_logprobs: List[float] = [
+            lp / (len(t) + 1) for t, lp in zip(tokens, sum_logprobs)
+        ]
         fields = (texts, languages, tokens, audio_features, avg_logprobs,
-                  no_speech_probs)
+                  no_speech_probs, )
         if len(set(map(len, fields))) != 1:
             raise RuntimeError(
                 f"inconsistent result lengths: {list(map(len, fields))}")
@@ -1294,7 +1417,7 @@ def decode(
     model: Whisper
         the Whisper model instance
-    mel: paddle.Tensor, shape = (80, 3000) or (*, 80, 3000)
+    mel: paddle.Tensor, shape = (80, 3000) or (*, 80, 3000) or (128, 3000) or (*, 128, 3000)
         A tensor containing the Mel spectrogram(s)
     options: DecodingOptions
@@ -1350,7 +1473,11 @@ class Whisper(nn.Layer):
     @property
     def is_multilingual(self):
-        return self.dims.n_vocab == 51865
+        return self.dims.n_vocab >= 51865
+
+    @property
+    def num_languages(self):
+        return self.dims.n_vocab - 51765 - int(self.is_multilingual)
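+    # For reference: n_vocab 51865 (large-v2) gives 51865 - 51765 - 1 = 99
+    # languages; n_vocab 51866 (large-v3) gives 100, the added entry being
+    # Cantonese.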
     def install_kv_cache_hooks(self, cache: Optional[dict]=None):
         """
@@ -1364,7 +1491,7 @@ class Whisper(nn.Layer):
         cache: Dict[nn.Layer, paddle.Tensor]
             A dictionary object mapping the key/value projection modules to its cache
         hooks: List[RemovableHandle]
-            List of PyTorch RemovableHandle objects to stop the hooks to be called
+            List of Paddle RemovableHandle objects to stop the hooks to be called
         """
         cache = {**cache} if cache is not None else {}
         hooks = []
@@ -1431,11 +1558,11 @@ def hann_window(n_fft: int=N_FFT):
     """
     return paddle.to_tensor(
         [0.5 - 0.5 * np.cos(2 * np.pi * n / n_fft) for n in range(n_fft)],
-        dtype=paddle.float32)
+        dtype="float32")
 @lru_cache(maxsize=None)
-def mel_filters(resource_path: str, n_mels: int=N_MELS) -> paddle.Tensor:
+def mel_filters(resource_path: str, n_mels: int) -> paddle.Tensor:
     """
     load the mel filterbank matrix for projecting STFT into a Mel spectrogram.
     Allows decoupling librosa dependency; saved using:
@@ -1445,13 +1572,16 @@ def mel_filters(resource_path: str, n_mels: int=N_MELS) -> paddle.Tensor:
         mel_80=librosa.filters.mel(sr=16000, n_fft=400, n_mels=80),
     )
     """
-    assert n_mels == 80, f"Unsupported n_mels: {n_mels}"
-    with np.load(os.path.join(resource_path, "assets", "mel_filters.npz")) as f:
+    assert n_mels in {80, 128}, f"Unsupported n_mels: {n_mels}"
+    filters_path = os.path.join(resource_path, "assets", "mel_filters.npz")
+    with np.load(filters_path, allow_pickle=False) as f:
         return paddle.to_tensor(f[f"mel_{n_mels}"])
 def log_mel_spectrogram(audio: Union[str, np.ndarray, paddle.Tensor],
-                        n_mels: int=N_MELS,
+                        n_mels: int=80,
+                        padding: int=0,
                         resource_path: str=None):
     """
     Compute the log-Mel spectrogram of
@@ -1462,11 +1592,11 @@ def log_mel_spectrogram(audio: Union[str, np.ndarray, paddle.Tensor],
         The path to audio or either a NumPy array or Tensor containing the audio waveform in 16 kHz
     n_mels: int
-        The number of Mel-frequency filters, only 80 is supported
+        The number of Mel-frequency filters, only 80 and 128 are supported
     Returns
     -------
-    paddle.Tensor, shape = (80, n_frames)
+    paddle.Tensor, shape = (n_mels, n_frames)
         A Tensor that contains the Mel spectrogram
     """
     if not paddle.is_tensor(audio):
@@ -1475,7 +1605,8 @@ def log_mel_spectrogram(audio: Union[str, np.ndarray, paddle.Tensor],
             audio = audio[:, 0]
         logger.info(f"audio shape: {audio.shape}")
         audio = paddle.to_tensor(audio)
+    if padding > 0:
+        audio = F.pad(audio, (0, padding), data_format="NLC")
     window = hann_window(N_FFT)
     stft = paddle.signal.stft(audio, N_FFT, HOP_LENGTH, window=window)