Clase NNClassifier v7

Se muestra a continuación la clase NNClassifier tras las mejoras implementadas:

import numpy as np

def sigmoid(a: np.ndarray):
    """ Aplica la función sigmoide al array a elemento por elemento """
    return 1 / (1 + np.exp(-a))
    
def to_categorical(n, size):
    """ Convierte un número n en un array de tamaño (size, 1) de ceros salvo el valor
    correspondiente a n, que toma el valor 1. Así, -si size toma el valor 4- el número 2
    se convertiría en el vector columna formado por los valores [0, 0, 1, 0] """
    
    v = np.zeros(shape = (size, 1))      # Creamos el array vertical con ceros
    v[n, 0] = 1                          # Modificamos el valor correspondiente a n por 1
    return v

def sigmoid_derivative(x):
    """ Derivada de la función sigmoide """
    return sigmoid(x) * (1 - sigmoid(x))

class NNClassifier(object):
    
    def __init__(self, sizes, learning_rate = 0.01, batch_size = 16, epochs = 10,
                shuffle = True, validation_split = None, verbose = True,
                record_params_history = False, seed = None):
        """ Constructor de la red neuronal """
        
        self.num_layers = len(sizes)         # Número total de capas de la red
        self.sizes = sizes                   # Lista conteniendo el número de neuronas por capa
        self.learning_rate = learning_rate   # Tasa de aprendizaje
        self.batch_size = batch_size         # Tamaño del batch
        self.epochs = epochs                 # Número de epochs durante los que entrenar la red
        if seed:
            np.random.seed(seed)
        self.weights = [np.random.randn(x, y) for (x, y) in zip(sizes[1:], sizes[:-1])]
        self.biases = [np.random.randn(n, 1) for n in sizes[1:]]
        self.shuffle = shuffle               # Si toma el valor True, se desordenará el dataset
        self.validation_split = validation_split   # Porcentaje de muestras para validación
        self.verbose = verbose               # Controla si se muestra info del entrenamiento
        self.__history = []                  # Exactitud del modelo tras cada epoch
        self.n_steps = 0                     # Número de actualizaciones de los parámetros
        self.record_params_history = record_params_history  # Registro de los parámetros
        self.weights_history = [self.weights]   # Valores de los pesos durante el entrenamiento
        self.bias_history = [self.biases]       # Valores de los bias durante el entrenamiento
    
    def __backpropagation(self, x, y):
        """ Devuelve una tupla representando el gradiente para la función de coste
        correspondiente a una muestra. Los valores de estas listas son arrays NumPy con
        la misma estructura que self.weights y self.bias """
        grad_b = [np.zeros(b.shape) for b in self.biases]  # Un valor para cada bias existente
        grad_w = [np.zeros(w.shape) for w in self.weights] # Un valor para cada peso existente
        # feedforward
        a = x
        a_layers = [x] # Valores devueltos por la función de activación
        z_layers = []  # Valores devueltos por la función sumatorio
        for b, w in zip(self.biases, self.weights):
            z = np.dot(w, a) + b
            z_layers.append(z)
            a = sigmoid(z)
            a_layers.append(a)
        # backward pass
        delta = (a_layers[-1] - y) * sigmoid_derivative(z_layers[-1])
        grad_b[-1] = delta
        grad_w[-1] = np.dot(delta, a_layers[-2].transpose())
        for layer in range(2, self.num_layers):
            z = z_layers[-layer]
            sp = sigmoid_derivative(z)
            delta = np.dot(self.weights[-layer + 1].transpose(), delta) * sp
            grad_b[-layer] = delta
            grad_w[-layer] = np.dot(delta, a_layers[-layer - 1].transpose())
        return (grad_w, grad_b)
    
    def __update_parameters(self, mini_batch):
        """ Actualiza los parámetros de la red aplicando descenso de gradiente a un
        mini-batch """
        self.n_steps += 1
        total_gradient_weights = [np.zeros(w.shape) for w in self.weights]
        total_gradient_bias = [np.zeros(b.shape) for b in self.biases]
        for x, y in mini_batch:
            delta_gradient_weights, delta_gradient_bias = self.__backpropagation(x, y)
            total_gradient_weights = [gw + dgw for gw, dgw \
                            in zip(total_gradient_weights, delta_gradient_weights)]
            total_gradient_bias = [gb + dgb for gb, dgb \
                            in zip(total_gradient_bias, delta_gradient_bias)]
        self.weights = [w - gw * self.learning_rate for w, gw \
                        in zip(self.weights, total_gradient_weights)]
        self.biases = [b - gb * self.learning_rate for b, gb \
                        in zip(self.biases, total_gradient_bias)]
        if self.record_params_history:
            self.weights_history.append(self.weights)
            self.bias_history.append(self.biases)
        
    def fit(self, X: pd.core.frame.DataFrame, y: pd.core.series.Series):
        """ Entrenamiento de la red neuronal"""
        self.__history = []
        self.weights_history = []
        self.bias_history = []
        self.n_steps = 0
        if self.shuffle:                          # Desordenación del dataset
            data = pd.concat([X, y], axis = 1)    # Unimos caract. predictivas y variable objetivo
            data = data.sample(frac = 1)          # Desordenamos el conjunto
            y = data.iloc[:, -1]                  # Volvemos a extraer las características predictivas
            X = data.iloc[:, :-1]                 # Y el resultado
        if self.validation_split:
            x_train, x_test, y_train, y_test = train_test_split(X, y, test_size = self.validation_split)
            x_train_transformed = [x.values.reshape(-1, 1) for (i, x) in x_train.iterrows()]
            y_train_transformed = [to_categorical(n, self.sizes[-1]) for n in y_train]
        else:
            x_train_transformed = [x.values.reshape(-1, 1) for (i, x) in X.iterrows()]
            y_train_transformed = [to_categorical(n, self.sizes[-1]) for n in y]
        training_data = [(x, y) for (x, y) in zip(x_train_transformed, y_train_transformed)]
        n = len(training_data)
        for epoch in range(self.epochs):
            if self.shuffle:
                np.random.shuffle(training_data)
            mini_batches = [training_data[start:start + self.batch_size]
                            for start in range(0, n, self.batch_size)]
            for mini_batch in mini_batches:
                self.__update_parameters(mini_batch)
            if self.validation_split:
                prediction = self.predict(x_test)
                accuracy = sum(int(p == y) for (p, y) in zip(prediction, y_test)) / len(x_test)
                self.__history.append(accuracy)
                if self.verbose:
                    print("Epoch {} complete. Accuracy: {:.4f}".format(epoch, accuracy))
            else:
                if self.verbose:
                    print("Epoch {} complete".format(epoch))
    
    def getOutput(self, x: np.ndarray):
        """ Obtención de la predicción para una muestra """
        for b, w in zip(self.biases, self.weights):
            x = sigmoid(np.dot(w, x) + b)
        return x
    
    def predict(self, X: pd.core.frame.DataFrame):
        """ Obtención de la predicción para las muestras contenidas en un DataFrame """
        predictions = np.zeros(shape = len(X))
        X.reset_index(inplace = True, drop = True)
        for i, sample in X.iterrows():
            prediction = model.getOutput(sample.values.reshape(-1, 1))
            predictions[i] = np.argmax(prediction)
        return predictions
    
    def get_history(self):
        """ Obtención de los valores de exactitud obtenidos tras cada epoch """
        return self.__history
    
    def score(self, X, y):
        """ Obtención de la exactitud del modelo tras aplicarlo a un dataset de evaluación """
        prediction = self.predict(X)
        return sum(int(p == y) for (p, y) in zip(prediction, y)) / len(X)