Source code of src.encapsulation

from typing import Any
import numpy as np
from tqdm.autonotebook import tqdm, trange
from icecream import ic
from .module import Module, Loss
from sklearn.model_selection import train_test_split
from pandas import DataFrame
from copy import deepcopy
import matplotlib.pyplot as plt
from IPython import display


class Sequential:
    """Sequential container: chains modules and runs forward/backward through
    them in order, caching intermediate activations for back-propagation."""

    def __init__(self, *args: Module) -> None:
        self.modules = [*args]
        # Pristine deep copy of the initial modules so reset() can restore
        # the original architecture and parameters.
        self.modules_copy = deepcopy(self.modules)
        # Activations cached by forward() (input first, final output last);
        # consumed by backward().
        self.inputs = []

    def __call__(self, *args: Any, **kwds: Any) -> Any:
        return self.forward(*args, **kwds)

    def add(self, module: Module):
        """Add a module to the end of the network."""
        self.modules.append(module)

    def insert(self, idx: int, module: Module):
        """Insert a module into the network at the specified index."""
        self.modules.insert(idx, module)

    def reset(self):
        """Reset the network to its initial parameters and modules."""
        self.modules = deepcopy(self.modules_copy)
        return self

    def forward(self, input):
        """Propagate ``input`` through every module in order.

        Caches every intermediate activation in ``self.inputs`` and returns
        the final module's output.
        """
        self.inputs = [input]
        for module in self.modules:
            input = module(input)
            self.inputs.append(input)
        return input

    def backward(self, input, delta):
        """Back-propagate ``delta`` through the network, accumulating each
        module's gradient, and return the delta w.r.t. the network input.

        ``input`` is unused (kept for interface compatibility): the
        activations cached by the last forward() call are used instead.
        """
        # Careful with list indexing here: walk the cached activations from
        # output to input. Use a reversed copy instead of list.reverse() so
        # self.inputs is not mutated (the in-place reverse corrupted the
        # cache order for any subsequent backward() without a new forward()).
        inputs_rev = self.inputs[::-1]
        for i, module in enumerate(reversed(self.modules)):
            # inputs_rev[i + 1] is the input that was fed to this module.
            module.backward_update_gradient(inputs_rev[i + 1], delta)
            delta = module.backward_delta(inputs_rev[i + 1], delta)
        return delta

    def update_parameters(self, eps=1e-3):
        """Apply one gradient step to every module with learning rate ``eps``."""
        for module in self.modules:
            module.update_parameters(learning_rate=eps)

    def zero_grad(self):
        """Reset the gradient accumulator of every module."""
        for module in self.modules:
            module.zero_grad()
class Optim:
    """Mini-batch gradient-descent optimiser driving a :class:`Sequential`
    network with a given loss and learning rate."""

    def __init__(self, network: Sequential, loss: Loss, eps: float) -> None:
        self.network = network
        self.loss = loss
        self.eps = eps  # learning rate

    def _create_batches(self, X, y, batch_size, shuffle=True, seed=42):
        """Yield ``(X_batch, y_batch)`` mini-batches, optionally shuffled.

        NOTE(review): reseeding with the same ``seed`` on every call means
        every epoch sees the identical permutation — confirm this
        reproducibility trade-off is intended.
        """
        n_samples = X.shape[0]
        if shuffle:
            if seed is not None:
                np.random.seed(seed)
            indices = np.random.permutation(n_samples)
            X = X[indices]
            y = y[indices]
        # Guarantee at least one batch even when batch_size > n_samples
        # (the previous float count truncated to 0 sections and raised).
        n_batches = max(1, n_samples // batch_size)
        yield from zip(np.array_split(X, n_batches), np.array_split(y, n_batches))

    def step(self, batch_x, batch_y, network: Sequential = None):
        """Run one optimisation step (forward, backward, parameter update)
        on a single mini-batch and return the per-sample loss values.

        ``network`` defaults to ``self.network``; passing another network
        lets SGD/SGD_eval honour their ``network`` argument.
        """
        if network is None:
            network = self.network
        # Forward pass
        y_hat = network.forward(batch_x)
        loss_value = self.loss.forward(batch_y, y_hat)
        # Backward pass
        loss_delta = self.loss.backward(batch_y, y_hat)
        network.zero_grad()
        network.backward(batch_x, loss_delta)
        network.update_parameters(self.eps)
        return loss_value

    def SGD(
        self,
        X,
        y,
        batch_size: int,
        epochs: int,
        network: Sequential = None,
        shuffle: bool = True,
        seed: int = 42,
    ):
        """Train for ``epochs`` epochs of mini-batch SGD.

        Returns an array of per-epoch mean losses. ``network`` defaults to
        ``self.network`` (previously the argument was silently ignored).
        """
        if network is None:
            network = self.network
        losses = []
        for _ in trange(epochs):
            loss_sum = 0
            for X_i, y_i in self._create_batches(X, y, batch_size, shuffle, seed):
                loss_sum += self.step(X_i, y_i, network).sum()
            losses.append(loss_sum / len(y))
        return np.array(losses)

    def SGD_eval(
        self,
        X,
        y,
        batch_size: int,
        epochs: int,
        test_size: float,
        patience: int = 10,
        network: Sequential = None,
        shuffle_train: bool = True,
        shuffle_test: bool = False,
        seed: int = 42,
        return_dataframe: bool = False,
        online_plot: bool = False,
    ):
        """Train with a held-out test split, per-epoch evaluation, progress
        bars, optional live plotting, and early stopping.

        Returns ``self.train_df``: a DataFrame (``return_dataframe=True``)
        or a tuple ``(losses_train, scores_train, losses_test, scores_test)``
        of arrays. ``shuffle_test`` is currently unused (kept for interface
        compatibility).
        """
        if network is None:
            network = self.network
        if online_plot:
            fig = plt.gcf()
            ax = plt.gca()
            dh = display.display(fig, display_id=True)
        # Train/test split (fixed random_state for a reproducible split).
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=42
        )
        # Keep the split around for inspection after training.
        self.X_train, self.X_test, self.y_train, self.y_test = (
            X_train,
            X_test,
            y_train,
            y_test,
        )
        losses_train, losses_test = [], []
        scores_train, scores_test = [], []
        # Early-stopping state.
        best_score = 0.0
        counter = 0  # epochs since the last test-score improvement
        epoch_progress = tqdm(range(epochs), desc="Epoch", position=0)
        batch_progress = tqdm(
            desc="Batch", position=1, total=len(X_train) // batch_size, mininterval=0.3
        )
        for _ in epoch_progress:
            loss_sum = 0
            batch_iter = self._create_batches(
                X_train, y_train, batch_size, shuffle_train, seed
            )
            for X_i, y_i in batch_iter:
                loss_sum += self.step(X_i, y_i, network).sum()
                batch_progress.update()
            batch_progress.reset()  # Reset batch bar for the next epoch
            epoch_train_loss = loss_sum / len(y_train)
            losses_train.append(epoch_train_loss)
            epoch_train_score = self.score(X_train, y_train, network)
            scores_train.append(epoch_train_score)
            # Epoch evaluation on the held-out split.
            y_hat = network.forward(X_test)
            epoch_test_loss = self.loss.forward(y_test, y_hat).mean()
            epoch_test_score = self.score(X_test, y_test, network)
            losses_test.append(epoch_test_loss)
            scores_test.append(epoch_test_score)
            # Surface the latest metrics on the epoch progress bar.
            epoch_progress.set_postfix(
                {
                    "train_loss": epoch_train_loss,
                    "train_score": epoch_train_score,
                    "test_loss": epoch_test_loss,
                    "test_score": epoch_test_score,
                }
            )
            if online_plot:
                ax.plot(losses_train)
                ax.plot(losses_test)
                dh.update(fig, clear=True)
            # Early stopping on the test score.
            if epoch_test_score > best_score:
                best_score = epoch_test_score
                counter = 0
            else:
                counter += 1
                if patience and counter >= patience:
                    break
        batch_progress.close()
        if return_dataframe:
            self.train_df = DataFrame(
                {
                    "epoch": np.arange(len(losses_train)),
                    "loss_train": losses_train,
                    "loss_test": losses_test,
                    "score_train": scores_train,
                    "score_test": scores_test,
                }
            )
        else:
            self.train_df = (
                np.array(losses_train),
                np.array(scores_train),
                np.array(losses_test),
                np.array(scores_test),
            )
        return self.train_df

    def score(self, X, y, network: Sequential = None):
        """Return the accuracy of the network's argmax predictions on (X, y).

        ``y`` may be one-hot encoded (2-D), in which case it is collapsed
        with argmax. Raises ``ValueError`` on mismatched sample counts
        (previously an ``assert``, which vanishes under ``python -O``).
        """
        if network is None:
            network = self.network
        if X.shape[0] != y.shape[0]:
            raise ValueError("X and y must have the same number of samples")
        if len(y.shape) != 1:  # y with one-hot encoding
            y = y.argmax(axis=1)
        y_hat = np.argmax(network.forward(X), axis=1)
        return np.where(y == y_hat, 1, 0).mean()