import torch
import torch.nn as nn
import torch.nn.functional as F
import math
from dataclasses import dataclass
from transformers import PreTrainedModel, PretrainedConfig
from einops import rearrange, repeat
from functools import partial
from typing import Optional, Tuple
from transformers.modeling_outputs import ModelOutput


class ESMplusplusConfig(PretrainedConfig):
    model_type = "ESMplusplus"

    def __init__(
        self,
        vocab_size: int = 64,
        hidden_size: int = 960,
        num_attention_heads: int = 15,
        num_hidden_layers: int = 30,
        num_labels: int = 2,
        problem_type: str | None = None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.vocab_size = vocab_size
        self.hidden_size = hidden_size
        self.num_attention_heads = num_attention_heads
        self.num_hidden_layers = num_hidden_layers
        self.num_labels = num_labels
        self.problem_type = problem_type


def rotate_half(x, interleaved=False):
    if not interleaved:
        x1, x2 = x.chunk(2, dim=-1)
        return torch.cat((-x2, x1), dim=-1)
    else:
        x1, x2 = x[..., ::2], x[..., 1::2]
        return rearrange(
            torch.stack((-x2, x1), dim=-1), "... d two -> ... (d two)", two=2
        )


def apply_rotary_emb_torch(x, cos, sin, interleaved=False, _inplace=False):
    """
    x: (batch_size, seqlen, nheads, headdim)
    cos, sin: (seqlen, rotary_dim / 2)
    """
    ro_dim = cos.shape[-1] * 2
    assert ro_dim <= x.shape[-1]
    seqlen = x.size(1)
    cos = cos[:seqlen]
    sin = sin[:seqlen]
    cos = repeat(cos, "s d -> s 1 (2 d)")
    sin = repeat(sin, "s d -> s 1 (2 d)")
    return torch.cat(
        [
            x[..., :ro_dim] * cos + rotate_half(x[..., :ro_dim], interleaved) * sin,
            x[..., ro_dim:],
        ],
        dim=-1,
    )


class RotaryEmbedding(torch.nn.Module):
    def __init__(
        self,
        dim: int,
        base=10000.0,
        interleaved=False,
        scale_base=None,
        scaling_factor=1.0,
        pos_idx_in_fp32=True,
        device=None,
    ):
        super().__init__()
        self.dim = dim
        self.base = float(base)
        self.pos_idx_in_fp32 = pos_idx_in_fp32
        self.interleaved = interleaved
        self.scale_base = scale_base
        self.scaling_factor = scaling_factor
        self.device = device

        self._seq_len_cached = 0
        self._cos_cached = None
        self._sin_cached = None
        self._cos_k_cached = None
        self._sin_k_cached = None
        self.reset_parameters()

    def reset_parameters(self):
        inv_freq = self._compute_inv_freq(self.device)
        self.register_buffer("inv_freq", inv_freq, persistent=False)
        arange = torch.arange(0, self.dim, 2, device=self.device, dtype=torch.float32)
        scale = (
            (arange + 0.4 * self.dim) / (1.4 * self.dim)
            if self.scale_base is not None
            else None
        )
        self.register_buffer("scale", scale)

    def _compute_inv_freq(self, device=None):
        return 1 / (
            self.base
            ** (
                torch.arange(0, self.dim, 2, device=device, dtype=torch.float32)
                / self.dim
            )
        )

    def _update_cos_sin_cache(self, seqlen, device=None, dtype=None):
        if (
            seqlen > self._seq_len_cached
            or self._cos_cached is None
            or self._cos_cached.device != device
            or self._cos_cached.dtype != dtype
            or (self.training and self._cos_cached.is_inference())
        ):
            self._seq_len_cached = seqlen
            if self.pos_idx_in_fp32:
                t = torch.arange(seqlen, device=device, dtype=torch.float32)
                t /= self.scaling_factor
                if self.inv_freq.dtype != torch.float32:
                    inv_freq = self.inv_freq.to(torch.float32)
                else:
                    inv_freq = self.inv_freq
            else:
                t = torch.arange(seqlen, device=device, dtype=self.inv_freq.dtype)
                t /= self.scaling_factor
                inv_freq = self.inv_freq
            freqs = torch.outer(t, inv_freq)

            if self.scale is None:
                self._cos_cached = torch.cos(freqs).to(dtype)
                self._sin_cached = torch.sin(freqs).to(dtype)
            else:
                power = (
                    torch.arange(
                        seqlen, dtype=self.scale.dtype, device=self.scale.device
                    )
                    - seqlen // 2
                ) / self.scale_base
                scale = self.scale.to(device=power.device) ** power.unsqueeze(-1)
                self._cos_cached = (torch.cos(freqs) * scale).to(dtype)
                self._sin_cached = (torch.sin(freqs) * scale).to(dtype)
                self._cos_k_cached = (torch.cos(freqs) / scale).to(dtype)
                self._sin_k_cached = (torch.sin(freqs) / scale).to(dtype)

    def forward(self, q: torch.Tensor, k: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        q: (batch, seqlen, nheads, headdim)
        k: (batch, seqlen, nheads, headdim)
        """
        self._update_cos_sin_cache(q.shape[1], device=q.device, dtype=q.dtype)
        assert self._cos_cached is not None
        assert self._sin_cached is not None
        if self.scale is None:
            return (
                apply_rotary_emb_torch(
                    q,
                    self._cos_cached,
                    self._sin_cached,
                    self.interleaved,
                    True,
                ),
                apply_rotary_emb_torch(
                    k,
                    self._cos_cached,
                    self._sin_cached,
                    self.interleaved,
                    True,
                ),
            )
        else:
            # The scaled (scale_base is not None) rotary path is not implemented in this port.
            raise NotImplementedError("Rotary embeddings with scale_base are not supported.")
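

# A minimal, hedged shape-check sketch for the rotary helpers above. It is not called anywhere in
# this module; the batch/sequence sizes below are illustrative assumptions, not model defaults.
def _rotary_shape_example() -> None:
    batch, seqlen, nheads, headdim = 2, 8, 15, 64
    q = torch.randn(batch, seqlen, nheads, headdim)
    k = torch.randn(batch, seqlen, nheads, headdim)
    rope = RotaryEmbedding(dim=headdim)
    q_rot, k_rot = rope(q, k)
    # Rotary embeddings rotate pairs of channels in place, so shapes are unchanged.
    assert q_rot.shape == q.shape and k_rot.shape == k.shape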


def swiglu_correction_fn(expansion_ratio: float, d_model: int) -> int:
    # Round the expanded hidden size up to the nearest multiple of 256.
    return int(((expansion_ratio * d_model) + 255) // 256 * 256)


class SwiGLU(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x1, x2 = x.chunk(2, dim=-1)
        return F.silu(x1) * x2


def swiglu_ln_ffn(d_model: int, expansion_ratio: float):
    return nn.Sequential(
        nn.LayerNorm(d_model),
        nn.Linear(
            d_model, swiglu_correction_fn(expansion_ratio, d_model) * 2, bias=False
        ),
        SwiGLU(),
        nn.Linear(swiglu_correction_fn(expansion_ratio, d_model), d_model, bias=False),
    )
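

# Hedged sketch of how the pre-norm SwiGLU feed-forward block above behaves; the 960-dim width and
# 8/3 expansion ratio match defaults used elsewhere in this file, everything else is illustrative.
def _swiglu_ffn_example() -> None:
    d_model = 960
    hidden = swiglu_correction_fn(8 / 3, d_model)  # 8/3 * 960 = 2560, already a multiple of 256
    ffn = swiglu_ln_ffn(d_model, 8 / 3)
    x = torch.randn(2, 8, d_model)
    out = ffn(x)
    # The first linear projects to 2 * hidden, SwiGLU gates it back down to hidden,
    # and the second linear returns to d_model.
    assert hidden == 2560 and out.shape == x.shape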


class MultiHeadAttention(nn.Module):
    def __init__(self, d_model: int, n_heads: int):
        super().__init__()
        self.d_model = d_model
        self.n_heads = n_heads
        self.d_head = self.d_model // self.n_heads
        self.layernorm_qkv = nn.Sequential(
            nn.LayerNorm(d_model), nn.Linear(d_model, d_model * 3, bias=False)
        )
        self.out_proj = nn.Linear(d_model, d_model, bias=False)
        self.q_ln = nn.LayerNorm(d_model, bias=False)
        self.k_ln = nn.LayerNorm(d_model, bias=False)
        self.reshaper = partial(rearrange, pattern="b s (h d) -> b h s d", h=n_heads)
        self.rotary = RotaryEmbedding(d_model // n_heads)

    def _apply_rotary(self, q: torch.Tensor, k: torch.Tensor):
        q = q.unflatten(-1, (self.n_heads, self.d_head))
        k = k.unflatten(-1, (self.n_heads, self.d_head))
        q, k = self.rotary(q, k)
        q = q.flatten(-2, -1)
        k = k.flatten(-2, -1)
        return q, k

    def forward(self, x, attention_mask=None):
        qkv_BLD3 = self.layernorm_qkv(x)
        query_BLD, key_BLD, value_BLD = torch.chunk(qkv_BLD3, 3, dim=-1)
        query_BLD, key_BLD = (
            self.q_ln(query_BLD).to(query_BLD.dtype),
            self.k_ln(key_BLD).to(query_BLD.dtype),
        )
        query_BLD, key_BLD = self._apply_rotary(query_BLD, key_BLD)
        query_BHLD, key_BHLD, value_BHLD = map(self.reshaper, (query_BLD, key_BLD, value_BLD))
        context_BHLD = F.scaled_dot_product_attention(
            query_BHLD, key_BHLD, value_BHLD, attention_mask
        )
        context_BLD = rearrange(context_BHLD, "b h s d -> b s (h d)")
        return self.out_proj(context_BLD)
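

# Hedged usage sketch for MultiHeadAttention; the boolean mask layout (True = attend) mirrors how
# TransformerStack below expands a padding mask, and the sizes are illustrative assumptions.
def _attention_example() -> None:
    d_model, n_heads, batch, seqlen = 960, 15, 2, 8
    attn = MultiHeadAttention(d_model, n_heads)
    x = torch.randn(batch, seqlen, d_model)
    # Broadcastable (batch, 1, seqlen, seqlen) boolean mask for scaled_dot_product_attention.
    mask = torch.ones(batch, 1, seqlen, seqlen, dtype=torch.bool)
    out = attn(x, mask)
    assert out.shape == (batch, seqlen, d_model)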


def RegressionHead(
    d_model: int, output_dim: int, hidden_dim: int | None = None
) -> nn.Module:
    hidden_dim = hidden_dim if hidden_dim is not None else d_model
    return nn.Sequential(
        nn.Linear(d_model, hidden_dim),
        nn.GELU(),
        nn.LayerNorm(hidden_dim),
        nn.Linear(hidden_dim, output_dim),
    )


class UnifiedTransformerBlock(nn.Module):
    def __init__(
        self,
        d_model: int,
        n_heads: int,
        residue_scaling_factor: float = 1,
        expansion_ratio: float = 8 / 3,
    ):
        super().__init__()
        self.attn = MultiHeadAttention(d_model, n_heads)
        self.ffn = swiglu_ln_ffn(d_model, expansion_ratio)
        self.scaling_factor = residue_scaling_factor

    def forward(
        self,
        x: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
    ) -> torch.Tensor:
        r1 = self.attn(x, attention_mask)
        x = x + r1 / self.scaling_factor
        r3 = self.ffn(x) / self.scaling_factor
        x = x + r3
        return x


@dataclass
class TransformerOutput(ModelOutput):
    last_hidden_state: torch.Tensor | None = None
    hidden_states: tuple[torch.Tensor, ...] | None = None


@dataclass
class ESMplusplusOutput(ModelOutput):
    loss: torch.Tensor | None = None
    logits: torch.Tensor | None = None
    last_hidden_state: torch.Tensor | None = None
    hidden_states: tuple[torch.Tensor, ...] | None = None


class TransformerStack(nn.Module):
    def __init__(
        self,
        d_model: int,
        n_heads: int,
        n_layers: int,
    ):
        super().__init__()
        self.blocks = nn.ModuleList(
            [
                UnifiedTransformerBlock(
                    d_model,
                    n_heads,
                    residue_scaling_factor=math.sqrt(n_layers / 36),
                )
                for _ in range(n_layers)
            ]
        )
        self.norm = nn.LayerNorm(d_model, bias=False)

    def forward(
        self,
        x: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
        output_hidden_states: bool = False,
    ) -> TransformerOutput:
        batch_size, seq_len, _ = x.shape
        hidden_states = ()
        if attention_mask is not None:
            # Expand the (batch, seq_len) padding mask into a boolean (batch, 1, seq_len, seq_len)
            # attention mask for scaled_dot_product_attention.
            attention_mask = attention_mask[:, None, None, :].expand(batch_size, 1, seq_len, seq_len).bool()
        for block in self.blocks:
            x = block(x, attention_mask)
            if output_hidden_states:
                hidden_states += (x,)
        return TransformerOutput(last_hidden_state=self.norm(x), hidden_states=hidden_states)
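

# Hedged end-to-end sketch of the transformer stack with a padding mask; the last two positions of
# the second sequence are treated as padding. Sizes are illustrative assumptions; never called here.
def _transformer_stack_example() -> None:
    d_model, n_heads, n_layers = 960, 15, 2
    stack = TransformerStack(d_model, n_heads, n_layers)
    x = torch.randn(2, 8, d_model)
    attention_mask = torch.ones(2, 8, dtype=torch.long)
    attention_mask[1, -2:] = 0  # mark padded positions with 0, as in standard HF attention masks
    out = stack(x, attention_mask, output_hidden_states=True)
    assert out.last_hidden_state.shape == (2, 8, d_model)
    assert len(out.hidden_states) == n_layers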


class ESMplusplusForMaskedLM(PreTrainedModel):
    """
    ESM++ for masked language modeling.
    """
    config_class = ESMplusplusConfig

    def __init__(self, config: ESMplusplusConfig):
        super().__init__(config)
        self.config = config
        self.vocab_size = config.vocab_size
        self.embed = nn.Embedding(self.vocab_size, config.hidden_size)
        self.transformer = TransformerStack(config.hidden_size, config.num_attention_heads, config.num_hidden_layers)
        self.sequence_head = RegressionHead(config.hidden_size, self.vocab_size)
        self.ce_loss = nn.CrossEntropyLoss()
        # EsmSequenceTokenizer is defined later in this module; the name is only resolved when the
        # model is instantiated, so the forward reference is safe.
        self.tokenizer = EsmSequenceTokenizer()

    @classmethod
    def from_pretrained_esm(cls, model_name: str):
        if '300' in model_name:
            return ESMplusplus_300M()
        elif '600' in model_name:
            return ESMplusplus_600M()
        else:
            raise ValueError(f"Invalid model name: {model_name}")

    @property
    def device(self):
        return next(self.parameters()).device

    def forward(
        self,
        input_ids: torch.Tensor | None = None,
        attention_mask: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_hidden_states: bool = False,
    ) -> ESMplusplusOutput:
        x = self.embed(input_ids)
        output = self.transformer(x, attention_mask, output_hidden_states)
        x = output.last_hidden_state
        logits = self.sequence_head(x)
        loss = None
        if labels is not None:
            loss = self.ce_loss(logits.view(-1, self.vocab_size), labels.view(-1))
        return ESMplusplusOutput(
            loss=loss,
            logits=logits,
            last_hidden_state=x,
            hidden_states=output.hidden_states,
        )
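

# Hedged usage sketch for the masked-LM model above. It builds a randomly initialized model from the
# default config (no pretrained weights are downloaded), so the logits are meaningless but the shapes
# and the tokenizer round-trip are real. Not called anywhere in this module.
def _masked_lm_example() -> None:
    config = ESMplusplusConfig()
    model = ESMplusplusForMaskedLM(config)
    batch = model.tokenizer(["MALWMRLLPL", "MKTAYIAK"], padding=True, return_tensors="pt")
    out = model(input_ids=batch["input_ids"], attention_mask=batch["attention_mask"])
    # One vocab-sized logit vector per token, including the <cls>/<eos> positions added by the tokenizer.
    assert out.logits.shape == (*batch["input_ids"].shape, config.vocab_size)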


class ESMplusplusForSequenceClassification(ESMplusplusForMaskedLM):
    """
    ESM++ for sequence classification.
    """
    def __init__(self, config: ESMplusplusConfig):
        super().__init__(config)
        self.config = config
        self.num_labels = config.num_labels
        self.classifier = RegressionHead(config.hidden_size * 2, config.num_labels, config.hidden_size * 4)

        self.mse = nn.MSELoss()
        self.ce = nn.CrossEntropyLoss()
        self.bce = nn.BCEWithLogitsLoss()

    def mean_pooling(self, x: torch.Tensor, attention_mask: Optional[torch.Tensor] = None) -> torch.Tensor:
        """Average the residue embeddings, ignoring padded positions when a mask is provided."""
        if attention_mask is None:
            return x.mean(dim=1)
        else:
            attention_mask = attention_mask.unsqueeze(-1)
            return (x * attention_mask).sum(dim=1) / attention_mask.sum(dim=1)

    def forward(
        self,
        input_ids: torch.Tensor | None = None,
        attention_mask: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_hidden_states: bool = False,
    ) -> ESMplusplusOutput:
        # Run the base model without labels; the masked-LM loss is not wanted here and the
        # classification loss is computed below instead.
        output = super().forward(input_ids, attention_mask, None, output_hidden_states)
        x = output.last_hidden_state
        cls_features = x[:, 0, :]
        mean_features = self.mean_pooling(x, attention_mask)

        features = torch.cat([cls_features, mean_features], dim=-1)
        logits = self.classifier(features)
        loss = None
        if labels is not None:
            labels = labels.to(logits.device)
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            if self.config.problem_type == "regression":
                if self.num_labels == 1:
                    loss = self.mse(logits.flatten(), labels.flatten())
                else:
                    loss = self.mse(logits, labels)
            elif self.config.problem_type == "single_label_classification":
                loss = self.ce(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                loss = self.bce(logits, labels)
        return ESMplusplusOutput(
            loss=loss,
            logits=logits,
            last_hidden_state=x,
            hidden_states=output.hidden_states,
        )
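

# Hedged sketch of the sequence-classification head above with integer labels, which triggers the
# "single_label_classification" branch. Uses a randomly initialized model; purely illustrative.
def _sequence_classification_example() -> None:
    config = ESMplusplusConfig(num_labels=3)
    model = ESMplusplusForSequenceClassification(config)
    batch = model.tokenizer(["MALWMRLLPL", "MKTAYIAK"], padding=True, return_tensors="pt")
    labels = torch.tensor([0, 2])
    out = model(input_ids=batch["input_ids"], attention_mask=batch["attention_mask"], labels=labels)
    assert out.logits.shape == (2, 3) and out.loss is not None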


class ESMplusplusForTokenClassification(ESMplusplusForMaskedLM):
    """
    ESM++ for token classification.
    """
    def __init__(self, config: ESMplusplusConfig):
        super().__init__(config)
        self.config = config
        self.num_labels = config.num_labels
        self.classifier = RegressionHead(config.hidden_size, config.num_labels, config.hidden_size * 4)

        self.loss_fct = nn.CrossEntropyLoss()

    def forward(
        self,
        input_ids: torch.Tensor | None = None,
        attention_mask: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_hidden_states: bool = False,
    ) -> ESMplusplusOutput:
        # As above, skip the masked-LM loss in the base forward; only the token-level
        # classification loss below is used.
        output = super().forward(input_ids, attention_mask, None, output_hidden_states)
        x = output.last_hidden_state
        logits = self.classifier(x)
        loss = None
        if labels is not None:
            loss = self.loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
        return ESMplusplusOutput(
            loss=loss,
            logits=logits,
            last_hidden_state=x,
            hidden_states=output.hidden_states,
        )


import os
from functools import cache
from pathlib import Path
from huggingface_hub import snapshot_download


@cache
def data_root(model: str):
    if "INFRA_PROVIDER" in os.environ:
        return Path("")

    if model.startswith("esmc-300"):
        path = Path(snapshot_download(repo_id="EvolutionaryScale/esmc-300m-2024-12"))
    elif model.startswith("esmc-600"):
        path = Path(snapshot_download(repo_id="EvolutionaryScale/esmc-600m-2024-12"))
    else:
        raise ValueError(f"{model=} is an invalid model name.")
    return path


def ESMplusplus_300M(device: torch.device | str = "cpu"):
    with torch.device(device):
        config = ESMplusplusConfig(
            hidden_size=960,
            num_attention_heads=15,
            num_hidden_layers=30,
        )
        model = ESMplusplusForMaskedLM(config)
    state_dict = torch.load(
        data_root("esmc-300") / "data/weights/esmc_300m_2024_12_v0.pth",
        map_location=device,
    )
    model.load_state_dict(state_dict)
    return model


def ESMplusplus_600M(device: torch.device | str = "cpu"):
    with torch.device(device):
        config = ESMplusplusConfig(
            hidden_size=1152,
            num_attention_heads=18,
            num_hidden_layers=36,
        )
        model = ESMplusplusForMaskedLM(config)
    state_dict = torch.load(
        data_root("esmc-600") / "data/weights/esmc_600m_2024_12_v0.pth",
        map_location=device,
    )
    model.load_state_dict(state_dict)
    return model
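

# Hedged loading sketch: ESMplusplus_300M/600M download the corresponding ESMC checkpoint from the
# Hugging Face Hub on first use, so this is network- and disk-heavy and not called anywhere here.
def _pretrained_loading_example() -> None:
    model = ESMplusplus_300M(device="cpu")
    model.eval()
    batch = model.tokenizer(["MALWMRLLPL"], return_tensors="pt")
    with torch.no_grad():
        out = model(**batch)
    assert out.logits.shape[-1] == model.vocab_size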


from tokenizers import Tokenizer
from tokenizers.models import BPE
from tokenizers.processors import TemplateProcessing
from transformers import PreTrainedTokenizerFast


SEQUENCE_VOCAB = [
    "<cls>", "<pad>", "<eos>", "<unk>",
    "L", "A", "G", "V", "S", "E", "R", "T", "I", "D", "P", "K",
    "Q", "N", "F", "Y", "M", "H", "W", "C", "X", "B", "U", "Z",
    "O", ".", "-", "|",
    "<mask>",
]


class EsmSequenceTokenizer(PreTrainedTokenizerFast):
    model_input_names = ["input_ids", "attention_mask"]

    def __init__(
        self,
        unk_token="<unk>",
        cls_token="<cls>",
        pad_token="<pad>",
        mask_token="<mask>",
        eos_token="<eos>",
        chain_break_token="|",
        **kwargs,
    ):
        all_tokens = SEQUENCE_VOCAB
        token_to_id = {tok: ind for ind, tok in enumerate(all_tokens)}

        bpe = BPE(token_to_id, merges=[], unk_token=unk_token)
        tokenizer = Tokenizer(bpe)
        special_tokens = [
            cls_token,
            pad_token,
            mask_token,
            eos_token,
            chain_break_token,
        ]
        self.cb_token = chain_break_token
        additional_special_tokens = [chain_break_token]

        tokenizer.add_special_tokens(special_tokens)

        # Wrap every single sequence as "<cls> $A <eos>" when special tokens are added.
        tokenizer.post_processor = TemplateProcessing(
            single="<cls> $A <eos>",
            special_tokens=[
                ("<cls>", tokenizer.token_to_id("<cls>")),
                ("<eos>", tokenizer.token_to_id("<eos>")),
            ],
        )
        super().__init__(
            tokenizer_object=tokenizer,
            unk_token=unk_token,
            cls_token=cls_token,
            pad_token=pad_token,
            mask_token=mask_token,
            eos_token=eos_token,
            additional_special_tokens=additional_special_tokens,
            **kwargs,
        )

    @property
    def bos_token(self):
        return self.cls_token

    @property
    def bos_token_id(self):
        return self.cls_token_id

    @property
    def chain_break_token(self):
        return self.cb_token

    @property
    def chain_break_token_id(self):
        return self.convert_tokens_to_ids(self.chain_break_token)

    @property
    def all_token_ids(self):
        return list(range(self.vocab_size))

    @property
    def special_token_ids(self):
        return self.all_special_ids
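

# Hedged tokenizer round-trip sketch: protein sequences are tokenized per residue and wrapped in
# <cls> ... <eos> by the post-processor above. Purely illustrative; not called anywhere.
def _tokenizer_example() -> None:
    tokenizer = EsmSequenceTokenizer()
    encoded = tokenizer("MALWMRLLPL")
    tokens = tokenizer.convert_ids_to_tokens(encoded["input_ids"])
    assert tokens[0] == "<cls>" and tokens[-1] == "<eos>"
    assert tokens[1:-1] == list("MALWMRLLPL")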