import abc import argparse import gc import itertools import os from typing import List, Optional import numpy as np try: import huggingface_hub import torch import transformers except ImportError: pass from ctranslate2.converters import utils from ctranslate2.converters.converter import Converter from ctranslate2.specs import ( attention_spec, common_spec, model_spec, transformer_spec, wav2vec2_spec, whisper_spec, ) _SUPPORTED_ACTIVATIONS = { "gelu": common_spec.Activation.GELU, "gelu_fast": common_spec.Activation.GELUTanh, "gelu_new": common_spec.Activation.GELUTanh, "gelu_python": common_spec.Activation.GELU, "gelu_pytorch_tanh": common_spec.Activation.GELUTanh, "quick_gelu": common_spec.Activation.GELUSigmoid, "relu": common_spec.Activation.RELU, "silu": common_spec.Activation.SWISH, "swish": common_spec.Activation.SWISH, } _SUPPORTED_ROPE_SCALING = { "linear": attention_spec.RotaryScalingType.Linear, "su": attention_spec.RotaryScalingType.Su, "llama3": attention_spec.RotaryScalingType.Llama3, } _SUPPORTED_QUANTIZATION = { "gemm": common_spec.Quantization.AWQ_GEMM, "gemv": common_spec.Quantization.AWQ_GEMV, } _MODEL_LOADERS = {} def register_loader(config_name): """Registers a model loader for this configuration name.""" def decorator(cls): _MODEL_LOADERS[config_name] = cls() return cls return decorator class TransformersConverter(Converter): """Converts models from Hugging Face Transformers.""" def __init__( self, model_name_or_path: str, activation_scales: Optional[str] = None, copy_files: Optional[List[str]] = None, load_as_float16: bool = False, revision: Optional[str] = None, low_cpu_mem_usage: bool = False, trust_remote_code: bool = False, ): """Initializes the converter. Arguments: model_name_or_path: Name of the pretrained model to download, or path to the directory containing the pretrained model. activation_scales: Path to the pre-computed activation scales. Models may use them to rescale some weights to smooth the intermediate activations and improve the quantization accuracy. See https://github.com/mit-han-lab/smoothquant. copy_files: List of filenames to copy from the Hugging Face model to the converted model directory. load_as_float16: Load the model weights as float16. More precisely, the model will be loaded with ``from_pretrained(..., torch_dtype=torch.float16)``. revision: Revision of the model to download from the Hugging Face Hub. low_cpu_mem_usage: Enable the flag ``low_cpu_mem_usage`` when loading the model with ``from_pretrained``. trust_remote_code: Allow converting models using custom code. """ self._model_name_or_path = model_name_or_path self._activation_scales = activation_scales self._copy_files = copy_files self._load_as_float16 = load_as_float16 self._revision = revision self._low_cpu_mem_usage = low_cpu_mem_usage self._trust_remote_code = trust_remote_code def _load(self): with torch.no_grad(): config = transformers.AutoConfig.from_pretrained( self._model_name_or_path, trust_remote_code=self._trust_remote_code ) config_name = config.__class__.__name__ loader = _MODEL_LOADERS.get(config_name) if loader is None: raise ValueError( "No conversion is registered for the model configuration %s " "(supported configurations are: %s)" % (config_name, ", ".join(sorted(_MODEL_LOADERS.keys()))) ) model_class = getattr(transformers, loader.architecture_name) tokenizer_class = transformers.AutoTokenizer kwargs = { "torch_dtype": ( torch.float16 if self._load_as_float16 else getattr(config, "torch_dtype", None) ) } if self._revision: kwargs["revision"] = self._revision if self._low_cpu_mem_usage: kwargs["low_cpu_mem_usage"] = self._low_cpu_mem_usage if self._trust_remote_code: kwargs["trust_remote_code"] = self._trust_remote_code model = self.load_model(model_class, self._model_name_or_path, **kwargs) tokenizer_kwargs = {} if self._trust_remote_code: tokenizer_kwargs["trust_remote_code"] = self._trust_remote_code tokenizer = self.load_tokenizer( tokenizer_class, self._model_name_or_path, **tokenizer_kwargs ) spec = loader(model, tokenizer) if self._activation_scales: activation_scales = torch.load( self._activation_scales, map_location="cpu" ) loader.smooth_activation(spec, activation_scales) if self._copy_files: for filename in self._copy_files: spec.register_file(self.get_model_file(filename)) return spec def load_model(self, model_class, model_name_or_path, **kwargs): return model_class.from_pretrained(model_name_or_path, **kwargs) def load_tokenizer(self, tokenizer_class, model_name_or_path, **kwargs): return tokenizer_class.from_pretrained(model_name_or_path, **kwargs) def get_model_file(self, filename): if os.path.isdir(self._model_name_or_path): path = os.path.join(self._model_name_or_path, filename) else: try: path = huggingface_hub.hf_hub_download( repo_id=self._model_name_or_path, filename=filename ) except huggingface_hub.utils.EntryNotFoundError: path = None if path is None or not os.path.isfile(path): raise ValueError( "File %s does not exist in model %s" % (filename, self._model_name_or_path) ) return path class ModelLoader(abc.ABC): """Base class for loading Transformers models into a CTranslate2 model specification.""" @property def architecture_name(self): return None @abc.abstractmethod def get_model_spec(self, model): raise NotImplementedError() def __call__(self, model, tokenizer): spec = self.get_model_spec(model) self.set_config(spec.config, model, tokenizer) tokens = self.get_vocabulary(model, tokenizer) self.set_vocabulary(spec, tokens) return spec def get_vocabulary(self, model, tokenizer): return [ token for token, _ in sorted( tokenizer.get_vocab().items(), key=lambda item: item[1] ) ] def set_vocabulary(self, spec, tokens): pass def set_config(self, config, model, tokenizer): pass def set_layer_norm(self, spec, module): spec.gamma = module.weight spec.beta = module.bias def set_linear(self, spec, module, quant_type=common_spec.Quantization.CT2): if quant_type == common_spec.Quantization.CT2: spec.weight = module.weight else: spec.weight = module.qweight spec.weight_scale = module.scales spec.weight_zero = module.qzeros if isinstance(module, transformers.Conv1D): spec.weight = spec.weight.transpose(0, 1) if module.bias is not None: spec.bias = module.bias def set_embeddings(self, spec, module): spec.weight = module.weight def set_position_encodings(self, spec, module): spec.encodings = module.weight offset = getattr(module, "offset", 0) if offset > 0: spec.encodings = spec.encodings[offset:] def smooth_activation(self, spec, activation_scales): raise NotImplementedError( "No activation smoothing logic is defined for this model" ) @register_loader("BartConfig") class BartLoader(ModelLoader): @property def architecture_name(self): return "BartForConditionalGeneration" def get_model_spec(self, model): spec = transformer_spec.TransformerSpec.from_config( (model.config.encoder_layers, model.config.decoder_layers), model.config.encoder_attention_heads, pre_norm=model.config.normalize_before, activation=_SUPPORTED_ACTIVATIONS[model.config.activation_function], layernorm_embedding=getattr(model.config, "normalize_embedding", True), ) self.set_encoder(spec.encoder, model.model.encoder) self.set_decoder(spec.decoder, model.model.decoder) self.set_linear(spec.decoder.projection, model.lm_head) final_logits_bias = getattr(model, "final_logits_bias", None) if final_logits_bias is not None and final_logits_bias.nonzero().numel() != 0: spec.decoder.projection.bias = final_logits_bias.squeeze() return spec def get_vocabulary(self, model, tokenizer): tokens = super().get_vocabulary(model, tokenizer) if model.config.vocab_size < len(tokens): tokens = tokens[: model.config.vocab_size] return tokens def set_vocabulary(self, spec, tokens): spec.register_source_vocabulary(tokens) spec.register_target_vocabulary(tokens) def set_config(self, config, model, tokenizer): config.bos_token = tokenizer.bos_token config.eos_token = tokenizer.eos_token config.unk_token = tokenizer.unk_token config.decoder_start_token = tokenizer.convert_ids_to_tokens( model.config.decoder_start_token_id ) def set_encoder(self, spec, encoder): self.set_common_layers(spec, encoder) for layer_spec, layer in zip(spec.layer, encoder.layers): self.set_attention( layer_spec.self_attention, layer.self_attn, self_attention=True, ) self.set_layer_norm( layer_spec.self_attention.layer_norm, layer.self_attn_layer_norm, ) self.set_linear(layer_spec.ffn.linear_0, layer.fc1) self.set_linear(layer_spec.ffn.linear_1, layer.fc2) self.set_layer_norm(layer_spec.ffn.layer_norm, layer.final_layer_norm) def set_decoder(self, spec, decoder): self.set_common_layers(spec, decoder) for layer_spec, layer in zip(spec.layer, decoder.layers): self.set_attention( layer_spec.self_attention, layer.self_attn, self_attention=True, ) self.set_layer_norm( layer_spec.self_attention.layer_norm, layer.self_attn_layer_norm, ) if hasattr(layer, "encoder_attn"): self.set_attention( layer_spec.attention, layer.encoder_attn, self_attention=False, ) self.set_layer_norm( layer_spec.attention.layer_norm, layer.encoder_attn_layer_norm, ) self.set_linear(layer_spec.ffn.linear_0, layer.fc1) self.set_linear(layer_spec.ffn.linear_1, layer.fc2) self.set_layer_norm(layer_spec.ffn.layer_norm, layer.final_layer_norm) def set_attention(self, spec, attention, self_attention=False): split_layers = [common_spec.LinearSpec() for _ in range(3)] self.set_linear(split_layers[0], attention.q_proj) self.set_linear(split_layers[1], attention.k_proj) self.set_linear(split_layers[2], attention.v_proj) if self_attention: utils.fuse_linear(spec.linear[0], split_layers) else: utils.fuse_linear(spec.linear[0], split_layers[:1]) utils.fuse_linear(spec.linear[1], split_layers[1:]) self.set_linear(spec.linear[-1], attention.out_proj) def set_common_layers(self, spec, module): spec.scale_embeddings = module.embed_scale self.set_position_encodings(spec.position_encodings, module.embed_positions) self.set_embeddings( ( spec.embeddings[0] if isinstance(spec.embeddings, list) else spec.embeddings ), module.embed_tokens, ) if hasattr(module, "layer_norm"): self.set_layer_norm(spec.layer_norm, module.layer_norm) if hasattr(module, "layernorm_embedding"): self.set_layer_norm(spec.layernorm_embedding, module.layernorm_embedding) @register_loader("MarianConfig") class MarianMTLoader(BartLoader): @property def architecture_name(self): return "MarianMTModel" def get_model_spec(self, model): model.config.normalize_before = False model.config.normalize_embedding = False spec = super().get_model_spec(model) self._remove_pad_weights(spec) return spec def set_config(self, config, model, tokenizer): config.eos_token = tokenizer.eos_token config.unk_token = tokenizer.unk_token # The decoder start token can be any token because the decoder always starts # from a zero embedding. config.decoder_start_token = tokenizer.eos_token def set_decoder(self, spec, decoder): spec.start_from_zero_embedding = True super().set_decoder(spec, decoder) def get_vocabulary(self, model, tokenizer): # The token is added by Transformers to start the decoder from a zero embedding, # but we already have a dedicated option "start_from_zero_embedding". We remove this token # to match the original Marian vocabulary and prevent this token from being generated. tokens = super().get_vocabulary(model, tokenizer) if tokens[-1] == "": tokens.pop() return tokens def _remove_pad_weights(self, spec): vocab_specs = [ spec.encoder.embeddings[0], spec.decoder.embeddings, spec.decoder.projection, ] # Weights may be shared so we check against the expected size to prevent # updating the same weight multiple times. new_vocab_size = vocab_specs[0].weight.shape[0] - 1 for vocab_spec in vocab_specs: if vocab_spec.weight.shape[0] == new_vocab_size + 1: vocab_spec.weight = vocab_spec.weight[:-1] if ( isinstance(vocab_spec, common_spec.LinearSpec) and vocab_spec.has_bias() and vocab_spec.bias.shape[0] == new_vocab_size + 1 ): vocab_spec.bias = vocab_spec.bias[:-1] @register_loader("M2M100Config") class M2M100Loader(BartLoader): @property def architecture_name(self): return "M2M100ForConditionalGeneration" def get_model_spec(self, model): model.config.normalize_before = True model.config.normalize_embedding = False return super().get_model_spec(model) def set_position_encodings(self, spec, module): spec.encodings = module.weights[module.offset :] def get_vocabulary(self, model, tokenizer): tokens = super().get_vocabulary(model, tokenizer) # Workaround for issue https://github.com/OpenNMT/CTranslate2/issues/1039. if tokens[-1] == tokenizer.unk_token: tokens.insert(tokenizer.unk_token_id, tokens.pop()) for token in tokenizer.additional_special_tokens: if token not in tokens: tokens.append(token) num_madeup_words = getattr( tokenizer, "num_madeup_words", model.config.vocab_size - len(tokens) ) if num_madeup_words > 0: tokens += ["madeupword%d" % i for i in range(num_madeup_words)] return tokens @register_loader("MBartConfig") class MBartLoader(BartLoader): @property def architecture_name(self): return "MBartForConditionalGeneration" def set_config(self, config, model, tokenizer): config.bos_token = tokenizer.bos_token config.eos_token = tokenizer.eos_token config.unk_token = tokenizer.unk_token # MBart-25 passes the language code as the decoder start token. if model.config.tokenizer_class in ("MBartTokenizer", None): config.decoder_start_token = None else: config.decoder_start_token = tokenizer.eos_token @register_loader("PegasusConfig") class PegasusLoader(BartLoader): @property def architecture_name(self): return "PegasusForConditionalGeneration" def set_config(self, config, model, tokenizer): config.bos_token = tokenizer.pad_token config.eos_token = tokenizer.eos_token config.unk_token = tokenizer.unk_token config.decoder_start_token = tokenizer.pad_token @register_loader("OPTConfig") class OPTLoader(BartLoader): @property def architecture_name(self): return "OPTForCausalLM" def get_model_spec(self, model): spec = transformer_spec.TransformerDecoderModelSpec.from_config( model.config.num_hidden_layers, model.config.num_attention_heads, pre_norm=model.config.do_layer_norm_before, activation=_SUPPORTED_ACTIVATIONS[model.config.activation_function], project_in_out=model.config.word_embed_proj_dim != model.config.hidden_size, ) self.set_decoder(spec.decoder, model.model.decoder) self.set_linear(spec.decoder.projection, model.lm_head) return spec def smooth_activation(self, spec, activation_scales): for i, layer in enumerate(spec.decoder.layer): layer_scope = "model.decoder.layers.%d" % i utils.smooth_activation( layer.self_attention.layer_norm, layer.self_attention.linear[0], activation_scales["%s.self_attn.q_proj" % layer_scope], ) utils.smooth_activation( layer.ffn.layer_norm, layer.ffn.linear_0, activation_scales["%s.fc1" % layer_scope], ) def set_vocabulary(self, spec, tokens): spec.register_vocabulary(tokens) def set_config(self, config, model, tokenizer): config.bos_token = tokenizer.bos_token config.eos_token = tokenizer.eos_token config.unk_token = tokenizer.unk_token def set_decoder(self, spec, decoder): super().set_decoder(spec, decoder) if decoder.project_in is not None: self.set_linear(spec.project_in, decoder.project_in) if decoder.project_out is not None: self.set_linear(spec.project_out, decoder.project_out) if decoder.final_layer_norm is not None: self.set_layer_norm(spec.layer_norm, decoder.final_layer_norm) def set_common_layers(self, spec, module): spec.scale_embeddings = False self.set_position_encodings(spec.position_encodings, module.embed_positions) self.set_embeddings(spec.embeddings, module.embed_tokens) def get_vocabulary(self, model, tokenizer): tokens = super().get_vocabulary(model, tokenizer) i = 0 while len(tokens) % 8 != 0: symbol = "madeupword{:04d}".format(i) if symbol not in tokens: tokens.append(symbol) i += 1 return tokens @register_loader("GPTBigCodeConfig") class GPTBigCodeMHALoader(ModelLoader): @property def architecture_name(self): return "GPTBigCodeForCausalLM" def get_model_spec(self, model): spec = transformer_spec.TransformerDecoderModelSpec.from_config( model.config.n_layer, model.config.n_head, pre_norm=True, activation=_SUPPORTED_ACTIVATIONS[model.config.activation_function], multi_query_attention=True, ) self.set_decoder(spec.decoder, model.transformer) self.set_linear(spec.decoder.projection, model.lm_head) return spec def set_vocabulary(self, spec, tokens): spec.register_vocabulary(tokens) def get_vocabulary(self, model, tokenizer): tokens = super().get_vocabulary(model, tokenizer) extra_ids = model.config.vocab_size - len(tokens) for i in range(extra_ids): tokens.append("" % i) return tokens def set_config(self, config, model, tokenizer): config.bos_token = tokenizer.bos_token config.eos_token = tokenizer.eos_token config.unk_token = tokenizer.unk_token def set_decoder(self, spec, module): spec.scale_embeddings = False self.set_embeddings(spec.embeddings, module.wte) self.set_position_encodings(spec.position_encodings, module.wpe) self.set_layer_norm(spec.layer_norm, module.ln_f) for layer_spec, layer in zip(spec.layer, module.h): self.set_layer_norm(layer_spec.self_attention.layer_norm, layer.ln_1) self.set_linear(layer_spec.self_attention.linear[0], layer.attn.c_attn) self.set_linear(layer_spec.self_attention.linear[1], layer.attn.c_proj) self.set_layer_norm(layer_spec.ffn.layer_norm, layer.ln_2) self.set_linear(layer_spec.ffn.linear_0, layer.mlp.c_fc) self.set_linear(layer_spec.ffn.linear_1, layer.mlp.c_proj) @register_loader("GPT2Config") class GPT2Loader(ModelLoader): @property def architecture_name(self): return "GPT2LMHeadModel" def get_model_spec(self, model): spec = transformer_spec.TransformerDecoderModelSpec.from_config( model.config.n_layer, model.config.n_head, pre_norm=True, activation=_SUPPORTED_ACTIVATIONS[model.config.activation_function], ) self.set_decoder(spec.decoder, model.transformer) self.set_linear(spec.decoder.projection, model.lm_head) return spec def set_vocabulary(self, spec, tokens): spec.register_vocabulary(tokens) def set_config(self, config, model, tokenizer): config.bos_token = tokenizer.bos_token config.eos_token = tokenizer.eos_token config.unk_token = tokenizer.unk_token def set_decoder(self, spec, module): spec.scale_embeddings = False self.set_embeddings(spec.embeddings, module.wte) self.set_position_encodings(spec.position_encodings, module.wpe) self.set_layer_norm(spec.layer_norm, module.ln_f) for layer_spec, layer in zip(spec.layer, module.h): self.set_layer_norm(layer_spec.self_attention.layer_norm, layer.ln_1) self.set_linear(layer_spec.self_attention.linear[0], layer.attn.c_attn) self.set_linear(layer_spec.self_attention.linear[1], layer.attn.c_proj) self.set_layer_norm(layer_spec.ffn.layer_norm, layer.ln_2) self.set_linear(layer_spec.ffn.linear_0, layer.mlp.c_fc) self.set_linear(layer_spec.ffn.linear_1, layer.mlp.c_proj) @register_loader("GPTJConfig") class GPTJLoader(ModelLoader): @property def architecture_name(self): return "GPTJForCausalLM" def get_model_spec(self, model): spec = transformer_spec.TransformerDecoderModelSpec.from_config( model.config.n_layer, model.config.n_head, pre_norm=True, activation=_SUPPORTED_ACTIVATIONS[model.config.activation_function], rotary_dim=model.config.rotary_dim, rotary_interleave=False, parallel_residual=True, shared_layer_norm=True, ) self.set_decoder( spec.decoder, model.transformer, model.config.rotary_dim, model.config.n_head, ) self.set_linear(spec.decoder.projection, model.lm_head) return spec def set_vocabulary(self, spec, tokens): spec.register_vocabulary(tokens) def set_config(self, config, model, tokenizer): config.bos_token = tokenizer.bos_token config.eos_token = tokenizer.eos_token config.unk_token = tokenizer.unk_token def set_decoder(self, spec, module, rotary_dim, num_heads): spec.scale_embeddings = False self.set_embeddings(spec.embeddings, module.wte) self.set_layer_norm(spec.layer_norm, module.ln_f) for layer_spec, layer in zip(spec.layer, module.h): self.set_layer_norm(layer_spec.shared_layer_norm, layer.ln_1) qw = layer.attn.q_proj.weight kw = layer.attn.k_proj.weight vw = layer.attn.v_proj.weight qw = utils.permute_for_sliced_rotary(qw, num_heads, rotary_dim) kw = utils.permute_for_sliced_rotary(kw, num_heads, rotary_dim) layer_spec.self_attention.linear[0].weight = torch.cat((qw, kw, vw)) self.set_linear(layer_spec.self_attention.linear[1], layer.attn.out_proj) self.set_linear(layer_spec.ffn.linear_0, layer.mlp.fc_in) self.set_linear(layer_spec.ffn.linear_1, layer.mlp.fc_out) @register_loader("CodeGenConfig") class CodeGenLoader(ModelLoader): @property def architecture_name(self): return "CodeGenForCausalLM" def get_model_spec(self, model): spec = transformer_spec.TransformerDecoderModelSpec.from_config( model.config.n_layer, model.config.n_head, pre_norm=True, activation=_SUPPORTED_ACTIVATIONS[model.config.activation_function], rotary_dim=model.config.rotary_dim, rotary_interleave=False, parallel_residual=True, shared_layer_norm=True, ) mp_num = 4 if hasattr(model.config, "head_dim") and model.config.head_dim in [128, 256]: # models forked from "Salesforce/codegen2-1B" and "Salesforce/codegen2-3_7B" # use a special setting of mp_num=8, all other using 4 # these model.config's use a special setting of head_dim mp_num = 8 self.set_decoder( spec.decoder, model.transformer, model.config.rotary_dim, model.config.n_head, model.config.n_embd, mp_num=mp_num, ) self.set_linear(spec.decoder.projection, model.lm_head) return spec def get_vocabulary(self, model, tokenizer): tokens = super().get_vocabulary(model, tokenizer) extra_ids = model.config.vocab_size - len(tokens) for i in range(extra_ids): # fix for additional vocab, see GPTNeoX Converter tokens.append("" % i) return tokens def set_vocabulary(self, spec, tokens): spec.register_vocabulary(tokens) def set_config(self, config, model, tokenizer): config.bos_token = tokenizer.bos_token config.eos_token = tokenizer.eos_token config.unk_token = tokenizer.unk_token def set_decoder(self, spec, module, rotary_dim, num_heads, embed_dim, mp_num): spec.scale_embeddings = False self.set_embeddings(spec.embeddings, module.wte) self.set_layer_norm(spec.layer_norm, module.ln_f) base_permutation = np.arange(0, mp_num * 3).reshape(-1, 3).T.flatten().tolist() local_dim = embed_dim // mp_num permutation = torch.cat( [torch.arange(i * local_dim, (i + 1) * local_dim) for i in base_permutation] ) for layer_spec, layer in zip(spec.layer, module.h): self.set_layer_norm(layer_spec.shared_layer_norm, layer.ln_1) # [start convert CodeGen to GPT-J format] # see https://github.com/fauxpilot/fauxpilot/blob/fb4073a9078dd001ebeb7dfefb8cb2ecc8a88f4b/converter/codegen_gptj_convert.py # noqa qkv_proj = layer.attn.qkv_proj.weight # GPT-J and CodeGen slice up the qkv projection slightly differently. # the following permutation brings Codegen 'qkv_proj' # in GPT-J order of qw, vw, kw # we permute the *rows* here because the computation is xA.T new_qkv_proj = qkv_proj[permutation, :] # the name QKV is misleading here; they are actually stored in QVK qw, vw, kw = new_qkv_proj.chunk(3, dim=0) # [end convert CodeGen to GPT-J.] qw = utils.permute_for_sliced_rotary(qw, num_heads, rotary_dim) kw = utils.permute_for_sliced_rotary(kw, num_heads, rotary_dim) layer_spec.self_attention.linear[0].weight = torch.cat((qw, kw, vw)) self.set_linear(layer_spec.self_attention.linear[1], layer.attn.out_proj) self.set_linear(layer_spec.ffn.linear_0, layer.mlp.fc_in) self.set_linear(layer_spec.ffn.linear_1, layer.mlp.fc_out) @register_loader("GPTNeoXConfig") class GPTNeoXLoader(ModelLoader): @property def architecture_name(self): return "GPTNeoXForCausalLM" def get_model_spec(self, model): spec = transformer_spec.TransformerDecoderModelSpec.from_config( model.config.num_hidden_layers, model.config.num_attention_heads, pre_norm=True, activation=_SUPPORTED_ACTIVATIONS[model.config.hidden_act], rotary_dim=int( model.config.rotary_pct * (model.config.hidden_size // model.config.num_attention_heads) ), rotary_interleave=False, parallel_residual=model.config.use_parallel_residual, shared_layer_norm=False, ) self.set_decoder(spec.decoder, model.gpt_neox, model.config.num_attention_heads) self.set_linear(spec.decoder.projection, model.embed_out) return spec def get_vocabulary(self, model, tokenizer): tokens = super().get_vocabulary(model, tokenizer) extra_ids = model.config.vocab_size - len(tokens) for i in range(extra_ids): tokens.append("" % i) return tokens def set_vocabulary(self, spec, tokens): spec.register_vocabulary(tokens) def set_config(self, config, model, tokenizer): config.bos_token = tokenizer.bos_token config.eos_token = tokenizer.eos_token config.unk_token = tokenizer.unk_token def set_decoder(self, spec, module, num_heads): spec.scale_embeddings = False self.set_embeddings(spec.embeddings, module.embed_in) self.set_layer_norm(spec.layer_norm, module.final_layer_norm) for layer_spec, layer in zip(spec.layer, module.layers): if hasattr(layer_spec, "input_layer_norm"): # Use parallel residual. self.set_layer_norm(layer_spec.input_layer_norm, layer.input_layernorm) self.set_layer_norm( layer_spec.post_attention_layer_norm, layer.post_attention_layernorm ) else: self.set_layer_norm( layer_spec.self_attention.layer_norm, layer.input_layernorm ) self.set_layer_norm( layer_spec.ffn.layer_norm, layer.post_attention_layernorm ) qkv_w = layer.attention.query_key_value.weight qkv_b = layer.attention.query_key_value.bias qkv_w = ( qkv_w.reshape(num_heads, 3, -1, qkv_w.shape[-1]) .swapaxes(0, 1) .reshape(-1, qkv_w.shape[-1]) ) qkv_b = qkv_b.reshape(num_heads, 3, -1).swapaxes(0, 1).reshape(-1) layer_spec.self_attention.linear[0].weight = qkv_w layer_spec.self_attention.linear[0].bias = qkv_b self.set_linear(layer_spec.self_attention.linear[1], layer.attention.dense) self.set_linear(layer_spec.ffn.linear_0, layer.mlp.dense_h_to_4h) self.set_linear(layer_spec.ffn.linear_1, layer.mlp.dense_4h_to_h) @register_loader("WhisperConfig") class WhisperLoader(BartLoader): @property def architecture_name(self): return "WhisperForConditionalGeneration" def get_model_spec(self, model): spec = whisper_spec.WhisperSpec( model.config.encoder_layers, model.config.encoder_attention_heads, model.config.decoder_layers, model.config.decoder_attention_heads, ) self.set_encoder(spec.encoder, model.model.encoder) self.set_decoder(spec.decoder, model.model.decoder) self.set_linear(spec.decoder.projection, model.proj_out) return spec def _get_lang_ids_from_tokenizer(self, tokenizer): non_lang_special_tokens = [ "<|endoftext|>", "<|startoftranscript|>", "<|translate|>", "<|transcribe|>", "<|startoflm|>", "<|startofprev|>", "<|nocaptions|>", "<|notimestamps|>", ] return [ token_id for token_id, token in zip( tokenizer.additional_special_tokens_ids, tokenizer.additional_special_tokens, ) if token not in non_lang_special_tokens ] def set_config(self, config, model, tokenizer): gen_config = getattr(model, "generation_config", None) if gen_config is not None: config.suppress_ids = gen_config.suppress_tokens config.suppress_ids_begin = gen_config.begin_suppress_tokens if hasattr(gen_config, "alignment_heads"): config.alignment_heads = gen_config.alignment_heads if hasattr(gen_config, "lang_to_id"): config.lang_ids = sorted(gen_config.lang_to_id.values()) else: config.suppress_ids = model.config.suppress_tokens config.suppress_ids_begin = model.config.begin_suppress_tokens config.alignment_heads = _WHISPER_ALIGNMENT_HEADS.get(model.name_or_path) if getattr(config, "lang_ids", None) is None: config.lang_ids = self._get_lang_ids_from_tokenizer(tokenizer) if config.alignment_heads is None: # Use the last half layers for alignment by default. num_layers = model.config.decoder_layers num_heads = model.config.decoder_attention_heads config.alignment_heads = list( itertools.product( range(num_layers // 2, num_layers), range(num_heads), ) ) def get_vocabulary(self, model, tokenizer): tokens = super().get_vocabulary(model, tokenizer) # Add timestamp tokens. tokens.extend( "<|%.2f|>" % (i * 0.02) for i in range(model.config.vocab_size - len(tokens)) ) return tokens def set_vocabulary(self, spec, tokens): spec.register_vocabulary(tokens) def set_encoder(self, spec, encoder): self.set_conv1d(spec.conv1, encoder.conv1) self.set_conv1d(spec.conv2, encoder.conv2) super().set_encoder(spec, encoder) def set_decoder(self, spec, decoder): self.set_embeddings(spec.embeddings, decoder.embed_tokens) super().set_decoder(spec, decoder) def set_common_layers(self, spec, module): self.set_position_encodings(spec.position_encodings, module.embed_positions) self.set_layer_norm(spec.layer_norm, module.layer_norm) def set_conv1d(self, spec, module): spec.weight = module.weight spec.bias = module.bias @register_loader("Wav2Vec2Config") class Wav2Vec2Loader(BartLoader): @property def architecture_name(self): return "Wav2Vec2ForCTC" def get_model_spec(self, model): spec = wav2vec2_spec.Wav2Vec2Spec( model.wav2vec2.config.num_feat_extract_layers, model.wav2vec2.encoder.config.num_hidden_layers, model.wav2vec2.encoder.config.num_attention_heads, ) # layer component name matching (no duplications saving) for layer in model.wav2vec2.encoder.layers: layer.self_attn = layer.attention layer.self_attn_layer_norm = layer.layer_norm layer.activation_fn = layer.feed_forward.intermediate_act_fn layer.fc1 = layer.feed_forward.intermediate_dense layer.fc2 = layer.feed_forward.output_dense self.set_encoder(spec.encoder, model, model.wav2vec2.config) return spec def set_config(self, config, model, tokenizer): return def get_vocabulary(self, model, tokenizer): return tokenizer.get_vocab() def set_vocabulary(self, spec, tokens): spec.register_vocabulary(tokens) def set_feature_extractor(self, spec, feature_extractor): spec.feat_layer0.conv.weight = feature_extractor.conv_layers[0].conv.weight spec.feat_layer0.conv.bias = feature_extractor.conv_layers[0].conv.bias self.set_layer_norm( spec.feat_layer0.layer_norm, feature_extractor.conv_layers[0].layer_norm ) for spec_layer, module_layer in zip( spec.feat_layer, feature_extractor.conv_layers[1:] ): spec_layer.conv.weight = module_layer.conv.weight spec_layer.conv.bias = module_layer.conv.bias self.set_layer_norm(spec_layer.layer_norm, module_layer.layer_norm) def set_feature_projection(self, spec, feature_projection): self.set_layer_norm(spec.fp_layer_norm, feature_projection.layer_norm) self.set_linear(spec.fp_projection, feature_projection.projection) def set_pos_conv_embed(self, spec, encoder, config): # forcing parameters to be set because some transformers version initializes garbage numbers # conv parameters are float16 so force float32 for the loading encoder.pos_conv_embed.conv.weight.data = ( encoder.pos_conv_embed.conv.weight.data.float() ) encoder.pos_conv_embed.conv.bias.data = encoder.pos_conv_embed.conv.bias.float() for param in encoder.pos_conv_embed.parameters(): param.data = param.data.float() encoder.pos_conv_embed(torch.randn((1, 1, config.hidden_size))) spec.pos_conv_embed.conv.weight = encoder.pos_conv_embed.conv.weight spec.pos_conv_embed.conv.bias = encoder.pos_conv_embed.conv.bias def set_encoder(self, spec, model, config): self.set_feature_extractor(spec, model.wav2vec2.feature_extractor) self.set_feature_projection(spec, model.wav2vec2.feature_projection) self.set_pos_conv_embed(spec, model.wav2vec2.encoder, config) super().set_encoder(spec, model.wav2vec2.encoder) self.set_linear(spec.lm_head, model.lm_head) def set_common_layers(self, spec, module): self.set_layer_norm(spec.layer_norm, module.layer_norm) @register_loader("T5Config") class T5Loader(ModelLoader): @property def architecture_name(self): return "T5ForConditionalGeneration" def get_model_spec(self, model): spec = transformer_spec.TransformerSpec.from_config( (model.config.num_layers, model.config.num_decoder_layers), model.config.num_heads, pre_norm=True, activation=_SUPPORTED_ACTIVATIONS[model.config.dense_act_fn], ffn_glu=model.config.is_gated_act, relative_attention_bias=True, rms_norm=True, ) self.set_stack(spec.encoder, model.encoder) self.set_stack(spec.decoder, model.decoder, is_decoder=True) self.set_linear(spec.decoder.projection, model.lm_head) if model.config.tie_word_embeddings: spec.decoder.scale_outputs = model.config.d_model**-0.5 return spec def get_vocabulary(self, model, tokenizer): tokens = super().get_vocabulary(model, tokenizer) extra_ids = model.config.vocab_size - len(tokens) for i in range(extra_ids): tokens.append("" % i) return tokens def set_vocabulary(self, spec, tokens): spec.register_source_vocabulary(tokens) spec.register_target_vocabulary(tokens) def set_config(self, config, model, tokenizer): config.bos_token = tokenizer.pad_token config.eos_token = tokenizer.eos_token config.unk_token = tokenizer.unk_token if hasattr(model.config, "decoder_start_token_id"): config.decoder_start_token = tokenizer.convert_ids_to_tokens( model.config.decoder_start_token_id ) else: config.decoder_start_token = tokenizer.pad_token def set_stack(self, spec, module, is_decoder=False): self.set_layer_norm(spec.layer_norm, module.final_layer_norm) self.set_embeddings( ( spec.embeddings[0] if isinstance(spec.embeddings, list) else spec.embeddings ), module.embed_tokens, ) spec.scale_embeddings = False for i, (layer_spec, block) in enumerate(zip(spec.layer, module.block)): self.set_self_attention(layer_spec.self_attention, block.layer[0]) if i > 0: # Reuse relative attention bias from the first layer. first_self_attention = spec.layer[0].self_attention layer_spec.self_attention.relative_attention_bias = ( first_self_attention.relative_attention_bias ) layer_spec.self_attention.relative_attention_max_distance = ( first_self_attention.relative_attention_max_distance ) if is_decoder: self.set_cross_attention(layer_spec.attention, block.layer[1]) self.set_ffn(layer_spec.ffn, block.layer[-1]) def set_ffn(self, spec, module): if hasattr(spec, "linear_0_noact"): self.set_linear(spec.linear_0, module.DenseReluDense.wi_0) self.set_linear(spec.linear_0_noact, module.DenseReluDense.wi_1) else: self.set_linear(spec.linear_0, module.DenseReluDense.wi) self.set_linear(spec.linear_1, module.DenseReluDense.wo) self.set_layer_norm(spec.layer_norm, module.layer_norm) def set_self_attention(self, spec, module): self.set_attention(spec, module.SelfAttention, self_attention=True) self.set_layer_norm(spec.layer_norm, module.layer_norm) def set_cross_attention(self, spec, module): self.set_attention(spec, module.EncDecAttention) self.set_layer_norm(spec.layer_norm, module.layer_norm) def set_attention(self, spec, attention, self_attention=False): spec.queries_scale = 1.0 split_layers = [common_spec.LinearSpec() for _ in range(3)] self.set_linear(split_layers[0], attention.q) self.set_linear(split_layers[1], attention.k) self.set_linear(split_layers[2], attention.v) if self_attention: utils.fuse_linear(spec.linear[0], split_layers) else: utils.fuse_linear(spec.linear[0], split_layers[:1]) utils.fuse_linear(spec.linear[1], split_layers[1:]) self.set_linear(spec.linear[-1], attention.o) if attention.has_relative_attention_bias: spec.relative_attention_bias = attention.relative_attention_bias.weight spec.relative_attention_max_distance = np.dtype("int32").type( attention.relative_attention_max_distance ) def set_layer_norm(self, spec, layer_norm): spec.gamma = layer_norm.weight @register_loader("MT5Config") class MT5Loader(T5Loader): @property def architecture_name(self): return "MT5ForConditionalGeneration" @register_loader("BloomConfig") class BloomLoader(ModelLoader): @property def architecture_name(self): return "BloomForCausalLM" def get_model_spec(self, model): spec = transformer_spec.TransformerDecoderModelSpec.from_config( model.config.n_layer, model.config.n_head, pre_norm=True, activation=common_spec.Activation.GELUTanh, layernorm_embedding=True, alibi=True, alibi_use_positive_positions=True, ) self.set_decoder(spec.decoder, model.transformer) self.set_linear(spec.decoder.projection, model.lm_head) return spec def get_vocabulary(self, model, tokenizer): tokens = super().get_vocabulary(model, tokenizer) extra_ids = model.config.vocab_size - len(tokens) for i in range(extra_ids): tokens.append("" % i) return tokens def set_vocabulary(self, spec, tokens): spec.register_vocabulary(tokens) def set_config(self, config, model, tokenizer): config.bos_token = tokenizer.bos_token config.eos_token = tokenizer.eos_token config.unk_token = tokenizer.unk_token def set_decoder(self, spec, module): spec.scale_embeddings = False self.set_embeddings(spec.embeddings, module.word_embeddings) self.set_layer_norm(spec.layernorm_embedding, module.word_embeddings_layernorm) self.set_layer_norm(spec.layer_norm, module.ln_f) for layer_spec, layer in zip(spec.layer, module.h): self.set_layer_norm( layer_spec.self_attention.layer_norm, layer.input_layernorm ) self.set_qkv_linear( layer_spec.self_attention.linear[0], layer.self_attention.query_key_value, layer.self_attention.num_heads, ) self.set_linear( layer_spec.self_attention.linear[1], layer.self_attention.dense ) self.set_layer_norm( layer_spec.ffn.layer_norm, layer.post_attention_layernorm ) self.set_linear(layer_spec.ffn.linear_0, layer.mlp.dense_h_to_4h) self.set_linear(layer_spec.ffn.linear_1, layer.mlp.dense_4h_to_h) def set_qkv_linear(self, spec, module, num_heads): weight = module.weight weight = weight.reshape(num_heads, 3, -1, weight.shape[-1]) weight = weight.transpose(0, 1) weight = weight.reshape(-1, weight.shape[-1]) bias = module.bias bias = bias.reshape(num_heads, 3, -1) bias = bias.transpose(0, 1) bias = bias.reshape(-1) spec.weight = weight spec.bias = bias @register_loader("MPTConfig") class MPTLoader(ModelLoader): @property def architecture_name(self): return "AutoModelForCausalLM" def get_model_spec(self, model): spec = transformer_spec.TransformerDecoderModelSpec.from_config( model.config.n_layers, model.config.n_heads, pre_norm=True, activation=common_spec.Activation.GELU, alibi=True, ) self.set_decoder(spec.decoder, model.transformer) return spec def get_vocabulary(self, model, tokenizer): tokens = super().get_vocabulary(model, tokenizer) extra_ids = model.config.vocab_size - len(tokens) for i in range(extra_ids): tokens.append("" % i) return tokens def set_vocabulary(self, spec, tokens): spec.register_vocabulary(tokens) def set_config(self, config, model, tokenizer): config.bos_token = tokenizer.bos_token config.eos_token = tokenizer.eos_token config.unk_token = tokenizer.unk_token def set_decoder(self, spec, module): self.set_embeddings(spec.embeddings, module.wte) self.set_layer_norm(spec.layer_norm, module.norm_f) spec.scale_embeddings = False spec.projection.weight = spec.embeddings.weight for layer_spec, layer in zip(spec.layer, module.blocks): self.set_layer_norm(layer_spec.self_attention.layer_norm, layer.norm_1) self.set_linear(layer_spec.self_attention.linear[0], layer.attn.Wqkv) self.set_linear(layer_spec.self_attention.linear[1], layer.attn.out_proj) self.set_layer_norm(layer_spec.ffn.layer_norm, layer.norm_2) self.set_linear(layer_spec.ffn.linear_0, layer.ffn.up_proj) self.set_linear(layer_spec.ffn.linear_1, layer.ffn.down_proj) def set_layer_norm(self, spec, module): spec.gamma = module.weight spec.beta = torch.zeros_like(spec.gamma) @register_loader("GemmaConfig") class GemmaLoader(ModelLoader): @property def architecture_name(self): return "GemmaForCausalLM" def get_model_spec(self, model): num_layers = model.config.num_hidden_layers num_heads = model.config.num_attention_heads num_heads_kv = getattr(model.config, "num_key_value_heads", num_heads) if num_heads_kv == num_heads: num_heads_kv = None activation_config = getattr( model.config, "hidden_activation", "gelu_pytorch_tanh" ) spec = transformer_spec.TransformerDecoderModelSpec.from_config( num_layers, num_heads, activation=( common_spec.Activation.GELU if activation_config == "gelu" else common_spec.Activation.GELUTanh ), pre_norm=True, ffn_glu=True, rms_norm=True, rotary_dim=0, rotary_interleave=False, rotary_base=getattr(model.config, "rope_theta", 10000), num_heads_kv=num_heads_kv, head_dim=model.config.head_dim, ) self.set_decoder(spec.decoder, model.model) self.set_linear(spec.decoder.projection, model.lm_head) spec.decoder.embeddings.multiply_by_sqrt_depth = model.config.hidden_size**0.5 return spec def get_vocabulary(self, model, tokenizer): tokens = super().get_vocabulary(model, tokenizer) extra_ids = model.config.vocab_size - len(tokens) for i in range(extra_ids): tokens.append("" % i) if model.config.vocab_size < len(tokens): tokens = tokens[: model.config.vocab_size] return tokens def set_vocabulary(self, spec, tokens): spec.register_vocabulary(tokens) def set_config(self, config, model, tokenizer): config.bos_token = tokenizer.bos_token config.eos_token = tokenizer.eos_token config.unk_token = tokenizer.unk_token config.layer_norm_epsilon = model.config.rms_norm_eps def set_layer_norm(self, spec, layer_norm): spec.gamma = layer_norm.weight spec.layer_norm_use_residual = True def set_decoder(self, spec, module): spec.scale_embeddings = True spec.start_from_zero_embedding = False self.set_embeddings(spec.embeddings, module.embed_tokens) self.set_layer_norm(spec.layer_norm, module.norm) for layer_spec, layer in zip(spec.layer, module.layers): self.set_layer_norm( layer_spec.self_attention.layer_norm, layer.input_layernorm ) self.set_layer_norm( layer_spec.ffn.layer_norm, layer.post_attention_layernorm ) wq = layer.self_attn.q_proj.weight wk = layer.self_attn.k_proj.weight wv = layer.self_attn.v_proj.weight wo = layer.self_attn.o_proj.weight layer_spec.self_attention.linear[0].weight = torch.cat([wq, wk, wv]) layer_spec.self_attention.linear[1].weight = wo self.set_linear(layer_spec.ffn.linear_0, layer.mlp.gate_proj) self.set_linear(layer_spec.ffn.linear_0_noact, layer.mlp.up_proj) self.set_linear(layer_spec.ffn.linear_1, layer.mlp.down_proj) delattr(layer, "self_attn") delattr(layer, "mlp") gc.collect() @register_loader("Gemma2Config") class Gemma2Loader(ModelLoader): @property def architecture_name(self): return "Gemma2ForCausalLM" def get_model_spec(self, model): num_layers = model.config.num_hidden_layers num_heads = model.config.num_attention_heads num_heads_kv = getattr(model.config, "num_key_value_heads", num_heads) if num_heads_kv == num_heads: num_heads_kv = None activation_config = getattr( model.config, "hidden_activation", "gelu_pytorch_tanh" ) spec = transformer_spec.TransformerDecoderModelSpec.from_config( num_layers, num_heads, activation=( common_spec.Activation.GELU if activation_config == "gelu" else common_spec.Activation.GELUTanh ), pre_norm=True, ffn_glu=True, rms_norm=True, rotary_dim=0, rotary_interleave=False, rotary_base=getattr(model.config, "rope_theta", 10000), num_heads_kv=num_heads_kv, head_dim=model.config.head_dim, pre_post_layer_norm=True, ) self.set_decoder(spec.decoder, model.model) self.set_linear(spec.decoder.projection, model.lm_head) spec.decoder.embeddings.multiply_by_sqrt_depth = model.config.hidden_size**0.5 return spec def get_vocabulary(self, model, tokenizer): tokens = super().get_vocabulary(model, tokenizer) extra_ids = model.config.vocab_size - len(tokens) for i in range(extra_ids): tokens.append("" % i) if model.config.vocab_size < len(tokens): tokens = tokens[: model.config.vocab_size] return tokens def set_vocabulary(self, spec, tokens): spec.register_vocabulary(tokens) def set_config(self, config, model, tokenizer): config.bos_token = tokenizer.bos_token config.eos_token = tokenizer.eos_token config.unk_token = tokenizer.unk_token config.layer_norm_epsilon = model.config.rms_norm_eps def set_layer_norm(self, spec, layer_norm): spec.gamma = layer_norm.weight spec.layer_norm_use_residual = True def set_decoder(self, spec, module): spec.scale_embeddings = True spec.start_from_zero_embedding = False self.set_embeddings(spec.embeddings, module.embed_tokens) self.set_layer_norm(spec.layer_norm, module.norm) for layer_spec, layer in zip(spec.layer, module.layers): self.set_layer_norm(layer_spec.input_layer_norm, layer.input_layernorm) self.set_layer_norm( layer_spec.post_attention_layer_norm, layer.post_attention_layernorm ) self.set_layer_norm( layer_spec.pre_feedforward_layer_norm, layer.pre_feedforward_layernorm ) self.set_layer_norm( layer_spec.post_feedforward_layer_norm, layer.post_feedforward_layernorm ) wq = layer.self_attn.q_proj.weight wk = layer.self_attn.k_proj.weight wv = layer.self_attn.v_proj.weight wo = layer.self_attn.o_proj.weight layer_spec.self_attention.linear[0].weight = torch.cat([wq, wk, wv]) layer_spec.self_attention.linear[1].weight = wo self.set_linear(layer_spec.ffn.linear_0, layer.mlp.gate_proj) self.set_linear(layer_spec.ffn.linear_0_noact, layer.mlp.up_proj) self.set_linear(layer_spec.ffn.linear_1, layer.mlp.down_proj) delattr(layer, "self_attn") delattr(layer, "mlp") gc.collect() @register_loader("LlamaConfig") class LlamaLoader(ModelLoader): @property def architecture_name(self): return "LlamaForCausalLM" def get_model_spec(self, model): num_layers = model.config.num_hidden_layers num_heads = model.config.num_attention_heads num_heads_kv = getattr(model.config, "num_key_value_heads", num_heads) if num_heads_kv == num_heads: num_heads_kv = None rope_scaling = getattr(model.config, "rope_scaling", None) if rope_scaling: rope_type = rope_scaling.get("type") or rope_scaling["rope_type"] rotary_scaling_type = _SUPPORTED_ROPE_SCALING.get(rope_type) rotary_scaling_factor = rope_scaling["factor"] if rotary_scaling_type is None: raise NotImplementedError( "RoPE scaling type '%s' is not yet implemented. " "The following RoPE scaling types are currently supported: %s" % (rope_scaling["type"], ", ".join(_SUPPORTED_ROPE_SCALING.keys())) ) else: rotary_scaling_type = None rotary_scaling_factor = 1 quantization_config = getattr(model.config, "quantization_config", None) if quantization_config: quant_type = None if quantization_config.quant_method == "awq": quant_type = _SUPPORTED_QUANTIZATION.get(quantization_config.version) if quant_type is None: raise NotImplementedError( "Quantization type '%s' is not yet implemented. " "The following Quantization types are currently supported: %s" % ( quantization_config.quant_method, ", ".join(_SUPPORTED_QUANTIZATION.keys()), ) ) quant_group_size = quantization_config.group_size quant_bits = quantization_config.bits else: quant_type = common_spec.Quantization.CT2 quant_group_size = None quant_bits = None spec = transformer_spec.TransformerDecoderModelSpec.from_config( num_layers, num_heads, activation=common_spec.Activation.SWISH, pre_norm=True, ffn_glu=True, rms_norm=True, rotary_dim=0, rotary_interleave=False, rotary_scaling_type=rotary_scaling_type, rotary_scaling_factor=rotary_scaling_factor, rotary_base=getattr(model.config, "rope_theta", 10000), num_heads_kv=num_heads_kv, quant_type=quant_type, quant_group_size=quant_group_size, quant_bits=quant_bits, ) self.set_decoder(spec.decoder, model.model, quant_type) self.set_linear(spec.decoder.projection, model.lm_head) # set extra RoPE parameters for Llama-3.1 if rotary_scaling_type == attention_spec.RotaryScalingType.Llama3: for layer in spec.decoder.layer: layer.self_attention.rotary_low_freq_factor = rope_scaling[ "low_freq_factor" ] layer.self_attention.rotary_high_freq_factor = rope_scaling[ "high_freq_factor" ] return spec def get_vocabulary(self, model, tokenizer): tokens = super().get_vocabulary(model, tokenizer) extra_ids = model.config.vocab_size - len(tokens) for i in range(extra_ids): tokens.append("" % i) if model.config.vocab_size < len(tokens): tokens = tokens[: model.config.vocab_size] return tokens def set_vocabulary(self, spec, tokens): spec.register_vocabulary(tokens) def set_config(self, config, model, tokenizer): config.bos_token = tokenizer.bos_token config.eos_token = tokenizer.eos_token config.unk_token = ( tokenizer.unk_token if tokenizer.unk_token is not None else "" ) config.layer_norm_epsilon = model.config.rms_norm_eps def set_layer_norm(self, spec, layer_norm): spec.gamma = layer_norm.weight def set_decoder(self, spec, module, quant_type=common_spec.Quantization.CT2): spec.scale_embeddings = False self.set_embeddings(spec.embeddings, module.embed_tokens) self.set_layer_norm(spec.layer_norm, module.norm) for layer_spec, layer in zip(spec.layer, module.layers): self.set_layer_norm( layer_spec.self_attention.layer_norm, layer.input_layernorm ) self.set_layer_norm( layer_spec.ffn.layer_norm, layer.post_attention_layernorm ) split_layers = [common_spec.LinearSpec() for _ in range(3)] self.set_linear( split_layers[0], layer.self_attn.q_proj, quant_type=quant_type ) self.set_linear( split_layers[1], layer.self_attn.k_proj, quant_type=quant_type ) self.set_linear( split_layers[2], layer.self_attn.v_proj, quant_type=quant_type ) if quant_type == common_spec.Quantization.CT2: utils.fuse_linear(layer_spec.self_attention.linear[0], split_layers) else: cc_dim = 1 if quant_type == common_spec.Quantization.AWQ_GEMM else 0 utils.fuse_linear_prequant( layer_spec.self_attention.linear[0], split_layers, cc_dim ) self.set_linear( layer_spec.self_attention.linear[1], layer.self_attn.o_proj, quant_type=quant_type, ) self.set_linear( layer_spec.ffn.linear_0, layer.mlp.gate_proj, quant_type=quant_type ) self.set_linear( layer_spec.ffn.linear_0_noact, layer.mlp.up_proj, quant_type=quant_type ) self.set_linear( layer_spec.ffn.linear_1, layer.mlp.down_proj, quant_type=quant_type ) delattr(layer, "self_attn") delattr(layer, "mlp") gc.collect() @register_loader("MistralConfig") class MistralLoader(ModelLoader): @property def architecture_name(self): return "MistralForCausalLM" def get_model_spec(self, model): num_layers = model.config.num_hidden_layers num_heads = model.config.num_attention_heads num_heads_kv = getattr(model.config, "num_key_value_heads", num_heads) if num_heads_kv == num_heads: num_heads_kv = None sliding_window = getattr(model.config, "sliding_window", 0) rope_scaling = getattr(model.config, "rope_scaling", None) if rope_scaling: rotary_scaling_type = _SUPPORTED_ROPE_SCALING.get(rope_scaling["type"]) rotary_scaling_factor = rope_scaling["factor"] if rotary_scaling_type is None: raise NotImplementedError( "RoPE scaling type '%s' is not yet implemented. " "The following RoPE scaling types are currently supported: %s" % (rope_scaling["type"], ", ".join(_SUPPORTED_ROPE_SCALING.keys())) ) else: rotary_scaling_type = None rotary_scaling_factor = 1 quantization_config = getattr(model.config, "quantization_config", None) if quantization_config: if quantization_config.quant_method == "awq": quant_type = _SUPPORTED_QUANTIZATION.get(quantization_config.version) if quant_type is None: raise NotImplementedError( "Quantization type '%s' is not yet implemented. " "The following Quantization types are currently supported: %s" % ( quantization_config.quant_method, ", ".join(_SUPPORTED_QUANTIZATION.keys()), ) ) quant_group_size = quantization_config.group_size quant_bits = quantization_config.bits else: quant_type = common_spec.Quantization.CT2 quant_group_size = None quant_bits = None spec = transformer_spec.TransformerDecoderModelSpec.from_config( num_layers, num_heads, activation=common_spec.Activation.SWISH, pre_norm=True, ffn_glu=True, rms_norm=True, rotary_dim=0, rotary_interleave=False, rotary_scaling_type=rotary_scaling_type, rotary_scaling_factor=rotary_scaling_factor, rotary_base=getattr(model.config, "rope_theta", 10000), num_heads_kv=num_heads_kv, sliding_window=sliding_window, quant_type=quant_type, quant_group_size=quant_group_size, quant_bits=quant_bits, ) self.set_decoder(spec.decoder, model.model, quant_type=quant_type) self.set_linear(spec.decoder.projection, model.lm_head) return spec def get_vocabulary(self, model, tokenizer): tokens = super().get_vocabulary(model, tokenizer) extra_ids = model.config.vocab_size - len(tokens) for i in range(extra_ids): tokens.append("" % i) return tokens def set_vocabulary(self, spec, tokens): spec.register_vocabulary(tokens) def set_config(self, config, model, tokenizer): config.bos_token = tokenizer.bos_token config.eos_token = tokenizer.eos_token config.unk_token = tokenizer.unk_token config.layer_norm_epsilon = model.config.rms_norm_eps def set_layer_norm(self, spec, layer_norm): spec.gamma = layer_norm.weight def set_decoder(self, spec, module, quant_type=common_spec.Quantization.CT2): spec.scale_embeddings = False self.set_embeddings(spec.embeddings, module.embed_tokens) self.set_layer_norm(spec.layer_norm, module.norm) for layer_spec, layer in zip(spec.layer, module.layers): self.set_layer_norm( layer_spec.self_attention.layer_norm, layer.input_layernorm ) self.set_layer_norm( layer_spec.ffn.layer_norm, layer.post_attention_layernorm ) split_layers = [common_spec.LinearSpec() for _ in range(3)] self.set_linear( split_layers[0], layer.self_attn.q_proj, quant_type=quant_type ) self.set_linear( split_layers[1], layer.self_attn.k_proj, quant_type=quant_type ) self.set_linear( split_layers[2], layer.self_attn.v_proj, quant_type=quant_type ) if quant_type == common_spec.Quantization.CT2: utils.fuse_linear(layer_spec.self_attention.linear[0], split_layers) else: cc_dim = 1 if quant_type == common_spec.Quantization.AWQ_GEMM else 0 utils.fuse_linear_prequant( layer_spec.self_attention.linear[0], split_layers, cc_dim ) self.set_linear( layer_spec.self_attention.linear[1], layer.self_attn.o_proj, quant_type=quant_type, ) self.set_linear( layer_spec.ffn.linear_0, layer.mlp.gate_proj, quant_type=quant_type ) self.set_linear( layer_spec.ffn.linear_0_noact, layer.mlp.up_proj, quant_type=quant_type ) self.set_linear( layer_spec.ffn.linear_1, layer.mlp.down_proj, quant_type=quant_type ) delattr(layer, "self_attn") delattr(layer, "mlp") gc.collect() @register_loader("MixFormerSequentialConfig") class MixFormerSequentialLoader(ModelLoader): @property def architecture_name(self): return "AutoModelForCausalLM" def get_model_spec(self, model): spec = transformer_spec.TransformerDecoderModelSpec.from_config( num_layers=model.config.n_layer, num_heads=model.config.n_head, pre_norm=True, activation=_SUPPORTED_ACTIVATIONS[model.config.activation_function], rotary_dim=model.config.rotary_dim, rotary_interleave=False, parallel_residual=True, shared_layer_norm=True, ) self.set_decoder(spec.decoder, model.layers) self.set_linear(spec.decoder.projection, model.layers[-1].linear) return spec def get_vocabulary(self, model, tokenizer): tokens = super().get_vocabulary(model, tokenizer) extra_ids = model.config.vocab_size - len(tokens) for i in range(extra_ids): tokens.append("" % i) return tokens def set_vocabulary(self, spec, tokens): spec.register_vocabulary(tokens) def set_config(self, config, model, tokenizer): config.bos_token = tokenizer.bos_token config.eos_token = tokenizer.eos_token config.unk_token = tokenizer.unk_token def set_decoder(self, spec, module): spec.scale_embeddings = False self.set_embeddings(spec.embeddings, module[0].wte) self.set_layer_norm(spec.layer_norm, module[-1].ln) for layer_spec, layer in zip(spec.layer, module[1:-1]): self.set_layer_norm(layer_spec.shared_layer_norm, layer.ln) self.set_linear(layer_spec.self_attention.linear[0], layer.mixer.Wqkv) self.set_linear(layer_spec.self_attention.linear[1], layer.mixer.out_proj) self.set_linear(layer_spec.ffn.linear_0, layer.mlp.fc1) self.set_linear(layer_spec.ffn.linear_1, layer.mlp.fc2) @register_loader("PhiConfig") class PhiLoader(ModelLoader): @property def architecture_name(self): return "AutoModelForCausalLM" def get_model_spec(self, model): spec = transformer_spec.TransformerDecoderModelSpec.from_config( num_layers=model.config.n_layer, num_heads=model.config.n_head, pre_norm=True, activation=_SUPPORTED_ACTIVATIONS[model.config.activation_function], rotary_dim=model.config.rotary_dim, rotary_interleave=False, parallel_residual=True, shared_layer_norm=True, ) self.set_decoder(spec.decoder, model.transformer) self.set_linear(spec.decoder.projection, model.lm_head.linear) self.set_layer_norm(spec.decoder.layer_norm, model.lm_head.ln) return spec def get_vocabulary(self, model, tokenizer): tokens = super().get_vocabulary(model, tokenizer) extra_ids = model.config.vocab_size - len(tokens) for i in range(extra_ids): tokens.append("" % i) return tokens def set_vocabulary(self, spec, tokens): spec.register_vocabulary(tokens) def set_config(self, config, model, tokenizer): config.bos_token = tokenizer.bos_token config.eos_token = tokenizer.eos_token config.unk_token = tokenizer.unk_token def set_decoder(self, spec, module): spec.scale_embeddings = False self.set_embeddings(spec.embeddings, module.embd.wte) for layer_spec, layer in zip(spec.layer, module.h): self.set_layer_norm(layer_spec.shared_layer_norm, layer.ln) self.set_linear(layer_spec.self_attention.linear[0], layer.mixer.Wqkv) self.set_linear(layer_spec.self_attention.linear[1], layer.mixer.out_proj) self.set_linear(layer_spec.ffn.linear_0, layer.mlp.fc1) self.set_linear(layer_spec.ffn.linear_1, layer.mlp.fc2) @register_loader("Phi3Config") class Phi3Loader(ModelLoader): @property def architecture_name(self): return "AutoModelForCausalLM" def get_model_spec(self, model): num_layers = model.config.num_hidden_layers num_heads = model.config.num_attention_heads num_heads_kv = getattr(model.config, "num_key_value_heads", num_heads) if num_heads_kv == num_heads: num_heads_kv = None original_max_position_embeddings = getattr( model.config, "original_max_position_embeddings", 0 ) max_position_embeddings = getattr(model.config, "max_position_embeddings", 0) rope_scaling = getattr(model.config, "rope_scaling", None) if rope_scaling: rotary_scaling_type = _SUPPORTED_ROPE_SCALING.get(rope_scaling["type"]) rotary_scaling_factor = rope_scaling.get("factor", 1) if rotary_scaling_type is None: raise NotImplementedError( "RoPE scaling type '%s' is not yet implemented. " "The following RoPE scaling types are currently supported: %s" % (rope_scaling["type"], ", ".join(_SUPPORTED_ROPE_SCALING.keys())) ) else: rotary_scaling_type = None rotary_scaling_factor = 1 spec = transformer_spec.TransformerDecoderModelSpec.from_config( num_layers, num_heads, activation=common_spec.Activation.SWISH, pre_norm=True, ffn_glu=True, rms_norm=True, rotary_dim=0, rotary_interleave=False, rotary_scaling_type=rotary_scaling_type, rotary_scaling_factor=rotary_scaling_factor, rotary_base=getattr(model.config, "rope_theta", 10000), original_max_position_embeddings=original_max_position_embeddings, max_position_embeddings=max_position_embeddings, num_heads_kv=num_heads_kv, ) self.set_decoder(spec.decoder, model.model) self.set_linear(spec.decoder.projection, model.lm_head) return spec def get_vocabulary(self, model, tokenizer): tokens = super().get_vocabulary(model, tokenizer) extra_ids = model.config.vocab_size - len(tokens) for i in range(extra_ids): tokens.append("" % i) return tokens def set_vocabulary(self, spec, tokens): spec.register_vocabulary(tokens) def set_config(self, config, model, tokenizer): config.bos_token = tokenizer.bos_token config.eos_token = tokenizer.eos_token config.unk_token = tokenizer.unk_token def set_layer_norm(self, spec, layer_norm): spec.gamma = layer_norm.weight def set_rotary_embeddings( self, spec, rotary_scaling_long_factor, rotary_scaling_short_factor ): spec.rotary_scaling_long_factor = torch.tensor( rotary_scaling_long_factor, dtype=torch.float32 ) spec.rotary_scaling_short_factor = torch.tensor( rotary_scaling_short_factor, dtype=torch.float32 ) def set_decoder(self, spec, module): spec.scale_embeddings = False self.set_embeddings(spec.embeddings, module.embed_tokens) self.set_layer_norm(spec.layer_norm, module.norm) for layer_spec, layer in zip(spec.layer, module.layers): self.set_layer_norm( layer_spec.self_attention.layer_norm, layer.input_layernorm ) self.set_layer_norm( layer_spec.ffn.layer_norm, layer.post_attention_layernorm ) self.set_linear( layer_spec.self_attention.linear[0], layer.self_attn.qkv_proj ) self.set_linear(layer_spec.self_attention.linear[1], layer.self_attn.o_proj) if ( layer.self_attn.rotary_emb.long_factor is not None and layer.self_attn.rotary_emb.short_factor is not None ): self.set_rotary_embeddings( layer_spec.self_attention, layer.self_attn.rotary_emb.long_factor, layer.self_attn.rotary_emb.short_factor, ) gate_proj, up_proj = layer.mlp.gate_up_proj.weight.chunk(2, dim=0) layer_spec.ffn.linear_0.weight = gate_proj layer_spec.ffn.linear_0_noact.weight = up_proj self.set_linear(layer_spec.ffn.linear_1, layer.mlp.down_proj) delattr(layer, "self_attn") delattr(layer, "mlp") gc.collect() @register_loader("RWConfig") class RWLoader(ModelLoader): @property def architecture_name(self): return "AutoModelForCausalLM" def get_falcon_spec(self, model): self._num_layers = model.config.n_layer self._num_heads = model.config.n_head self._num_heads_kv = getattr(model.config, "n_head_kv", None) self._num_kv_attr = "num_kv" def get_model_spec(self, model): self.get_falcon_spec(model) if getattr(model.config, "multi_query", False): num_heads_kv = 1 else: num_heads_kv = self._num_heads_kv spec = transformer_spec.TransformerDecoderModelSpec.from_config( self._num_layers, self._num_heads, pre_norm=True, activation=common_spec.Activation.GELU, alibi=model.config.alibi, alibi_use_positive_positions=True, scale_alibi=True, rotary_dim=0 if model.config.rotary else None, rotary_interleave=False, parallel_residual=model.config.parallel_attn, shared_layer_norm=num_heads_kv == 1, num_heads_kv=num_heads_kv, ) self.set_decoder(spec.decoder, model.transformer) self.set_linear(spec.decoder.projection, model.lm_head) return spec def get_vocabulary(self, model, tokenizer): tokens = super().get_vocabulary(model, tokenizer) extra_ids = model.config.vocab_size - len(tokens) for i in range(extra_ids): tokens.append("" % i) return tokens def set_vocabulary(self, spec, tokens): spec.register_vocabulary(tokens) def set_config(self, config, model, tokenizer): config.bos_token = tokenizer.eos_token config.eos_token = tokenizer.eos_token config.unk_token = tokenizer.eos_token def set_decoder(self, spec, module): spec.scale_embeddings = False self.set_embeddings(spec.embeddings, module.word_embeddings) self.set_layer_norm(spec.layer_norm, module.ln_f) for layer_spec, layer in zip(spec.layer, module.h): if hasattr(layer, "ln_attn"): self.set_layer_norm(layer_spec.input_layer_norm, layer.ln_attn) self.set_layer_norm(layer_spec.post_attention_layer_norm, layer.ln_mlp) elif hasattr(layer_spec, "shared_layer_norm"): self.set_layer_norm(layer_spec.shared_layer_norm, layer.input_layernorm) else: self.set_layer_norm( layer_spec.self_attention.layer_norm, layer.input_layernorm ) self.set_layer_norm( layer_spec.ffn.layer_norm, layer.post_attention_layernorm ) num_kv = getattr(layer.self_attention, self._num_kv_attr) if num_kv == 1: self.set_linear( layer_spec.self_attention.linear[0], layer.self_attention.query_key_value, ) else: self.set_qkv_linear( layer_spec.self_attention.linear[0], layer.self_attention.query_key_value, layer.self_attention.num_heads, num_kv if num_kv < layer.self_attention.num_heads else None, ) self.set_linear( layer_spec.self_attention.linear[1], layer.self_attention.dense ) self.set_linear(layer_spec.ffn.linear_0, layer.mlp.dense_h_to_4h) self.set_linear(layer_spec.ffn.linear_1, layer.mlp.dense_4h_to_h) def set_qkv_linear(self, spec, module, num_heads, num_kv=None): weight = module.weight if num_kv is None: weight = weight.reshape(num_heads, 3, -1, weight.shape[-1]) weight = weight.transpose(0, 1) weight = weight.reshape(-1, weight.shape[-1]) else: head_dim = weight.shape[0] // (num_heads + num_kv * 2) weight = weight.reshape( -1, num_heads // num_kv + 2, head_dim, weight.shape[-1] ) q, k, v = weight.split([num_heads // num_kv, 1, 1], dim=1) weight = torch.cat( [ q.reshape(num_heads * head_dim, -1), k.reshape(num_kv * head_dim, -1), v.reshape(num_kv * head_dim, -1), ] ) spec.weight = weight if module.bias is not None: bias = module.bias if num_kv is None: bias = bias.reshape(num_heads, 3, -1) bias = bias.transpose(0, 1) bias = bias.reshape(-1) else: bias = bias.reshape(-1, num_heads // num_kv + 2, head_dim) q, k, v = bias.split([num_heads // num_kv, 1, 1], dim=1) bias = torch.cat( [ q.reshape(num_heads * head_dim), k.reshape(num_kv * head_dim), v.reshape(num_kv * head_dim), ] ) spec.bias = bias @register_loader("FalconConfig") class FalconLoader(RWLoader): def get_falcon_spec(self, model): self._num_layers = model.config.num_hidden_layers self._num_heads = model.config.num_attention_heads self._num_heads_kv = getattr(model.config, "num_kv_heads", None) self._num_kv_attr = "num_kv_heads" @register_loader("DistilBertConfig") class DistilBertLoader(ModelLoader): @property def architecture_name(self): return "DistilBertModel" def get_model_spec(self, model): encoder_spec = transformer_spec.TransformerEncoderSpec( model.config.n_layers, model.config.n_heads, pre_norm=False, activation=_SUPPORTED_ACTIVATIONS[model.config.activation], layernorm_embedding=True, ) spec = transformer_spec.TransformerEncoderModelSpec( encoder_spec, ) spec.encoder.scale_embeddings = False self.set_embeddings( spec.encoder.embeddings[0], model.embeddings.word_embeddings ) self.set_position_encodings( spec.encoder.position_encodings, model.embeddings.position_embeddings ) self.set_layer_norm( spec.encoder.layernorm_embedding, model.embeddings.LayerNorm ) for layer_spec, layer in zip(spec.encoder.layer, model.transformer.layer): split_layers = [common_spec.LinearSpec() for _ in range(3)] self.set_linear(split_layers[0], layer.attention.q_lin) self.set_linear(split_layers[1], layer.attention.k_lin) self.set_linear(split_layers[2], layer.attention.v_lin) utils.fuse_linear(layer_spec.self_attention.linear[0], split_layers) self.set_linear( layer_spec.self_attention.linear[1], layer.attention.out_lin ) self.set_layer_norm( layer_spec.self_attention.layer_norm, layer.sa_layer_norm ) self.set_linear(layer_spec.ffn.linear_0, layer.ffn.lin1) self.set_linear(layer_spec.ffn.linear_1, layer.ffn.lin2) self.set_layer_norm(layer_spec.ffn.layer_norm, layer.output_layer_norm) return spec def set_vocabulary(self, spec, tokens): spec.register_vocabulary(tokens) def set_config(self, config, model, tokenizer): config.unk_token = tokenizer.unk_token config.layer_norm_epsilon = 1e-12 @register_loader("BertConfig") class BertLoader(ModelLoader): @property def architecture_name(self): return "BertModel" def get_model_spec(self, model): assert model.config.position_embedding_type == "absolute" encoder_spec = transformer_spec.TransformerEncoderSpec( model.config.num_hidden_layers, model.config.num_attention_heads, pre_norm=False, activation=_SUPPORTED_ACTIVATIONS[model.config.hidden_act], layernorm_embedding=True, num_source_embeddings=2, embeddings_merge=common_spec.EmbeddingsMerge.ADD, ) spec = transformer_spec.TransformerEncoderModelSpec( encoder_spec, pooling_layer=True, pooling_activation=common_spec.Activation.Tanh, ) spec.encoder.scale_embeddings = False self.set_embeddings( spec.encoder.embeddings[0], model.embeddings.word_embeddings ) self.set_embeddings( spec.encoder.embeddings[1], model.embeddings.token_type_embeddings ) self.set_position_encodings( spec.encoder.position_encodings, model.embeddings.position_embeddings ) self.set_layer_norm( spec.encoder.layernorm_embedding, model.embeddings.LayerNorm ) self.set_linear(spec.pooler_dense, model.pooler.dense) for layer_spec, layer in zip(spec.encoder.layer, model.encoder.layer): split_layers = [common_spec.LinearSpec() for _ in range(3)] self.set_linear(split_layers[0], layer.attention.self.query) self.set_linear(split_layers[1], layer.attention.self.key) self.set_linear(split_layers[2], layer.attention.self.value) utils.fuse_linear(layer_spec.self_attention.linear[0], split_layers) self.set_linear( layer_spec.self_attention.linear[1], layer.attention.output.dense ) self.set_layer_norm( layer_spec.self_attention.layer_norm, layer.attention.output.LayerNorm ) self.set_linear(layer_spec.ffn.linear_0, layer.intermediate.dense) self.set_linear(layer_spec.ffn.linear_1, layer.output.dense) self.set_layer_norm(layer_spec.ffn.layer_norm, layer.output.LayerNorm) return spec def get_vocabulary(self, model, tokenizer): tokens = super().get_vocabulary(model, tokenizer) extra_ids = model.config.vocab_size - len(tokens) for i in range(extra_ids): tokens.append("" % i) return tokens def set_vocabulary(self, spec, tokens): spec.register_vocabulary(tokens) def set_config(self, config, model, tokenizer): config.unk_token = tokenizer.unk_token config.layer_norm_epsilon = model.config.layer_norm_eps @register_loader("XLMRobertaConfig") class XLMRobertaLoader(ModelLoader): @property def architecture_name(self): return "XLMRobertaForSequenceClassification" def get_model_spec(self, model): assert model.config.position_embedding_type == "absolute" encoder_spec = transformer_spec.TransformerEncoderSpec( model.config.num_hidden_layers, model.config.num_attention_heads, pre_norm=False, activation=_SUPPORTED_ACTIVATIONS[model.config.hidden_act], layernorm_embedding=True, num_source_embeddings=2, embeddings_merge=common_spec.EmbeddingsMerge.ADD, ) if model.roberta.pooler is None: pooling_layer = False else: pooling_layer = True spec = transformer_spec.TransformerEncoderModelSpec( encoder_spec, pooling_layer=pooling_layer, pooling_activation=common_spec.Activation.Tanh, ) spec.encoder.scale_embeddings = False self.set_embeddings( spec.encoder.embeddings[0], model.roberta.embeddings.word_embeddings ) self.set_embeddings( spec.encoder.embeddings[1], model.roberta.embeddings.token_type_embeddings ) self.set_position_encodings( spec.encoder.position_encodings, model.roberta.embeddings.position_embeddings, ) self.set_layer_norm( spec.encoder.layernorm_embedding, model.roberta.embeddings.LayerNorm ) if pooling_layer: self.set_linear(spec.pooler_dense, model.roberta.pooler.dense) for layer_spec, layer in zip(spec.encoder.layer, model.roberta.encoder.layer): split_layers = [common_spec.LinearSpec() for _ in range(3)] self.set_linear(split_layers[0], layer.attention.self.query) self.set_linear(split_layers[1], layer.attention.self.key) self.set_linear(split_layers[2], layer.attention.self.value) utils.fuse_linear(layer_spec.self_attention.linear[0], split_layers) self.set_linear( layer_spec.self_attention.linear[1], layer.attention.output.dense ) self.set_layer_norm( layer_spec.self_attention.layer_norm, layer.attention.output.LayerNorm ) self.set_linear(layer_spec.ffn.linear_0, layer.intermediate.dense) self.set_linear(layer_spec.ffn.linear_1, layer.output.dense) self.set_layer_norm(layer_spec.ffn.layer_norm, layer.output.LayerNorm) return spec def set_vocabulary(self, spec, tokens): spec.register_vocabulary(tokens) def set_config(self, config, model, tokenizer): config.unk_token = tokenizer.unk_token config.layer_norm_epsilon = model.config.layer_norm_eps def set_position_encodings(self, spec, module): spec.encodings = module.weight offset = getattr(module, "padding_idx", 0) if offset > 0: spec.encodings = spec.encodings[offset + 1 :] def main(): parser = argparse.ArgumentParser( formatter_class=argparse.ArgumentDefaultsHelpFormatter ) parser.add_argument( "--model", required=True, help=( "Name of the pretrained model to download, " "or path to a directory containing the pretrained model." ), ) parser.add_argument( "--activation_scales", help=( "Path to the pre-computed activation scales. Models may " "use them to rescale some weights to smooth the intermediate activations " "and improve the quantization accuracy. See " "https://github.com/mit-han-lab/smoothquant." ), ) parser.add_argument( "--copy_files", nargs="+", help=( "List of filenames to copy from the Hugging Face model to the converted " "model directory." ), ) parser.add_argument( "--revision", help="Revision of the model to download from the Hugging Face Hub.", ) parser.add_argument( "--low_cpu_mem_usage", action="store_true", help="Enable the flag low_cpu_mem_usage when loading the model with from_pretrained.", ) parser.add_argument( "--trust_remote_code", action="store_true", help="Allow converting models using custom code.", ) Converter.declare_arguments(parser) args = parser.parse_args() converter = TransformersConverter( args.model, activation_scales=args.activation_scales, copy_files=args.copy_files, load_as_float16=args.quantization in ("float16", "int8_float16"), revision=args.revision, low_cpu_mem_usage=args.low_cpu_mem_usage, trust_remote_code=args.trust_remote_code, ) converter.convert_from_args(args) if __name__ == "__main__": main() # Cross-attention heads that are highly correlated to the word-level timing, # i.e. the alignment between audio and text tokens. # Obtained from https://github.com/openai/whisper/blob/v20231106/whisper/__init__.py#L32-L47 _WHISPER_ALIGNMENT_HEADS = { "openai/whisper-tiny.en": [ (1, 0), (2, 0), (2, 5), (3, 0), (3, 1), (3, 2), (3, 3), (3, 4), ], "openai/whisper-tiny": [(2, 2), (3, 0), (3, 2), (3, 3), (3, 4), (3, 5)], "openai/whisper-base.en": [(3, 3), (4, 7), (5, 1), (5, 5), (5, 7)], "openai/whisper-base": [ (3, 1), (4, 2), (4, 3), (4, 7), (5, 1), (5, 2), (5, 4), (5, 6), ], "openai/whisper-small.en": [ (6, 6), (7, 0), (7, 3), (7, 8), (8, 2), (8, 5), (8, 7), (9, 0), (9, 4), (9, 8), (9, 10), (10, 0), (10, 1), (10, 2), (10, 3), (10, 6), (10, 11), (11, 2), (11, 4), ], "openai/whisper-small": [ (5, 3), (5, 9), (8, 0), (8, 4), (8, 7), (8, 8), (9, 0), (9, 7), (9, 9), (10, 5), ], "openai/whisper-medium.en": [ (11, 4), (14, 1), (14, 12), (14, 14), (15, 4), (16, 0), (16, 4), (16, 9), (17, 12), (17, 14), (18, 7), (18, 10), (18, 15), (20, 0), (20, 3), (20, 9), (20, 14), (21, 12), ], "openai/whisper-medium": [(13, 15), (15, 4), (15, 15), (16, 1), (20, 0), (23, 4)], "openai/whisper-large": [ (9, 19), (11, 2), (11, 4), (11, 17), (22, 7), (22, 11), (22, 17), (23, 2), (23, 15), ], "openai/whisper-large-v2": [ (10, 12), (13, 17), (16, 11), (16, 12), (16, 13), (17, 15), (17, 16), (18, 4), (18, 11), (18, 19), (19, 11), (21, 2), (21, 3), (22, 3), (22, 9), (22, 12), (23, 5), (23, 7), (23, 13), (25, 5), (26, 1), (26, 12), (27, 15), ], "openai/whisper-large-v3": [ (7, 0), (10, 17), (12, 18), (13, 12), (16, 1), (17, 14), (19, 11), (21, 4), (24, 1), (25, 6), ], }