|
| 1 | +import lightning as L |
| 2 | +import torch |
| 3 | +import torch.nn as nn |
| 4 | +from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence |
| 5 | +import torchmetrics.functional.regression |
| 6 | +import torch.optim as optim |
| 7 | +from spotpython.hyperparameters.optimizer import optimizer_handler |
| 8 | + |
| 9 | + |
| 10 | +class ManyToManyGRU(nn.Module): |
| 11 | + """A Many-to-Many GRU model for sequence-to-sequence regression tasks. |
| 12 | +
|
| 13 | + This model uses a GRU layer followed by a fully connected layer and an output layer. |
| 14 | +
|
| 15 | + Args: |
| 16 | + input_size (int): The number of input features. |
| 17 | + output_size (int): The number of output features. Defaults to 1. |
| 18 | + gru_units (int): The number of units in the GRU layer. Defaults to 128. |
| 19 | + fc_units (int): The number of units in the fully connected layer. Defaults to 128. |
| 20 | + activation_fct (nn.Module): The activation function to use after the fully connected layer. Defaults to nn.ReLU(). |
| 21 | + dropout (float): The dropout probability. Defaults to 0.2. |
| 22 | + bidirectional (bool): Whether the GRU is bidirectional. Defaults to True. |
| 23 | + num_layers (int): The number of GRU layers. Defaults to 2. |
| 24 | +
|
| 25 | + Examples: |
| 26 | + >>> from spotpython.light.regression.nn_many_to_many_gru_regressor import ManyToManyGRU |
| 27 | + >>> import torch |
| 28 | + >>> model = ManyToManyGRU(input_size=10, output_size=1) |
| 29 | + >>> x = torch.randn(16, 10, 10) # Batch of 16 sequences, each of length 10 with 10 features |
| 30 | + >>> lengths = torch.tensor([10] * 16) # All sequences have length 10 |
| 31 | + >>> output = model(x, lengths) |
| 32 | + >>> print(output.shape) # Output shape: (16, 10, 1) |
| 33 | + """ |
| 34 | + |
| 35 | + def __init__( |
| 36 | + self, |
| 37 | + input_size, |
| 38 | + output_size=1, |
| 39 | + gru_units=128, |
| 40 | + fc_units=128, |
| 41 | + activation_fct=nn.ReLU(), |
| 42 | + dropout=0.2, |
| 43 | + bidirectional=True, |
| 44 | + num_layers=2, |
| 45 | + ): |
| 46 | + super(ManyToManyGRU, self).__init__() |
| 47 | + self.gru_layer = nn.GRU( |
| 48 | + input_size=input_size, |
| 49 | + hidden_size=gru_units, |
| 50 | + num_layers=num_layers, |
| 51 | + batch_first=True, |
| 52 | + bidirectional=bidirectional, |
| 53 | + dropout=dropout if num_layers > 1 else 0.0, |
| 54 | + ) |
| 55 | + if bidirectional: |
| 56 | + gru_units = gru_units * 2 |
| 57 | + self.fc = nn.Linear(gru_units, fc_units) |
| 58 | + self.dropout = nn.Dropout(dropout) |
| 59 | + self.output_layer = nn.Linear(fc_units, output_size) |
| 60 | + self.activation_fct = activation_fct |
| 61 | + |
| 62 | + def forward(self, x, lengths): |
| 63 | + """Forward pass of the ManyToManyGRU model. |
| 64 | +
|
| 65 | + Args: |
| 66 | + x (torch.Tensor): Input tensor of shape (batch_size, seq_len, input_size). |
| 67 | + lengths (torch.Tensor): Tensor containing the lengths of each sequence in the batch. |
| 68 | +
|
| 69 | + Returns: |
| 70 | + torch.Tensor: Output tensor of shape (batch_size, seq_len, output_size). |
| 71 | +
|
| 72 | + Raises: |
| 73 | + ValueError: If the input tensor is empty or if the lengths tensor is empty. |
| 74 | + RuntimeError: If the lengths tensor does not match the batch size of the input tensor. |
| 75 | + """ |
| 76 | + if x.size(0) == 0 or lengths.size(0) == 0: |
| 77 | + raise ValueError("Input tensor or lengths tensor is empty.") |
| 78 | + if x.size(0) != lengths.size(0): |
| 79 | + raise RuntimeError(f"Batch size of input tensor ({x.size(0)}) and lengths tensor ({lengths.size(0)}) must match.") |
| 80 | + |
| 81 | + x = pack_padded_sequence(x, lengths, batch_first=True, enforce_sorted=False) |
| 82 | + packed_output, _ = self.gru_layer(x) |
| 83 | + x, _ = pad_packed_sequence(packed_output, batch_first=True) |
| 84 | + x = self.dropout(x) |
| 85 | + x = self.fc(x) |
| 86 | + x = self.activation_fct(x) |
| 87 | + x = self.output_layer(x) |
| 88 | + return x |
| 89 | + |
| 90 | + |
| 91 | +class ManyToManyGRURegressor(L.LightningModule): |
| 92 | + """A LightningModule for training and evaluating a Many-to-Many GRU regressor. |
| 93 | +
|
| 94 | + Args: |
| 95 | + _L_in (int): The number of input features. |
| 96 | + _L_out (int): The number of output features. |
| 97 | + l1 (int): Unused parameter. Defaults to 8. |
| 98 | + gru_units (int): The number of units in the GRU layer. Defaults to 128. |
| 99 | + fc_units (int): The number of units in the fully connected layer. Defaults to 128. |
| 100 | + act_fn (nn.Module): The activation function to use after the fully connected layer. Defaults to nn.ReLU(). |
| 101 | + dropout_prob (float): The dropout probability. Defaults to 0.2. |
| 102 | + bidirectional (bool): Whether the GRU is bidirectional. Defaults to True. |
| 103 | + num_layers (int): The number of GRU layers. Defaults to 2. |
| 104 | + optimizer (str): The optimizer to use. Defaults to "Adam". |
| 105 | + lr_mult (float): Learning rate multiplier. Defaults to 1.0. |
| 106 | + patience (int): Patience for learning rate scheduler. Defaults to 5. |
| 107 | + epochs (int): Number of training epochs. Defaults to 100. |
| 108 | + batch_size (int): Batch size for training. Defaults to 32. |
| 109 | + _torchmetric (str): The metric to use for evaluation. Defaults to "mean_squared_error". |
| 110 | +
|
| 111 | + Examples: |
| 112 | + >>> model = ManyToManyGRURegressor(_L_in=10, _L_out=1) |
| 113 | + >>> x = torch.randn(16, 10, 10) # Batch of 16 sequences, each of length 10 with 10 features |
| 114 | + >>> lengths = torch.tensor([10] * 16) # All sequences have length 10 |
| 115 | + >>> output = model(x, lengths) |
| 116 | + >>> print(output.shape) # Output shape: (16, 10, 1) |
| 117 | + """ |
| 118 | + |
| 119 | + def __init__( |
| 120 | + self, |
| 121 | + _L_in: int, |
| 122 | + _L_out: int, |
| 123 | + l1: int = 8, |
| 124 | + gru_units: int = 128, |
| 125 | + fc_units: int = 128, |
| 126 | + act_fn: nn.Module = nn.ReLU(), |
| 127 | + dropout_prob: float = 0.2, |
| 128 | + bidirectional: bool = True, |
| 129 | + num_layers: int = 2, |
| 130 | + optimizer: str = "Adam", |
| 131 | + lr_mult: float = 1.0, |
| 132 | + patience: int = 5, |
| 133 | + epochs: int = 100, |
| 134 | + batch_size: int = 32, |
| 135 | + _torchmetric: str = "mean_squared_error", |
| 136 | + *args, |
| 137 | + **kwargs, |
| 138 | + ): |
| 139 | + super().__init__() |
| 140 | + self._L_in = _L_in |
| 141 | + self._L_out = _L_out |
| 142 | + if _torchmetric is None: |
| 143 | + _torchmetric = "mean_squared_error" |
| 144 | + self._torchmetric = _torchmetric |
| 145 | + self.metric = getattr(torchmetrics.functional.regression, _torchmetric) |
| 146 | + self.save_hyperparameters(ignore=["_L_in", "_L_out", "_torchmetric"]) |
| 147 | + self.example_input_array = (torch.zeros((batch_size, 10, _L_in)), torch.tensor([10] * batch_size)) |
| 148 | + |
| 149 | + self.layers = ManyToManyGRU( |
| 150 | + input_size=_L_in, |
| 151 | + output_size=_L_out, |
| 152 | + gru_units=self.hparams.gru_units, |
| 153 | + fc_units=self.hparams.fc_units, |
| 154 | + activation_fct=self.hparams.act_fn, |
| 155 | + dropout=self.hparams.dropout_prob, |
| 156 | + bidirectional=self.hparams.bidirectional, |
| 157 | + num_layers=self.hparams.num_layers, |
| 158 | + ) |
| 159 | + |
| 160 | + def forward(self, x, lengths) -> torch.Tensor: |
| 161 | + """Forward pass of the ManyToManyGRURegressor. |
| 162 | +
|
| 163 | + Args: |
| 164 | + x (torch.Tensor): Input tensor of shape (batch_size, seq_len, input_size). |
| 165 | + lengths (torch.Tensor): Tensor containing the lengths of each sequence in the batch. |
| 166 | +
|
| 167 | + Returns: |
| 168 | + torch.Tensor: Output tensor of shape (batch_size, seq_len, output_size). |
| 169 | + """ |
| 170 | + x = self.layers(x, lengths) |
| 171 | + return x |
| 172 | + |
| 173 | + def _calculate_loss(self, batch): |
| 174 | + """Calculates the loss for a given batch. |
| 175 | +
|
| 176 | + Args: |
| 177 | + batch (tuple): A tuple containing (x, lengths, y), where: |
| 178 | + - x: Input tensor of shape (batch_size, seq_len, input_size). |
| 179 | + - lengths: Tensor containing the lengths of each sequence in the batch. |
| 180 | + - y: Target tensor of shape (batch_size, seq_len, output_size). |
| 181 | +
|
| 182 | + Returns: |
| 183 | + torch.Tensor: The calculated loss. |
| 184 | + """ |
| 185 | + x, lengths, y = batch |
| 186 | + y_hat = self(x, lengths) |
| 187 | + y = y.view_as(y_hat) |
| 188 | + loss = self.metric(y_hat, y) |
| 189 | + return loss |
| 190 | + |
| 191 | + def training_step(self, batch: tuple, batch_idx) -> torch.Tensor: |
| 192 | + """Performs a single training step. |
| 193 | +
|
| 194 | + Args: |
| 195 | + batch (tuple): A tuple containing (x, lengths, y). |
| 196 | + batch_idx (int): The index of the batch. |
| 197 | +
|
| 198 | + Returns: |
| 199 | + torch.Tensor: The training loss. |
| 200 | + """ |
| 201 | + val_loss = self._calculate_loss(batch) |
| 202 | + return val_loss |
| 203 | + |
| 204 | + def validation_step(self, batch: tuple, batch_idx, prog_bar: bool = False) -> torch.Tensor: |
| 205 | + """Performs a single validation step. |
| 206 | +
|
| 207 | + Args: |
| 208 | + batch (tuple): A tuple containing (x, lengths, y). |
| 209 | + batch_idx (int): The index of the batch. |
| 210 | + prog_bar (bool): Whether to log the loss to the progress bar. Defaults to False. |
| 211 | +
|
| 212 | + Returns: |
| 213 | + torch.Tensor: The validation loss. |
| 214 | + """ |
| 215 | + val_loss = self._calculate_loss(batch) |
| 216 | + self.log("val_loss", val_loss, prog_bar=True) |
| 217 | + self.log("hp_metric", val_loss, prog_bar=True) |
| 218 | + return val_loss |
| 219 | + |
| 220 | + def test_step(self, batch: tuple, batch_idx: int, prog_bar: bool = False) -> torch.Tensor: |
| 221 | + """Performs a single test step. |
| 222 | +
|
| 223 | + Args: |
| 224 | + batch (tuple): A tuple containing (x, lengths, y). |
| 225 | + batch_idx (int): The index of the batch. |
| 226 | + prog_bar (bool): Whether to log the loss to the progress bar. Defaults to False. |
| 227 | +
|
| 228 | + Returns: |
| 229 | + torch.Tensor: The test loss. |
| 230 | + """ |
| 231 | + val_loss = self._calculate_loss(batch) |
| 232 | + self.log("val_loss", val_loss, prog_bar=prog_bar) |
| 233 | + self.log("hp_metric", val_loss, prog_bar=prog_bar) |
| 234 | + return val_loss |
| 235 | + |
| 236 | + def configure_optimizers(self) -> dict: |
| 237 | + """Configures the optimizer and learning rate scheduler. |
| 238 | +
|
| 239 | + Returns: |
| 240 | + dict: A dictionary containing the optimizer and learning rate scheduler configuration. |
| 241 | + """ |
| 242 | + optimizer = optimizer_handler(optimizer_name=self.hparams.optimizer, params=self.parameters(), lr_mult=self.hparams.lr_mult) |
| 243 | + |
| 244 | + num_milestones = 3 |
| 245 | + milestones = [int(self.hparams.epochs / (num_milestones + 1) * (i + 1)) for i in range(num_milestones)] |
| 246 | + scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones=milestones, gamma=0.1) |
| 247 | + |
| 248 | + lr_scheduler_config = { |
| 249 | + "scheduler": scheduler, |
| 250 | + "interval": "epoch", |
| 251 | + "frequency": 1, |
| 252 | + } |
| 253 | + |
| 254 | + return {"optimizer": optimizer, "lr_scheduler": lr_scheduler_config} |
0 commit comments