Se muestra a continuación la clase NNClassifier tras las mejoras implementadas:
import numpy as np
import pandas as pd
def sigmoid(a: np.ndarray):
    """Apply the logistic sigmoid ``1 / (1 + exp(-a))`` element-wise.

    The textbook formula overflows inside ``np.exp`` for large negative
    inputs (NumPy emits a RuntimeWarning). This implementation only ever
    exponentiates non-positive numbers, so it is safe for any finite input
    while returning the same values.

    Args:
        a: Array (or scalar) of pre-activation values.

    Returns:
        Array with the same shape as ``a`` holding the sigmoid of every
        element; a NumPy scalar when ``a`` is a scalar.
    """
    a = np.asarray(a)
    # exp(-|a|) always lies in [0, 1], so it can never overflow.
    e = np.exp(-np.abs(a))
    # a >= 0: 1 / (1 + exp(-a)); a < 0: the algebraically equal
    # exp(a) / (1 + exp(a)).
    out = np.where(a >= 0, 1 / (1 + e), e / (1 + e))
    # np.where yields a 0-d array for scalar input; unwrap it to a scalar.
    return out[()] if out.ndim == 0 else out
def to_categorical(n, size):
    """Return the one-hot column vector that encodes class ``n``.

    For example, with ``size == 4`` the label 2 becomes the column vector
    formed by the values [0, 0, 1, 0].

    Args:
        n: Class label; must be an integral value in ``range(size)``.
            Integral floats such as ``2.0`` are accepted.
        size: Number of classes, i.e. the length of the resulting vector.

    Returns:
        Float array of shape ``(size, 1)`` filled with zeros except for a 1
        at row ``n``.

    Raises:
        IndexError: If ``n`` is not an integral value inside
            ``range(size)``. Negative labels are rejected explicitly;
            plain NumPy indexing would silently wrap them around and
            encode the wrong class.
    """
    try:
        index = int(n)
    except (TypeError, ValueError, OverflowError):
        index = None
    if index is None or index != n or not 0 <= index < size:
        raise IndexError(
            "label {!r} is not a valid class index for {} classes".format(n, size)
        )
    v = np.zeros(shape=(size, 1))  # Column vector of zeros
    v[index, 0] = 1  # Flag the position of the requested class
    return v
def sigmoid_derivative(x):
    """Return the derivative of the sigmoid function evaluated at ``x``.

    Uses the identity sigma'(x) = sigma(x) * (1 - sigma(x)). The sigmoid is
    evaluated once and reused instead of being computed twice.

    Args:
        x: Array (or scalar) of pre-activation values.

    Returns:
        The element-wise derivative, with the same shape as ``x``.
    """
    s = sigmoid(x)
    return s * (1 - s)
class NNClassifier(object):
    """Fully connected feed-forward classifier with sigmoid activations.

    The network is trained with mini-batch gradient descent; gradients are
    obtained by backpropagation of the quadratic-cost output error. The
    predicted class of a sample is the index of the most active output
    neuron.
    """

    def __init__(self, sizes, learning_rate=0.01, batch_size=16, epochs=10,
                 shuffle=True, validation_split=None, verbose=True,
                 record_params_history=False, seed=None):
        """Build the network and randomly initialise its parameters.

        Args:
            sizes: Number of neurons per layer, input layer first and output
                layer last.
            learning_rate: Step size applied to the summed mini-batch
                gradient.
            batch_size: Number of samples per mini-batch.
            epochs: Number of passes over the training data made by ``fit``.
            shuffle: If true, the dataset is shuffled before training and
                the training samples are reshuffled at every epoch.
            validation_split: Fraction in (0, 1), or absolute number of
                samples, held out to measure accuracy after each epoch.
                ``None`` (or 0) disables validation.
            verbose: If true, a progress line is printed after each epoch.
            record_params_history: If true, the parameters are stored after
                every update step.
            seed: Seed for NumPy's global random generator; ``None`` leaves
                the generator untouched.

        Raises:
            ValueError: If ``sizes`` describes fewer than two layers.
        """
        if len(sizes) < 2:
            raise ValueError("sizes must contain at least an input and an output layer")
        self.num_layers = len(sizes)  # Total number of layers
        self.sizes = sizes  # Neurons per layer
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.epochs = epochs
        # Compare against None so that seed=0 is honoured as a valid seed.
        if seed is not None:
            np.random.seed(seed)
        # weights[k] has shape (neurons in layer k+1, neurons in layer k).
        self.weights = [np.random.randn(x, y) for (x, y) in zip(sizes[1:], sizes[:-1])]
        # One bias column vector per non-input layer.
        self.biases = [np.random.randn(n, 1) for n in sizes[1:]]
        self.shuffle = shuffle
        self.validation_split = validation_split
        self.verbose = verbose
        self.__history = []  # Validation accuracy after each epoch
        self.n_steps = 0  # Number of parameter updates performed
        self.record_params_history = record_params_history
        self.weights_history = [self.weights]  # Weights seen during training
        self.bias_history = [self.biases]  # Biases seen during training

    def __backpropagation(self, x, y):
        """Compute the cost gradient for a single sample.

        Args:
            x: Input column vector.
            y: One-hot target column vector.

        Returns:
            Tuple ``(grad_w, grad_b)`` of lists of arrays with the same
            structure as ``self.weights`` and ``self.biases``.
        """
        grad_b = [np.zeros(b.shape) for b in self.biases]
        grad_w = [np.zeros(w.shape) for w in self.weights]
        # Forward pass, keeping every weighted input (z) and activation (a).
        a = x
        a_layers = [x]
        z_layers = []
        for b, w in zip(self.biases, self.weights):
            z = np.dot(w, a) + b
            z_layers.append(z)
            a = sigmoid(z)
            a_layers.append(a)
        # Backward pass: output-layer error first ...
        delta = (a_layers[-1] - y) * sigmoid_derivative(z_layers[-1])
        grad_b[-1] = delta
        grad_w[-1] = np.dot(delta, a_layers[-2].transpose())
        # ... then propagate it towards the input; negative indices count
        # layers from the output end.
        for layer in range(2, self.num_layers):
            sp = sigmoid_derivative(z_layers[-layer])
            delta = np.dot(self.weights[-layer + 1].transpose(), delta) * sp
            grad_b[-layer] = delta
            grad_w[-layer] = np.dot(delta, a_layers[-layer - 1].transpose())
        return (grad_w, grad_b)

    def __update_parameters(self, mini_batch):
        """Apply one gradient-descent step computed from a mini-batch.

        The per-sample gradients are summed (not averaged) before being
        scaled by the learning rate.

        Args:
            mini_batch: Sequence of ``(x, y)`` pairs of column vectors.
        """
        self.n_steps += 1
        total_gradient_weights = [np.zeros(w.shape) for w in self.weights]
        total_gradient_bias = [np.zeros(b.shape) for b in self.biases]
        for x, y in mini_batch:
            delta_gradient_weights, delta_gradient_bias = self.__backpropagation(x, y)
            total_gradient_weights = [
                gw + dgw for gw, dgw in zip(total_gradient_weights, delta_gradient_weights)
            ]
            total_gradient_bias = [
                gb + dgb for gb, dgb in zip(total_gradient_bias, delta_gradient_bias)
            ]
        # Fresh lists are bound on every step, so entries already stored in
        # the history lists are never modified afterwards.
        self.weights = [
            w - gw * self.learning_rate for w, gw in zip(self.weights, total_gradient_weights)
        ]
        self.biases = [
            b - gb * self.learning_rate for b, gb in zip(self.biases, total_gradient_bias)
        ]
        if self.record_params_history:
            self.weights_history.append(self.weights)
            self.bias_history.append(self.biases)

    def __validation_size(self, n_samples):
        """Return how many of ``n_samples`` samples are held out for validation.

        A float in (0, 1) is a fraction of the dataset (rounded up); an
        integer is an absolute number of samples.

        Raises:
            ValueError: If the setting is invalid or would leave no training
                samples.
        """
        split = self.validation_split
        if not split:
            return 0
        if isinstance(split, (int, np.integer)):
            n_val = int(split)
        elif 0 < split < 1:
            n_val = int(np.ceil(split * n_samples))
        else:
            raise ValueError("validation_split must be a fraction in (0, 1) or a sample count")
        if not 0 < n_val < n_samples:
            raise ValueError("validation_split leaves no samples for training or validation")
        return n_val

    def fit(self, X: pd.DataFrame, y: pd.Series):
        """Train the network.

        Training continues from the current parameters; the accuracy and
        parameter histories and the step counter are reset.

        Args:
            X: Feature matrix with one sample per row.
            y: Integer class label of every sample, in the same row order
                as ``X``.

        Raises:
            ValueError: If ``X`` and ``y`` differ in length or the
                validation split is invalid.
        """
        self.__history = []
        self.weights_history = []
        self.bias_history = []
        self.n_steps = 0

        features = np.asarray(X)
        labels = np.ravel(np.asarray(y))
        n_samples = len(features)
        if len(labels) != n_samples:
            raise ValueError("X and y must contain the same number of samples")

        n_val = self.__validation_size(n_samples)
        # The hold-out set is always drawn at random; without validation the
        # sample order is only randomised when shuffling was requested.
        if self.shuffle or n_val:
            order = np.random.permutation(n_samples)
        else:
            order = np.arange(n_samples)
        val_idx, train_idx = order[:n_val], order[n_val:]
        x_val, y_val = features[val_idx], labels[val_idx]

        n_classes = self.sizes[-1]
        # Every sample becomes an (input column, one-hot target column) pair.
        training_data = [
            (features[i].reshape(-1, 1), to_categorical(labels[i], n_classes))
            for i in train_idx
        ]
        n = len(training_data)
        for epoch in range(self.epochs):
            if self.shuffle:
                np.random.shuffle(training_data)
            for start in range(0, n, self.batch_size):
                self.__update_parameters(training_data[start:start + self.batch_size])
            if n_val:
                accuracy = self.score(x_val, y_val)
                self.__history.append(accuracy)
                if self.verbose:
                    print("Epoch {} complete. Accuracy: {:.4f}".format(epoch, accuracy))
            elif self.verbose:
                print("Epoch {} complete".format(epoch))

    def getOutput(self, x: np.ndarray):
        """Return the output-layer activations for one input column vector."""
        for b, w in zip(self.biases, self.weights):
            x = sigmoid(np.dot(w, x) + b)
        return x

    def predict(self, X: pd.DataFrame):
        """Predict the class of every sample (row) in ``X``.

        ``X`` is only read: it is not re-indexed or otherwise modified.

        Returns:
            1-D float array with the predicted class index of each row.
        """
        samples = np.asarray(X)
        predictions = np.zeros(shape=len(samples))
        for i, sample in enumerate(samples):
            # Use this instance's parameters, not any global model object.
            output = self.getOutput(sample.reshape(-1, 1))
            predictions[i] = np.argmax(output)
        return predictions

    def get_history(self):
        """Return the validation accuracy recorded after each epoch of ``fit``."""
        return self.__history

    def score(self, X, y):
        """Return the accuracy of the model on an evaluation dataset.

        Args:
            X: Feature matrix with one sample per row.
            y: True class labels, in the same row order as ``X``.

        Returns:
            Fraction of samples whose predicted class equals the label.

        Raises:
            ValueError: If ``X`` and ``y`` differ in length.
            ZeroDivisionError: If ``X`` contains no samples.
        """
        prediction = self.predict(X)
        targets = np.ravel(np.asarray(y))
        if len(targets) != len(prediction):
            raise ValueError("X and y must contain the same number of samples")
        return int(np.count_nonzero(prediction == targets)) / len(prediction)