import pytorch_lightning as pl import torch from src.const import ZINC_TRAIN_LINKER_ID2SIZE, ZINC_TRAIN_LINKER_SIZE2ID from src.linker_size import SizeGNN from src.egnn import coord2diff from src.datasets import ZincDataset, get_dataloader, collate_with_fragment_edges from typing import Dict, List, Optional from torch.nn.functional import cross_entropy, mse_loss, sigmoid from pdb import set_trace class SizeClassifier(pl.LightningModule): train_dataset = None val_dataset = None test_dataset = None metrics: Dict[str, List[float]] = {} def __init__( self, data_path, train_data_prefix, val_data_prefix, in_node_nf, hidden_nf, out_node_nf, n_layers, batch_size, lr, torch_device, normalization=None, loss_weights=None, min_linker_size=None, linker_size2id=ZINC_TRAIN_LINKER_SIZE2ID, linker_id2size=ZINC_TRAIN_LINKER_ID2SIZE, task='classification', ): super(SizeClassifier, self).__init__() self.save_hyperparameters() self.data_path = data_path self.train_data_prefix = train_data_prefix self.val_data_prefix = val_data_prefix self.min_linker_size = min_linker_size self.linker_size2id = linker_size2id self.linker_id2size = linker_id2size self.batch_size = batch_size self.lr = lr self.torch_device = torch.device("cuda" if torch.cuda.is_available() else "cpu") self.loss_weights = None if loss_weights is None else torch.tensor(loss_weights, device=self.torch_device) self.gnn = SizeGNN( in_node_nf=in_node_nf, hidden_nf=hidden_nf, out_node_nf=out_node_nf, n_layers=n_layers, device=self.torch_device, normalization=normalization, ) def setup(self, stage: Optional[str] = None): if stage == 'fit': self.train_dataset = ZincDataset( data_path=self.data_path, prefix=self.train_data_prefix, device=self.torch_device ) self.val_dataset = ZincDataset( data_path=self.data_path, prefix=self.val_data_prefix, device=self.torch_device ) elif stage == 'val': self.val_dataset = ZincDataset( data_path=self.data_path, prefix=self.val_data_prefix, device=self.torch_device ) else: raise NotImplementedError def train_dataloader(self): return get_dataloader(self.train_dataset, self.batch_size, collate_fn=collate_with_fragment_edges, shuffle=True) def val_dataloader(self): return get_dataloader(self.val_dataset, self.batch_size, collate_fn=collate_with_fragment_edges) def test_dataloader(self): return get_dataloader(self.test_dataset, self.batch_size, collate_fn=collate_with_fragment_edges) def forward(self, data): h = data['one_hot'] x = data['positions'] fragment_mask = data['fragment_mask'] linker_mask = data['linker_mask'] edge_mask = data['edge_mask'] edges = data['edges'] # Considering only fragments x = x * fragment_mask h = h * fragment_mask # Reshaping bs, n_nodes = x.shape[0], x.shape[1] fragment_mask = fragment_mask.view(bs * n_nodes, 1) x = x.view(bs * n_nodes, -1) h = h.view(bs * n_nodes, -1) # Prediction distances, _ = coord2diff(x, edges) distance_edge_mask = (edge_mask.bool() & (distances < 6)).long() output = self.gnn.forward(h, edges, distances, fragment_mask, distance_edge_mask) output = output.view(bs, n_nodes, -1).mean(1) true = self.get_true_labels(linker_mask) loss = cross_entropy(output, true, weight=self.loss_weights) return output, loss def get_true_labels(self, linker_mask): labels = [] sizes = linker_mask.squeeze().sum(-1).long().detach().cpu().numpy() for size in sizes: label = self.linker_size2id.get(size) if label is None: label = self.linker_size2id[max(self.linker_id2size)] labels.append(label) labels = torch.tensor(labels, device=linker_mask.device, dtype=torch.long) return labels def training_step(self, data, *args): _, loss = self.forward(data) return {'loss': loss} def validation_step(self, data, *args): _, loss = self.forward(data) return {'loss': loss} def test_step(self, data, *args): loss = self.forward(data) return {'loss': loss} def training_epoch_end(self, training_step_outputs): for metric in training_step_outputs[0].keys(): avg_metric = self.aggregate_metric(training_step_outputs, metric) self.metrics.setdefault(f'{metric}/train', []).append(avg_metric) self.log(f'{metric}/train', avg_metric, prog_bar=True) def validation_epoch_end(self, validation_step_outputs): for metric in validation_step_outputs[0].keys(): avg_metric = self.aggregate_metric(validation_step_outputs, metric) self.metrics.setdefault(f'{metric}/val', []).append(avg_metric) self.log(f'{metric}/val', avg_metric, prog_bar=True) correct = 0 total = 0 for data in self.val_dataloader(): output, _ = self.forward(data) pred = output.argmax(dim=-1) true = self.get_true_labels(data['linker_mask']) correct += (pred == true).sum() total += len(pred) accuracy = correct / total self.metrics.setdefault(f'accuracy/val', []).append(accuracy) self.log(f'accuracy/val', accuracy, prog_bar=True) def configure_optimizers(self): return torch.optim.AdamW(self.gnn.parameters(), lr=self.lr, amsgrad=True, weight_decay=1e-12) @staticmethod def aggregate_metric(step_outputs, metric): return torch.tensor([out[metric] for out in step_outputs]).mean() class SizeOrdinalClassifier(pl.LightningModule): train_dataset = None val_dataset = None test_dataset = None metrics: Dict[str, List[float]] = {} def __init__( self, data_path, train_data_prefix, val_data_prefix, in_node_nf, hidden_nf, out_node_nf, n_layers, batch_size, lr, torch_device, normalization=None, min_linker_size=None, linker_size2id=ZINC_TRAIN_LINKER_SIZE2ID, linker_id2size=ZINC_TRAIN_LINKER_ID2SIZE, task='ordinal', ): super(SizeOrdinalClassifier, self).__init__() self.save_hyperparameters() self.data_path = data_path self.train_data_prefix = train_data_prefix self.val_data_prefix = val_data_prefix self.min_linker_size = min_linker_size self.batch_size = batch_size self.lr = lr self.torch_device = torch_device self.linker_size2id = linker_size2id self.linker_id2size = linker_id2size self.gnn = SizeGNN( in_node_nf=in_node_nf, hidden_nf=hidden_nf, out_node_nf=out_node_nf, n_layers=n_layers, device=torch_device, normalization=normalization, ) def setup(self, stage: Optional[str] = None): if stage == 'fit': self.train_dataset = ZincDataset( data_path=self.data_path, prefix=self.train_data_prefix, device=self.torch_device ) self.val_dataset = ZincDataset( data_path=self.data_path, prefix=self.val_data_prefix, device=self.torch_device ) elif stage == 'val': self.val_dataset = ZincDataset( data_path=self.data_path, prefix=self.val_data_prefix, device=self.torch_device ) else: raise NotImplementedError def train_dataloader(self): return get_dataloader(self.train_dataset, self.batch_size, collate_fn=collate_with_fragment_edges, shuffle=True) def val_dataloader(self): return get_dataloader(self.val_dataset, self.batch_size, collate_fn=collate_with_fragment_edges) def test_dataloader(self): return get_dataloader(self.test_dataset, self.batch_size, collate_fn=collate_with_fragment_edges) def forward(self, data): h = data['one_hot'] x = data['positions'] fragment_mask = data['fragment_mask'] linker_mask = data['linker_mask'] edge_mask = data['edge_mask'] edges = data['edges'] # Considering only fragments x = x * fragment_mask h = h * fragment_mask # Reshaping bs, n_nodes = x.shape[0], x.shape[1] fragment_mask = fragment_mask.view(bs * n_nodes, 1) x = x.view(bs * n_nodes, -1) h = h.view(bs * n_nodes, -1) # Prediction distances, _ = coord2diff(x, edges) distance_edge_mask = (edge_mask.bool() & (distances < 6)).long() output = self.gnn.forward(h, edges, distances, fragment_mask, distance_edge_mask) output = output.view(bs, n_nodes, -1).mean(1) output = sigmoid(output) true = self.get_true_labels(linker_mask) loss = self.ordinal_loss(output, true) return output, loss def ordinal_loss(self, pred, true): target = torch.zeros_like(pred, device=self.torch_device) for i, label in enumerate(true): target[i, 0:label + 1] = 1 return mse_loss(pred, target, reduction='none').sum(1).mean() def get_true_labels(self, linker_mask): labels = [] sizes = linker_mask.squeeze().sum(-1).long().detach().cpu().numpy() for size in sizes: label = self.linker_size2id.get(size) if label is None: label = self.linker_size2id[max(self.linker_id2size)] labels.append(label) labels = torch.tensor(labels, device=linker_mask.device, dtype=torch.long) return labels @staticmethod def prediction2label(pred): return torch.cumprod(pred > 0.5, dim=1).sum(dim=1) - 1 def training_step(self, data, *args): _, loss = self.forward(data) return {'loss': loss} def validation_step(self, data, *args): _, loss = self.forward(data) return {'loss': loss} def test_step(self, data, *args): loss = self.forward(data) return {'loss': loss} def training_epoch_end(self, training_step_outputs): for metric in training_step_outputs[0].keys(): avg_metric = self.aggregate_metric(training_step_outputs, metric) self.metrics.setdefault(f'{metric}/train', []).append(avg_metric) self.log(f'{metric}/train', avg_metric, prog_bar=True) def validation_epoch_end(self, validation_step_outputs): for metric in validation_step_outputs[0].keys(): avg_metric = self.aggregate_metric(validation_step_outputs, metric) self.metrics.setdefault(f'{metric}/val', []).append(avg_metric) self.log(f'{metric}/val', avg_metric, prog_bar=True) correct = 0 total = 0 for data in self.val_dataloader(): output, _ = self.forward(data) pred = self.prediction2label(output) true = self.get_true_labels(data['linker_mask']) correct += (pred == true).sum() total += len(pred) accuracy = correct / total self.metrics.setdefault(f'accuracy/val', []).append(accuracy) self.log(f'accuracy/val', accuracy, prog_bar=True) def configure_optimizers(self): return torch.optim.AdamW(self.gnn.parameters(), lr=self.lr, amsgrad=True, weight_decay=1e-12) @staticmethod def aggregate_metric(step_outputs, metric): return torch.tensor([out[metric] for out in step_outputs]).mean() class SizeRegressor(pl.LightningModule): train_dataset = None val_dataset = None test_dataset = None metrics: Dict[str, List[float]] = {} def __init__( self, data_path, train_data_prefix, val_data_prefix, in_node_nf, hidden_nf, n_layers, batch_size, lr, torch_device, normalization=None, task='regression', ): super(SizeRegressor, self).__init__() self.save_hyperparameters() self.data_path = data_path self.train_data_prefix = train_data_prefix self.val_data_prefix = val_data_prefix self.batch_size = batch_size self.lr = lr self.torch_device = torch_device self.gnn = SizeGNN( in_node_nf=in_node_nf, hidden_nf=hidden_nf, out_node_nf=1, n_layers=n_layers, device=torch_device, normalization=normalization, ) def setup(self, stage: Optional[str] = None): if stage == 'fit': self.train_dataset = ZincDataset( data_path=self.data_path, prefix=self.train_data_prefix, device=self.torch_device ) self.val_dataset = ZincDataset( data_path=self.data_path, prefix=self.val_data_prefix, device=self.torch_device ) elif stage == 'val': self.val_dataset = ZincDataset( data_path=self.data_path, prefix=self.val_data_prefix, device=self.torch_device ) else: raise NotImplementedError def train_dataloader(self): return get_dataloader(self.train_dataset, self.batch_size, collate_fn=collate_with_fragment_edges, shuffle=True) def val_dataloader(self): return get_dataloader(self.val_dataset, self.batch_size, collate_fn=collate_with_fragment_edges) def test_dataloader(self): return get_dataloader(self.test_dataset, self.batch_size, collate_fn=collate_with_fragment_edges) def forward(self, data): h = data['one_hot'] x = data['positions'] fragment_mask = data['fragment_mask'] linker_mask = data['linker_mask'] edge_mask = data['edge_mask'] edges = data['edges'] # Considering only fragments x = x * fragment_mask h = h * fragment_mask # Reshaping bs, n_nodes = x.shape[0], x.shape[1] fragment_mask = fragment_mask.view(bs * n_nodes, 1) x = x.view(bs * n_nodes, -1) h = h.view(bs * n_nodes, -1) # Prediction distances, _ = coord2diff(x, edges) distance_edge_mask = (edge_mask.bool() & (distances < 6)).long() output = self.gnn.forward(h, edges, distances, fragment_mask, distance_edge_mask) output = output.view(bs, n_nodes, -1).mean(1).squeeze() true = linker_mask.squeeze().sum(-1).float() loss = mse_loss(output, true) return output, loss def training_step(self, data, *args): _, loss = self.forward(data) return {'loss': loss} def validation_step(self, data, *args): _, loss = self.forward(data) return {'loss': loss} def test_step(self, data, *args): loss = self.forward(data) return {'loss': loss} def training_epoch_end(self, training_step_outputs): for metric in training_step_outputs[0].keys(): avg_metric = self.aggregate_metric(training_step_outputs, metric) self.metrics.setdefault(f'{metric}/train', []).append(avg_metric) self.log(f'{metric}/train', avg_metric, prog_bar=True) def validation_epoch_end(self, validation_step_outputs): for metric in validation_step_outputs[0].keys(): avg_metric = self.aggregate_metric(validation_step_outputs, metric) self.metrics.setdefault(f'{metric}/val', []).append(avg_metric) self.log(f'{metric}/val', avg_metric, prog_bar=True) correct = 0 total = 0 for data in self.val_dataloader(): output, _ = self.forward(data) pred = torch.round(output).long() true = data['linker_mask'].squeeze().sum(-1).long() correct += (pred == true).sum() total += len(pred) accuracy = correct / total self.metrics.setdefault(f'accuracy/val', []).append(accuracy) self.log(f'accuracy/val', accuracy, prog_bar=True) def configure_optimizers(self): return torch.optim.AdamW(self.gnn.parameters(), lr=self.lr, amsgrad=True, weight_decay=1e-12) @staticmethod def aggregate_metric(step_outputs, metric): return torch.tensor([out[metric] for out in step_outputs]).mean()