import inspect
import json
import os
import shutil
import numpy as np
import pytest
import test_utils
import ctranslate2
@pytest.fixture
def clear_transformers_cache():
"""Clears the Transformers model cache after each test when running in a CI."""
yield
clear_transformers_cache_in_ci()
def clear_transformers_cache_in_ci():
import transformers
if os.environ.get("CI") == "true":
shutil.rmtree(transformers.utils.default_cache_path)
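# Each tuple is (model name, source tokens, target prefix tokens, expected output
# tokens, extra translate_batch options). Fields may also be lists to exercise
# batched inputs.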
_TRANSFORMERS_TRANSLATION_TESTS = [
(
"Helsinki-NLP/opus-mt-en-de",
"▁Hello ▁world ! ",
"",
"▁Hallo ▁Welt !",
dict(),
),
(
"Helsinki-NLP/opus-mt-en-roa",
">>ind<< ▁The ▁Prime ▁Minister ▁is ▁coming ▁back ▁tomorrow . ",
"",
"▁Per da na ▁Men teri ▁akan ▁kembali ▁besok .",
dict(),
),
(
"Helsinki-NLP/opus-mt-mul-en",
"▁Bon jo ur ▁le ▁mo nde ",
"",
"▁Welcome ▁to ▁the ▁World",
dict(),
),
(
"facebook/m2m100_418M",
"__en__ ▁Hello ▁world ! ",
"__de__",
"__de__ ▁Hallo ▁der ▁Welt !",
dict(),
),
(
"facebook/mbart-large-50-many-to-many-mmt",
"en_XX ▁Hello ▁world ! ",
"de_DE",
"de_DE ▁Hallo ▁Welt !",
dict(),
),
(
"facebook/mbart-large-en-ro",
"▁UN ▁Chief ▁Say s ▁There ▁Is ▁No ▁Militar y ▁Solution ▁in ▁Syria en_XX",
"ro_RO",
"▁Şe ful ▁ONU ▁de cla ră ▁că ▁nu ▁există ▁o ▁solu ţie ▁militar ă ▁în ▁Siria",
dict(),
),
(
"facebook/bart-base",
" UN ĠChief ĠSays ĠThere ĠIs ĠNo Ġin ĠSyria ",
"",
" UN ĠChief ĠSays ĠThere ĠIs ĠNo ĠWar Ġin ĠSyria",
dict(),
),
(
"google/pegasus-xsum",
"▁PG & E ▁stated ▁it ▁scheduled ▁the ▁blackout s ▁in ▁response ▁to ▁forecasts "
"▁for ▁high ▁winds ▁amid ▁dry ▁conditions . ▁The ▁aim ▁is ▁to ▁reduce ▁the "
"▁risk ▁of ▁wildfires . ▁Nearly ▁800 ▁thousand ▁customers ▁were ▁scheduled ▁to "
"▁be ▁affected ▁by ▁the ▁shutoff s ▁which ▁were ▁expected ▁to ▁last ▁through "
"▁at ▁least ▁midday ▁tomorrow . ",
"",
"▁California ' s ▁largest ▁electricity ▁provider ▁has ▁turned ▁off ▁power ▁to "
"▁hundreds ▁of ▁thousands ▁of ▁customers .",
dict(length_penalty=0.6),
),
(
"facebook/nllb-200-distilled-600M",
["▁Hello ▁world ! eng_Latn", " eng_Latn"],
["fra_Latn", "fra_Latn"],
["fra_Latn ▁Bon jour ▁le ▁monde ▁!", "fra_Latn"],
dict(),
),
(
"t5-small",
"▁translate ▁English ▁to ▁German : ▁The ▁house ▁is ▁wonderful . ",
"",
"▁Das ▁Haus ▁ist ▁wunderbar .",
dict(),
),
(
"ml6team/mt5-small-german-query-generation",
"▁Das ▁Lama ▁( L ama ▁glam a ) ▁ist ▁eine ▁Art ▁der ▁Kam ele . "
"▁Es ▁ist ▁in ▁den ▁süd amerikanische n ▁And en ▁ver breite t ▁und "
"▁eine ▁vom ▁Guan ako ▁ab sta mmende ▁ Haustier form . ",
"",
"▁Was ▁ist ▁Lama ▁glam a ?",
dict(),
),
]
@test_utils.only_on_linux
@pytest.mark.parametrize(
"model,source_tokens,target_tokens,expected_tokens,kwargs",
_TRANSFORMERS_TRANSLATION_TESTS,
ids=[args[0] for args in _TRANSFORMERS_TRANSLATION_TESTS],
)
def test_transformers_translation(
clear_transformers_cache,
tmp_dir,
model,
source_tokens,
target_tokens,
expected_tokens,
kwargs,
):
converter = ctranslate2.converters.TransformersConverter(model)
output_dir = str(tmp_dir.join("ctranslate2_model"))
output_dir = converter.convert(output_dir)
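    # Normalize the test fields to lists so that single examples and batched examples
    # go through the same code path.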
if not isinstance(expected_tokens, list):
expected_tokens = [expected_tokens]
if not isinstance(source_tokens, list):
source_tokens = [source_tokens]
if target_tokens and not isinstance(target_tokens, list):
target_tokens = [target_tokens]
translator = ctranslate2.Translator(output_dir)
results = translator.translate_batch(
[line.split() for line in source_tokens],
[line.split() for line in target_tokens] if target_tokens else None,
**kwargs,
)
output_tokens = [" ".join(result.hypotheses[0]) for result in results]
assert output_tokens == expected_tokens
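# Each tuple is (model name, start tokens, maximum generation length, expected output
# tokens).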
_TRANSFORMERS_GENERATION_TESTS = [
(
"bigcode/tiny_starcoder_py",
(
" def Ġprint _ one _ two _ three (): ĊĠĠĠ Ġprint (' one ') "
"ĊĠĠĠĠ ĊĠĠĠ Ġprint (' three ') "
),
26,
(
" def Ġprint _ one _ two _ three (): ĊĠĠĠ Ġprint (' one ') "
"ĊĠĠĠĠ ĊĠĠĠ Ġprint (' three ') "
" print (' two ')"
),
),
(
"Salesforce/codegen-350M-mono",
"def Ġhello _ name ( name ):",
25,
"def Ġhello _ name ( name ):"
' Ċ print ( f " Hello Ġ{ name } ") Ċ Ċ hello _ name (" John ")',
),
(
"gpt2",
"<|endoftext|>",
10,
"Ċ The Ġfirst Ġtime ĠI Ġsaw Ġthe Ġnew Ġversion Ġof",
),
(
"facebook/opt-350m",
"",
10,
"Ċ The Ġfollowing Ġis Ġa Ġlist Ġof Ġthe Ġmost Ġpopular",
),
(
"microsoft/DialoGPT-medium",
"Hello <|endoftext|>",
100,
"Hello <|endoftext|> Hello Ġ! Ġ: D",
),
(
"bigscience/bloom-560m",
"Hello , ĠI Ġam",
20,
"Hello , ĠI Ġam Ġa Ġnew bie Ġin Ġthe Ġworld Ġof Ġweb Ġdesign Ġand ĠI Ġam "
"Ġlooking Ġfor Ġa Ġweb Ġdeveloper",
),
]
@test_utils.only_on_linux
@pytest.mark.parametrize(
"model,start_tokens,max_length,expected_tokens",
_TRANSFORMERS_GENERATION_TESTS,
ids=[args[0] for args in _TRANSFORMERS_GENERATION_TESTS],
)
def test_transformers_generation(
clear_transformers_cache,
tmp_dir,
model,
start_tokens,
max_length,
expected_tokens,
):
converter = ctranslate2.converters.TransformersConverter(model)
output_dir = str(tmp_dir.join("ctranslate2_model"))
output_dir = converter.convert(output_dir)
generator = ctranslate2.Generator(output_dir)
results = generator.generate_batch([start_tokens.split()], max_length=max_length)
output_tokens = " ".join(results[0].sequences[0])
assert output_tokens == expected_tokens
# Test empty inputs.
assert generator.generate_batch([]) == []
with pytest.raises(ValueError, match="start token"):
generator.generate_batch([[]])
@test_utils.only_on_linux
def test_transformers_dtype(clear_transformers_cache, tmp_dir):
converter = ctranslate2.converters.TransformersConverter("facebook/opt-350m")
output_dir = str(tmp_dir.join("ctranslate2_model"))
output_dir = converter.convert(output_dir)
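    # opt-350m has roughly 350M parameters, so a model.bin below 700 MB means the
    # weights were not saved in full float32 (which would be roughly 1.3 GB).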
model_b = os.path.getsize(os.path.join(output_dir, "model.bin"))
model_mb = model_b / (1000**2)
assert model_mb < 700
@test_utils.only_on_linux
def test_transformers_marianmt_vocabulary(clear_transformers_cache, tmp_dir):
converter = ctranslate2.converters.TransformersConverter(
"Helsinki-NLP/opus-mt-en-de"
)
output_dir = str(tmp_dir.join("ctranslate2_model"))
output_dir = converter.convert(output_dir)
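    # Marian models use a single vocabulary for the source and target sides, which the
    # converter writes to shared_vocabulary.json.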
vocabulary_path = os.path.join(output_dir, "shared_vocabulary.json")
with open(vocabulary_path, encoding="utf-8") as vocabulary_file:
vocab = json.load(vocabulary_file)
    assert vocab[-1] != "<pad>"
@test_utils.only_on_linux
@pytest.mark.parametrize("beam_size", [1, 2])
def test_transformers_marianmt_disable_unk(
clear_transformers_cache, tmp_dir, beam_size
):
converter = ctranslate2.converters.TransformersConverter(
"Helsinki-NLP/opus-mt-en-roa"
)
output_dir = str(tmp_dir.join("ctranslate2_model"))
output_dir = converter.convert(output_dir)
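    # With disable_unk=True the decoder should never emit the <unk> token.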
    tokens = ">>ind<< ▁The ▁Prime ▁is ▁coming ▁back ▁tomorrow . </s>".split()
translator = ctranslate2.Translator(output_dir)
output = translator.translate_batch([tokens], beam_size=beam_size, disable_unk=True)
assert "" not in output[0].hypotheses[0]
@test_utils.only_on_linux
@test_utils.on_available_devices
@pytest.mark.parametrize(
"model_name",
[
"bert-base-uncased",
"distilbert-base-uncased",
"distilbert-base-cased-distilled-squad",
"typeform/distilbert-base-uncased-mnli",
],
)
def test_transformers_encoder(clear_transformers_cache, tmp_dir, device, model_name):
import torch
import transformers
text = ["Hello world!", "Hello, my dog is cute"]
model = transformers.AutoModel.from_pretrained(model_name)
tokenizer = transformers.AutoTokenizer.from_pretrained(model_name)
inputs = tokenizer(text, return_tensors="pt", padding=True)
inputs.to(device)
model.to(device)
with torch.no_grad():
outputs = model(**inputs)
mask = inputs.attention_mask.unsqueeze(-1).cpu().numpy()
ref_last_hidden_state = outputs.last_hidden_state.cpu().numpy()
ref_pooler_output = (
outputs.pooler_output.cpu().numpy()
if hasattr(outputs, "pooler_output")
else None
)
converter = ctranslate2.converters.TransformersConverter(model_name)
output_dir = str(tmp_dir.join("ctranslate2_model"))
output_dir = converter.convert(output_dir)
encoder = ctranslate2.Encoder(output_dir, device=device)
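    # The CTranslate2 encoder takes token ids per sequence; forward_batch pads the
    # batch internally.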
ids = [tokenizer(t).input_ids for t in text]
outputs = encoder.forward_batch(ids)
last_hidden_state = _to_numpy(outputs.last_hidden_state, device)
assert last_hidden_state.shape == ref_last_hidden_state.shape
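    # Positions beyond each sequence length are padding with unspecified values, so
    # zero them out in both outputs before comparing.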
last_hidden_state *= mask
ref_last_hidden_state *= mask
np.testing.assert_array_almost_equal(
last_hidden_state, ref_last_hidden_state, decimal=5
)
if ref_pooler_output is not None:
pooler_output = _to_numpy(outputs.pooler_output, device)
assert pooler_output.shape == ref_pooler_output.shape
np.testing.assert_array_almost_equal(
pooler_output, ref_pooler_output, decimal=5
)
def _to_numpy(storage, device):
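    """Converts a CTranslate2 output to a NumPy array, going through PyTorch for GPU outputs."""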
import torch
return (
np.array(storage)
if device == "cpu"
else torch.as_tensor(storage, device=device).cpu().numpy()
)
@test_utils.only_on_linux
def test_transformers_gptbigcode(clear_transformers_cache, tmp_dir):
import transformers
_check_generator_logits(
tmp_dir,
"hf-internal-testing/tiny-random-GPTBigCodeForCausalLM",
transformers.GPTBigCodeForCausalLM,
transformers.AutoTokenizer,
"hello",
)
def _check_generator_logits(
tmp_dir, model_name, hf_model_class, hf_tokenizer_class, input_text
):
import torch
model = hf_model_class.from_pretrained(model_name)
tokenizer = hf_tokenizer_class.from_pretrained(model_name)
inputs = tokenizer(input_text, return_tensors="pt")
with torch.no_grad():
outputs = model(**inputs, labels=inputs["input_ids"])
ref_logits = outputs.logits.numpy()
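    # Run the same tokens through the converted model and compare its raw logits with
    # the reference implementation.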
converter = ctranslate2.converters.TransformersConverter(model_name)
output_dir = str(tmp_dir.join("ctranslate2_model"))
output_dir = converter.convert(output_dir)
generator = ctranslate2.Generator(output_dir)
tokens = tokenizer.convert_ids_to_tokens(tokenizer.encode(input_text))
logits = generator.forward_batch([tokens])
logits = np.array(logits)
assert logits.shape == ref_logits.shape
np.testing.assert_array_almost_equal(logits, ref_logits)
class TestGeneration:
@classmethod
def teardown_class(cls):
clear_transformers_cache_in_ci()
@test_utils.only_on_linux
def test_transformers_lm_scoring(self, tmp_dir):
converter = ctranslate2.converters.TransformersConverter("gpt2")
output_dir = str(tmp_dir.join("ctranslate2_model"))
output_dir = converter.convert(output_dir)
generator = ctranslate2.Generator(output_dir)
tokens = "Ċ The Ġfirst Ġtime ĠI Ġsaw Ġthe Ġnew Ġversion Ġof".split()
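        # score_batch returns the log-probability of each token given the previous
        # tokens, so the first token is not scored and is dropped from the output.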
output = generator.score_batch([tokens])[0]
assert output.tokens == tokens[1:]
assert len(output.log_probs) == len(output.tokens)
# Test empty inputs.
assert generator.score_batch([]) == []
output = generator.score_batch([[], tokens])[0]
assert not output.tokens
assert not output.log_probs
output = generator.score_batch([["<|endoftext|>"]])[0]
assert not output.tokens
assert not output.log_probs
@test_utils.only_on_linux
@test_utils.on_available_devices
@pytest.mark.parametrize("return_log_probs", [True, False])
@pytest.mark.parametrize("tensor_input", [True, False])
def test_transformers_lm_forward(
self, tmp_dir, device, return_log_probs, tensor_input
):
import torch
import transformers
model_name = "gpt2"
model = transformers.GPT2LMHeadModel.from_pretrained(model_name)
tokenizer = transformers.AutoTokenizer.from_pretrained(model_name)
converter = ctranslate2.converters.TransformersConverter(model_name)
output_dir = str(tmp_dir.join("ctranslate2_model"))
output_dir = converter.convert(output_dir)
generator = ctranslate2.Generator(output_dir, device=device)
text = ["Hello world!"]
with torch.no_grad():
inputs = tokenizer(text, return_tensors="pt")
inputs.to(device)
model.to(device)
output = model(**inputs)
ref_output = output.logits
if return_log_probs:
ref_output = torch.nn.functional.log_softmax(ref_output, dim=-1)
ref_output = ref_output.cpu().numpy()
kwargs = dict(return_log_probs=return_log_probs)
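        # forward_batch accepts either token id lists or a StorageView wrapping a
        # device tensor; in the tensor case the sequence lengths must be passed as well.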
if tensor_input:
inputs = tokenizer(text, return_length=True, return_tensors="pt")
inputs.to(device)
ids = inputs.input_ids.to(torch.int32)
lengths = inputs.length.to(torch.int32)
if device == "cpu":
ids = ids.numpy()
lengths = lengths.numpy()
ids = ctranslate2.StorageView.from_array(ids)
lengths = ctranslate2.StorageView.from_array(lengths)
with pytest.raises(ValueError, match="lengths"):
generator.forward_batch(ids, **kwargs)
output = generator.forward_batch(ids, lengths, **kwargs)
else:
ids = tokenizer(text).input_ids
output = generator.forward_batch(ids, **kwargs)
if device == "cpu":
output = np.array(output)
else:
output = torch.as_tensor(output, device=device).cpu().numpy()
assert output.shape == ref_output.shape
np.testing.assert_allclose(output, ref_output, rtol=1e-2)
@test_utils.only_on_linux
def test_transformers_generator_on_iterables(self, tmp_dir):
converter = ctranslate2.converters.TransformersConverter("gpt2")
output_dir = str(tmp_dir.join("ctranslate2_model"))
output_dir = converter.convert(output_dir)
generator = ctranslate2.Generator(output_dir)
start_tokens = ["<|endoftext|>"]
tokens = "Ċ The Ġfirst Ġtime ĠI Ġsaw Ġthe Ġnew Ġversion Ġof".split()
output = next(generator.generate_iterable(iter([start_tokens]), max_length=10))
assert output.sequences[0] == tokens
output = next(generator.score_iterable(iter([tokens])))
assert output.tokens == tokens[1:]
assert len(output.log_probs) == len(output.tokens)
# Test empty iterables.
with pytest.raises(StopIteration):
next(generator.score_iterable(iter([])))
with pytest.raises(StopIteration):
next(generator.generate_iterable(iter([])))
@test_utils.only_on_linux
def test_transformers_generator_suppress_sequences(self, tmp_dir):
converter = ctranslate2.converters.TransformersConverter("gpt2")
output_dir = str(tmp_dir.join("ctranslate2_model"))
output_dir = converter.convert(output_dir)
generator = ctranslate2.Generator(output_dir)
output = generator.generate_batch(
[["<|endoftext|>"]],
max_length=10,
suppress_sequences=[["Ġfirst", "Ġtime"]],
)
expected_tokens = "Ċ The Ġfirst Ġof Ġthe Ġthree Ġnew Ġseries Ġof Ġthe".split()
assert output[0].sequences[0] == expected_tokens
@test_utils.only_on_linux
@pytest.mark.parametrize("beam_size", [1, 2])
def test_transformers_generator_ignore_prompt(self, tmp_dir, beam_size):
converter = ctranslate2.converters.TransformersConverter("gpt2")
output_dir = str(tmp_dir.join("ctranslate2_model"))
output_dir = converter.convert(output_dir)
generator = ctranslate2.Generator(output_dir)
max_length = 20
tokens = "Ċ The Ġfirst Ġtime ĠI".split()
result_wo_prompt = generator.generate_batch(
[tokens],
beam_size=beam_size,
max_length=max_length - len(tokens),
return_scores=True,
include_prompt_in_result=False,
)[0]
result_w_prompt = generator.generate_batch(
[tokens],
beam_size=beam_size,
max_length=max_length - 1,
return_scores=True,
)[0]
assert len(result_w_prompt.sequences[0]) == max_length
assert tokens + result_wo_prompt.sequences[0] == result_w_prompt.sequences[0]
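        # Scores are length-normalized, so convert them back to cumulative
        # log-probabilities before comparing the two runs.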
cum_score_wo_prompt = result_wo_prompt.scores[0] * (
len(result_wo_prompt.sequences[0])
)
cum_score_w_prompt = result_w_prompt.scores[0] * (
len(result_w_prompt.sequences[0]) - 1
)
assert cum_score_wo_prompt == pytest.approx(cum_score_w_prompt, abs=1e-3)
@test_utils.only_on_linux
@pytest.mark.parametrize("beam_size", [1, 2])
def test_transformers_generator_ignore_prompt_batch(self, tmp_dir, beam_size):
converter = ctranslate2.converters.TransformersConverter("gpt2")
output_dir = str(tmp_dir.join("ctranslate2_model"))
output_dir = converter.convert(output_dir)
generator = ctranslate2.Generator(output_dir)
new_tokens = 2
prompt = [
"Ċ The Ġfirst Ġtime ĠI".split(),
"Ċ The Ġfirst".split(),
]
results = generator.generate_batch(
prompt,
beam_size=beam_size,
min_length=new_tokens,
max_length=new_tokens,
include_prompt_in_result=False,
)
for tokens, result in zip(prompt, results):
assert len(result.sequences[0]) == new_tokens
@test_utils.only_on_linux
def test_transformers_generator_static_prompt(self, tmp_dir):
converter = ctranslate2.converters.TransformersConverter("gpt2")
output_dir = str(tmp_dir.join("ctranslate2_model"))
output_dir = converter.convert(output_dir)
generator = ctranslate2.Generator(output_dir)
max_length = 20
prompt = "Ċ The Ġfirst Ġtime ĠI".split()
expected_result = generator.generate_batch(
[prompt],
max_length=max_length,
include_prompt_in_result=False,
)[0]
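        # Passing the context as static_prompt and restarting from the first generated
        # token should reproduce exactly the same sequence.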
result = generator.generate_batch(
[[expected_result.sequences[0][0]]],
max_length=max_length - 1,
static_prompt=prompt,
)[0]
assert result.sequences[0] == expected_result.sequences[0]
result = generator.generate_batch(
[expected_result.sequences[0][:2]],
max_length=max_length - 2,
static_prompt=prompt,
include_prompt_in_result=False,
)[0]
assert (
expected_result.sequences[0][:2] + result.sequences[0]
== expected_result.sequences[0]
)
batch_results = generator.generate_batch(
[[expected_result.sequences[0][0]], [expected_result.sequences[0][0]]],
max_length=max_length - 1,
static_prompt=prompt,
)
assert batch_results[0].sequences[0] == expected_result.sequences[0]
assert batch_results[1].sequences[0] == expected_result.sequences[0]
@test_utils.only_on_linux
@pytest.mark.parametrize("return_log_prob", [True, False])
def test_transformers_generator_token_streaming(self, tmp_dir, return_log_prob):
converter = ctranslate2.converters.TransformersConverter("gpt2")
output_dir = str(tmp_dir.join("ctranslate2_model"))
output_dir = converter.convert(output_dir)
generator = ctranslate2.Generator(output_dir)
max_length = 20
prompt = "Ċ The Ġfirst Ġtime ĠI".split()
expected_result = generator.generate_batch(
[prompt],
max_length=max_length,
return_scores=True,
include_prompt_in_result=False,
)[0]
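        # generate_tokens streams the same generation one token at a time, yielding a
        # GenerationStepResult for each step.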
step_results = generator.generate_tokens(
prompt, max_length=max_length, return_log_prob=return_log_prob
)
assert inspect.isgenerator(step_results)
tokens = []
ids = []
cum_log_probs = 0
for step_result in step_results:
assert isinstance(step_result, ctranslate2.GenerationStepResult)
tokens.append(step_result.token)
ids.append(step_result.token_id)
if return_log_prob:
cum_log_probs += step_result.log_prob
else:
assert step_result.log_prob is None
assert tokens == expected_result.sequences[0]
assert ids == expected_result.sequences_ids[0]
if return_log_prob:
assert cum_log_probs / len(ids) == pytest.approx(
expected_result.scores[0], abs=1e-5
)
@test_utils.only_on_linux
def test_transformers_generator_token_streaming_early_stop(self, tmp_dir):
converter = ctranslate2.converters.TransformersConverter("gpt2")
output_dir = str(tmp_dir.join("ctranslate2_model"))
output_dir = converter.convert(output_dir)
generator = ctranslate2.Generator(output_dir)
prompt = "Ċ The Ġfirst Ġtime ĠI".split()
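        # Closing the token generator after consuming a single step should stop the
        # unfinished generation cleanly.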
results = generator.generate_tokens(prompt)
for result in results:
break
results.close()
class TestWhisper:
@classmethod
def teardown_class(cls):
clear_transformers_cache_in_ci()
@test_utils.only_on_linux
@test_utils.on_available_devices
@pytest.mark.parametrize(
"model_name,prompts,expected_transcriptions,expected_no_speech_probs",
[
(
"openai/whisper-tiny",
[
[
"<|startoftranscript|>",
"<|en|>",
"<|transcribe|>",
"<|notimestamps|>",
],
[
"<|startoftranscript|>",
"<|en|>",
"<|transcribe|>",
"<|notimestamps|>",
"ĠAnd",
"Ġthus",
"Ġmy",
],
],
[
" Mr. Quilter is the apostle of the middle classes and we are glad"
" to welcome his gospel.",
" And thus my fellow Americans ask not what your country can do for you,"
" ask what you can do for your country.",
],
[
pytest.approx(0.0022832120303064585, abs=1e-4),
pytest.approx(0.06885894387960434, abs=1e-3),
],
),
(
"openai/whisper-tiny",
[
["<|startoftranscript|>", "<|en|>", "<|transcribe|>"],
["<|startoftranscript|>", "<|en|>", "<|transcribe|>"],
],
[
" Mr. Quilter is the apostle of the middle classes and we are glad"
" to welcome his gospel.",
" And so, my fellow Americans, ask not what your country can do for you,"
" ask what you can do for your country.",
],
[
pytest.approx(0.0022832120303064585, abs=1e-4),
pytest.approx(0.06885894387960434, abs=1e-3),
],
),
(
"openai/whisper-tiny.en",
[["<|startoftranscript|>"], ["<|startoftranscript|>"]],
[
" Mr. Quilter is the apostle of the middle classes, and we are glad"
" to welcome his gospel.",
" And so, my fellow Americans ask not what your country can do for you"
" ask what you can do for your country.",
],
[
pytest.approx(0.02644546702504158, abs=1e-4),
pytest.approx(0.062380101531744, abs=1e-3),
],
),
],
)
def test_transformers_whisper(
self,
tmp_dir,
device,
model_name,
prompts,
expected_transcriptions,
expected_no_speech_probs,
):
import transformers
converter = ctranslate2.converters.TransformersConverter(model_name)
output_dir = str(tmp_dir.join("ctranslate2_model"))
output_dir = converter.convert(output_dir)
audio_paths = [
os.path.join(test_utils.get_data_dir(), "audio", "mr_quilter.npy"),
os.path.join(test_utils.get_data_dir(), "audio", "jfk.npy"),
]
audio = list(map(np.load, audio_paths))
processor = transformers.WhisperProcessor.from_pretrained(model_name)
def _get_features(audio):
# Pad after computing the log-Mel spectrogram to match the openai/whisper behavior.
inputs = processor(audio, padding=False, sampling_rate=16000)
features = inputs.input_features[0]
features = np.pad(features, [(0, 0), (0, 3000 - features.shape[-1])])
return features
features = np.stack(list(map(_get_features, audio)))
features = ctranslate2.StorageView.from_array(features)
model = ctranslate2.models.Whisper(output_dir, device=device)
assert model.is_multilingual == (not model_name.endswith(".en"))
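        # Language detection is only available for multilingual checkpoints; the
        # English-only models should raise an error instead.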
if model.is_multilingual:
for result in model.detect_language(features):
best_lang, best_prob = result[0]
assert best_lang == "<|en|>"
assert best_prob > 0.9
else:
with pytest.raises(RuntimeError, match="multilingual"):
model.detect_language(features)
results = model.generate(
features,
prompts,
beam_size=2,
num_hypotheses=2,
return_no_speech_prob=True,
)
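        # Timestamp tokens occupy the ids directly after <|notimestamps|>, so any id
        # greater than or equal to timestamp_begin is a timestamp.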
timestamp_begin = (
processor.tokenizer.convert_tokens_to_ids("<|notimestamps|>") + 1
)
for prompt, result, expected_transcription, expected_no_speech_prob in zip(
prompts, results, expected_transcriptions, expected_no_speech_probs
):
assert len(result.sequences_ids) == 2
assert result.no_speech_prob == expected_no_speech_prob
for tokens in result.sequences_ids:
if "<|notimestamps|>" in prompt:
assert all(token < timestamp_begin for token in tokens)
else:
assert tokens[0] >= timestamp_begin
assert tokens[-1] >= timestamp_begin
assert tokens[-1] > tokens[0]
token_ids = list(
filter(lambda token: token < timestamp_begin, result.sequences_ids[0])
)
transcription = processor.decode(token_ids)
assert transcription == expected_transcription
@test_utils.only_on_linux
@test_utils.on_available_devices
@pytest.mark.parametrize(
"test_names", [["jfk"], ["jfk", "jfk"], ["mr_quilter", "jfk"]]
)
def test_transformers_whisper_align(self, tmp_dir, device, test_names):
import transformers
test_cases = []
audio = []
test_dir = os.path.join(test_utils.get_data_dir(), "audio")
for name in test_names:
audio_path = os.path.join(test_dir, "%s.npy" % name)
audio.append(np.load(audio_path))
test_case_path = os.path.join(test_dir, "%s_alignments.json" % name)
with open(test_case_path) as test_case_file:
test_cases.append(json.load(test_case_file))
model_name = "openai/whisper-tiny.en"
converter = ctranslate2.converters.TransformersConverter(model_name)
output_dir = str(tmp_dir.join("ctranslate2_model"))
output_dir = converter.convert(output_dir)
processor = transformers.WhisperProcessor.from_pretrained(model_name)
inputs = processor(audio, return_tensors="np", sampling_rate=16000)
features = ctranslate2.StorageView.from_array(inputs.input_features)
model = ctranslate2.models.Whisper(output_dir, device=device)
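        # align() takes the start sequence ids (50257 is <|startoftranscript|> in the
        # English-only vocabulary), the reference text tokens, and the number of audio
        # frames of each example.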
results = model.align(
features,
[50257],
[test_case["text_tokens"] for test_case in test_cases],
[test_case["num_frames"] for test_case in test_cases],
)
for result, test_case in zip(results, test_cases):
assert np.sum(result.text_token_probs) == pytest.approx(
test_case["expected_text_token_probs_sum"], abs=1e-3
)
assert result.alignments == [
tuple(pair) for pair in test_case["expected_alignments"]
]
@test_utils.only_on_linux
@test_utils.on_available_devices
def test_transformers_whisper_encode(self, tmp_dir, device):
import transformers
model_name = "openai/whisper-tiny.en"
converter = ctranslate2.converters.TransformersConverter(model_name)
output_dir = str(tmp_dir.join("ctranslate2_model"))
output_dir = converter.convert(output_dir)
audio_path = os.path.join(test_utils.get_data_dir(), "audio", "jfk.npy")
audio = np.load(audio_path)
processor = transformers.WhisperProcessor.from_pretrained(model_name)
inputs = processor(audio, sampling_rate=16000)
features = inputs.input_features[0]
features = np.expand_dims(features, 0)
features = ctranslate2.StorageView.from_array(features)
model = ctranslate2.models.Whisper(output_dir, device=device)
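        # generate() also accepts the output of encode(), so the encoder can be run
        # separately from the decoder.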
encoded = model.encode(features)
prompts = [["<|startoftranscript|>", "<|notimestamps|>"]]
result = model.generate(encoded, prompts)[0]
transcription = processor.decode(result.sequences_ids[0])
assert transcription == (
" And so my fellow Americans ask not what your country can do for you, "
"ask what you can do for your country."
)
@test_utils.only_on_linux
def test_transformers_whisper_partial_audio_context(self, tmp_dir):
import transformers
model_name = "openai/whisper-tiny"
converter = ctranslate2.converters.TransformersConverter(model_name)
output_dir = str(tmp_dir.join("ctranslate2_model"))
output_dir = converter.convert(output_dir)
audio_path = os.path.join(test_utils.get_data_dir(), "audio", "jfk.npy")
audio = np.load(audio_path)
processor = transformers.WhisperProcessor.from_pretrained(model_name)
inputs = processor(
audio, padding=False, return_tensors="np", sampling_rate=16000
)
features = ctranslate2.StorageView.from_array(inputs.input_features)
model = ctranslate2.models.Whisper(output_dir)
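        # The encoder halves the time dimension of the features and returns vectors of
        # size 384, the hidden dimension of whisper-tiny.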
encoder_output = model.encode(features)
assert encoder_output.shape == [1, features.shape[2] // 2, 384]
@test_utils.only_on_linux
def test_transformers_whisper_include_tokenizer_json(self, tmp_dir):
model_name = "openai/whisper-tiny"
converter = ctranslate2.converters.TransformersConverter(
model_name, copy_files=["tokenizer.json"]
)
output_dir = str(tmp_dir.join("ctranslate2_model"))
output_dir = converter.convert(output_dir)
assert os.path.isfile(os.path.join(output_dir, "tokenizer.json"))
class TestWav2Vec2:
@classmethod
def teardown_class(cls):
clear_transformers_cache_in_ci()
@test_utils.only_on_linux
@test_utils.on_available_devices
@pytest.mark.parametrize(
"model_name,expected_transcription",
[
(
"facebook/wav2vec2-large-robust-ft-swbd-300h",
[
"MISTER QUILTER IS THE APOSSEL OF THE MIDDLE CLASSES AND"
" WE ARE GLAD TO WELCOME HIS GOSPEL",
],
),
],
)
def test_transformers_wav2vec2(
self,
tmp_dir,
device,
model_name,
expected_transcription,
):
import torch
import transformers
converter = ctranslate2.converters.TransformersConverter(
model_name, load_as_float16="int8"
)
output_dir = str(tmp_dir.join("ctranslate2_model"))
output_dir = converter.convert(output_dir)
w2v2_processor = transformers.Wav2Vec2Processor.from_pretrained(model_name)
w2v2_processor.save_pretrained(output_dir + "/wav2vec2_processor")
processor = transformers.AutoProcessor.from_pretrained(
output_dir + "/wav2vec2_processor"
)
device = "cuda" if os.environ.get("CUDA_VISIBLE_DEVICES") else "cpu"
cpu_threads = int(os.environ.get("OMP_NUM_THREADS", 0))
model = ctranslate2.models.Wav2Vec2(
output_dir,
device=device,
device_index=[0],
compute_type="int8",
intra_threads=cpu_threads,
inter_threads=1,
)
speech_array = np.load(
os.path.join(test_utils.get_data_dir(), "audio", "mr_quilter.npy")
)
input_values = processor(
speech_array,
padding=True,
return_tensors="pt",
sampling_rate=16000,
).input_values
hidden_states = np.ascontiguousarray(input_values.unsqueeze(0))
hidden_states = ctranslate2.StorageView.from_array(hidden_states)
to_cpu = model.device == "cuda" and len(model.device_index) > 1
output = model.encode(hidden_states, to_cpu=to_cpu)
if model.device == "cuda":
logits = torch.as_tensor(output, device=model.device)[0]
else:
logits = torch.as_tensor(
np.array(output), dtype=torch.float32, device=model.device
)[0]
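        # Greedy CTC decoding: take the best class per frame and let the processor
        # collapse repeated tokens and remove blanks.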
predicted_ids = torch.argmax(logits, dim=-1)
transcription = processor.decode(predicted_ids, output_word_offsets=True)
transcription = transcription[0].replace(processor.tokenizer.unk_token, "")
assert transcription == expected_transcription[0]