Clase NNClassifier v6

Ahora ya tenemos nuestra primera versión de la clase Network funcional. Se incluye a continuación tanto el código principal de la clase como las funciones sigmoid, to_categorical y sigmoid_derivative:

def sigmoid(a: np.ndarray):
    """ Aplica la función sigmoide al array a elemento por elemento """
    return 1 / (1 + np.exp(-a))
    
def to_categorical(n, size):
    """ Convierte un número n en un array de tamaño (size, 1) de ceros salvo el valor
    correspondiente a n, que toma el valor 1. Así, -si size toma el valor 4- el número 2
    se convertiría en el vector columna formado por los valores [0, 0, 1, 0] """
    
    v = np.zeros(shape = (size, 1))      # Creamos el array vertical con ceros
    v[n, 0] = 1                          # Modificamos el valor correspondiente a n por 1
    return v

def sigmoid_derivative(x):
    """ Derivada de la función sigmoide """
    return sigmoid(x) * (1 - sigmoid(x))

class NNClassifier(object):
    
    def __init__(self, sizes, learning_rate = 0.01, batch_size = 16, epochs = 10):
        """ Constructor de la red neuronal """
        self.num_layers = len(sizes)         # Número total de capas de la red
        self.sizes = sizes                   # Lista conteniendo el número de neuronas por capa
        self.learning_rate = learning_rate   # Tasa de aprendizaje
        self.batch_size = batch_size         # Tamaño del batch
        self.epochs = epochs                 # Número de epochs durante los que entrenar la red
        self.weights = [np.random.randn(x, y) for (x, y) in zip(sizes[1:], sizes[:-1])]
        self.biases = [np.random.randn(n, 1) for n in sizes[1:]]
    
    def __backpropagation(self, x, y):
        """ Devuelve una tupla representando el gradiente para la función de coste
        correspondiente a una muestra. Los valores de estas listas son arrays NumPy con
        la misma estructura que self.weights y self.bias """
        grad_b = [np.zeros(b.shape) for b in self.biases]  # Un valor para cada bias existente
        grad_w = [np.zeros(w.shape) for w in self.weights] # Un valor para cada peso existente
        
        # feedforward
        a = x
        a_layers = [x] # Valores devueltos por la función de activación
        z_layers = []  # Valores devueltos por la función sumatorio
        for b, w in zip(self.biases, self.weights):
            z = np.dot(w, a) + b
            z_layers.append(z)
            a = sigmoid(z)
            a_layers.append(a)
            
        # backward pass
        delta = (a_layers[-1] - y) * sigmoid_derivative(z_layers[-1])
        grad_b[-1] = delta
        grad_w[-1] = np.dot(delta, a_layers[-2].transpose())
        
        for layer in range(2, self.num_layers):
            z = z_layers[-layer]
            sp = sigmoid_derivative(z)
            delta = np.dot(self.weights[-layer + 1].transpose(), delta) * sp
            grad_b[-layer] = delta
            grad_w[-layer] = np.dot(delta, a_layers[-layer - 1].transpose())
        return (grad_w, grad_b)
    
    def __update_parameters(self, mini_batch):
        """ Actualiza los parámetros de la red aplicando descenso de gradiente a un
        mini-batch """
        total_gradient_weights = [np.zeros(w.shape) for w in self.weights]
        total_gradient_bias = [np.zeros(b.shape) for b in self.biases]
        for x, y in mini_batch:
            delta_gradient_weights, delta_gradient_bias = self.__backpropagation(x, y)
            total_gradient_weights = [gw + dgw for gw, dgw \
                            in zip(total_gradient_weights, delta_gradient_weights)]
            total_gradient_bias = [gb + dgb for gb, dgb \
                            in zip(total_gradient_bias, delta_gradient_bias)]
        self.weights = [w - gw * self.learning_rate for w, gw \
                        in zip(self.weights, total_gradient_weights)]
        self.biases = [b - gb * self.learning_rate for b, gb \
                        in zip(self.biases, total_gradient_bias)]
    
    def fit(self, X: pd.core.frame.DataFrame, y: pd.core.series.Series):
        """ Entrenamiento de la red neuronal"""
        x_train = [x.values.reshape(-1, 1) for (i, x) in X.iterrows()]
        y_train = [to_categorical(n, self.sizes[-1]) for n in y]
        training_data = [(x, y) for (x, y) in zip(x_train, y_train)]
        n = len(training_data)
        for epoch in range(self.epochs):
            mini_batches = [training_data[start:start + self.batch_size]
                            for start in range(0, n, self.batch_size)]
            for mini_batch in mini_batches:
                self.__update_parameters(mini_batch)
            print("Epoch {} complete".format(epoch))
    
    def getOutput(self, x: np.ndarray):
        """ Obtención de la predicción para una muestra """
        for b, w in zip(self.biases, self.weights):
            x = sigmoid(np.dot(w, x) + b)
        return x
    
    def predict(self, X: pd.core.frame.DataFrame):
        """ Obtención de la predicción para las muestras contenidas en un DataFrame """
        predictions = np.zeros(shape = len(X))
        X.reset_index(inplace = True, drop = True)
        for i, sample in X.iterrows():
            prediction = model.getOutput(sample.values.reshape(-1, 1))
            predictions[i] = np.argmax(prediction)
        return predictions