Prototyping Gradient Descent in Machine Learning

Supervised Learning

Supervised learning is a category of machine learning that uses labeled datasets to train algorithms to predict outcomes and recognize patterns.

Unlike unsupervised learning, supervised learning algorithms are given labeled training data to learn the relationship between the inputs and the outputs.

Prerequisite: Linear algebra


Suppose we have a regression problem where the model must predict continuous values by taking n input features (x_i).

The prediction value is defined as a function called the hypothesis (h):

h_θ(x) = θ_0·x_0 + θ_1·x_1 + ⋯ + θ_n·x_n + ϵ, where x_0 = 1 is the intercept term

where:

  • θ_i: the i-th parameter corresponding to each input feature (x_i), 
  • ϵ (epsilon): Gaussian error (ϵ ~ N(0, σ²))

Because the hypothesis for a single input generates a scalar value (h_θ(x) ∈ ℝ), it can be denoted as the dot product of the transpose of the parameter vector (θ^T) and the feature vector for that input (x):

h_θ(x) = θ^T x
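As a minimal sketch with hypothetical numbers (not taken from any dataset used later in this article), the scalar prediction can be computed as a dot product in NumPy:

import numpy as np

# hypothetical parameters and a single input; x_0 = 1 is the intercept term
theta = np.array([0.5, 2.0, -1.0])   # theta_0, theta_1, theta_2
x = np.array([1.0, 3.0, 4.0])        # x_0, x_1, x_2

h = theta @ x                        # theta^T x, a scalar prediction
print(h)                             # 0.5 + 2.0*3.0 - 1.0*4.0 = 2.5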

Batch Gradient Descent

Gradient Descent is an iterative optimization algorithm used to find local minima of a function. At each step, it moves in the direction opposite to the gradient (the direction of steepest ascent) to progressively lower the function’s value; simply put, keep going downhill.

Now, recall we have n parameters that impact the prediction. So, we want to know the precise contribution of each parameter (θ_i), corresponding to its input feature (x_i), to the cost function.

Suppose we set the size of each step as the learning rate (α), and define a cost function (J); then each parameter is updated at every step such that:

θ_i := θ_i − α · ∂J(θ)/∂θ_i

(α: learning rate, J(θ): cost function, ∂J(θ)/∂θ_i: partial derivative of the cost function with respect to θ_i)

Gradient

The gradient represents the slope of the cost function.

Considering all the parameters and their corresponding partial derivatives of the cost function (J), the gradient of the cost function at θ is defined as:

∇_θ J(θ) = [ ∂J(θ)/∂θ_0, ∂J(θ)/∂θ_1, …, ∂J(θ)/∂θ_n ]^T

The gradient is a vector of partial derivatives of the cost function with respect to all the parameters (θ_0 through θ_n).

Since the learning rate is a scalar (α ∈ ℝ), the update rule of the gradient descent algorithm is expressed in matrix notation:

θ := θ − α · ∇_θ J(θ)

Consequently, the parameter vector (θ) resides in (n+1)-dimensional space.

Geometrically, it goes downhill in steps proportional to the learning rate until reaching convergence.

Gradient Descent going downhill to optimize the parameter

Computation

The objective of linear regression is to minimize the gap (MSE) between predicted values and the actual values given in the training dataset.

Cost Function (Objective Function)

This gap is defined as the mean squared error (MSE) over all the training examples:

J(θ) = (1 / 2m) · Σ_{i=1..m} ( h_θ(x^(i)) − y^(i) )²

where

  • J: cost function (or loss function),
  • h: prediction from the model,
  • x^(i): i-th input feature,
  • y^(i): i-th target value, and
  • m: the number of training examples.

The gradient is computed by taking the partial derivative of the cost function with respect to each parameter:

∂J(θ)/∂θ_j = (1/m) · Σ_{i=1..m} ( h_θ(x^(i)) − y^(i) ) · x_j^(i)

Because we have n+1 parameters (including the intercept term θ_0) and m training examples, we’ll form a gradient vector using matrix notation:

In matrix notation, where X represents the design matrix including the intercept term and θ is the parameter vector, the gradient ∇_θ J(θ) is given by:

∇_θ J(θ) = (1/m) · X^T (Xθ − y)

The LMS (Least Mean Squares) rule is an iterative algorithm that continually adjusts the model’s parameters based on the error between its predictions and the actual target values of the training examples.

Least Mean Squares (LMS) Rule

In each epoch of gradient descent, every parameter θ_j is updated by subtracting a fraction of the average error across all training examples:

θ_j := θ_j − α · (1/m) · Σ_{i=1..m} ( h_θ(x^(i)) − y^(i) ) · x_j^(i)

This process allows the algorithm to iteratively find the optimal parameters that minimize the cost function.
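As a minimal vectorized sketch of this batch update, assume a design matrix X of shape (m, n+1) with a leading column of ones and a target vector y; the function name, toy data, and hyperparameters below are illustrative, not part of the credit card example later in this article:

import numpy as np

def batch_gd(X, y, alpha=0.1, n_epochs=1000):
    # vectorized batch gradient descent for linear regression with MSE cost
    m, n_plus_1 = X.shape
    theta = np.zeros(n_plus_1)
    for _ in range(n_epochs):
        error = X @ theta - y       # residuals h_theta(x) - y, shape (m,)
        grad = (X.T @ error) / m    # (1/m) * X^T (X theta - y)
        theta -= alpha * grad       # LMS / batch update step
    return theta

# toy example: y = 3 + 2x plus a little noise
rng = np.random.default_rng(0)
x = rng.uniform(0, 1, size=100)
X = np.column_stack([np.ones_like(x), x])   # add the intercept column x_0 = 1
y = 3 + 2 * x + rng.normal(0, 0.01, size=100)
print(batch_gd(X, y))                       # approximately [3., 2.]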

Normal Equation

To find the optimal parameter (θ*) that minimizes the cost function, we can use the normal equation.

This method offers an analytical solution for linear regression, allowing us to directly calculate the θ value that minimizes the cost function.

Unlike iterative optimization techniques, the normal equation finds this optimum by directly solving for the point where the gradient is zero, ensuring immediate convergence:

X^T (Xθ − y) = 0

Hence:

θ* = (X^T X)^(−1) X^T y

This relies on the assumption that X^T X is invertible, which requires all the input features (from x_0 to x_n) to be linearly independent.

If X^T X is not invertible, we’ll need to adjust the input features to ensure their mutual independence.
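For comparison, here is a hedged NumPy sketch of the normal equation on the same kind of toy data as above (values are illustrative only); solving the linear system, or using the pseudo-inverse, avoids explicitly inverting X^T X:

import numpy as np

rng = np.random.default_rng(0)
x = rng.uniform(0, 1, size=100)
X = np.column_stack([np.ones_like(x), x])     # design matrix with intercept column
y = 3 + 2 * x + rng.normal(0, 0.01, size=100)

# theta* = (X^T X)^(-1) X^T y, solved as a linear system for numerical stability
theta_star = np.linalg.solve(X.T @ X, X.T @ y)
print(theta_star)                             # approximately [3., 2.]

# if X^T X is singular (linearly dependent features), the pseudo-inverse still applies
theta_pinv = np.linalg.pinv(X) @ y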

Simulation

In practice, we repeat the process until convergence by setting:

  • Cost function and its gradient
  • Learning rate
  • Tolerance (min. cost threshold to stop the iteration)
  • Maximum number of iterations
  • Starting point

Batch GD by Learning Rate

The following code snippet demonstrates how gradient descent finds the local minimum of a quadratic cost function for different learning rates (0.1, 0.3, 0.8 and 0.9):

import numpy as np
import matplotlib.pyplot as plt

def cost_func(x):
    # quadratic cost function J(x) = x^2 - 4x + 1 (minimum at x = 2)
    return x**2 - 4 * x + 1

def gradient(x):
    # derivative of the cost function: dJ/dx = 2x - 4
    return 2*x - 4

def gradient_descent(gradient, start, learn_rate, max_iter, tol):
    x = start
    steps = [start] # records learning steps

    for _ in range(max_iter):
        diff = learn_rate * gradient(x)
        if np.abs(diff) < tol:
            break
        x = x - diff
        steps.append(x)

    return x, steps

x_values = np.linspace(-4, 11, 400)
y_values = cost_func(x_values)
initial_x = 9
iterations = 100
tolerance = 1e-6
learning_rates = [0.1, 0.3, 0.8, 0.9]

def gradient_descent_curve(ax, learning_rate):
    final_x, history = gradient_descent(gradient, initial_x, learning_rate, iterations, tolerance)

    ax.plot(x_values, y_values, label=f'Cost function: $J(x) = x^2 - 4x + 1$', lw=1, color='black')

    ax.scatter(history, [cost_func(x) for x in history], color='pink', zorder=5, label='Steps')
    ax.plot(history, [cost_func(x) for x in history], 'r--', lw=1, zorder=5)

    ax.annotate('Start', xy=(history[0], cost_func(history[0])), xytext=(history[0], cost_func(history[0]) + 10),
                arrowprops=dict(facecolor='black', shrink=0.05), ha='center')
    ax.annotate('End', xy=(final_x, cost_func(final_x)), xytext=(final_x, cost_func(final_x) + 10),
                arrowprops=dict(facecolor='black', shrink=0.05), ha='center')
    
    ax.set_title(f'Learning Rate: {learning_rate}')
    ax.set_xlabel('Input feature: x')
    ax.set_ylabel('Cost: J')
    ax.grid(True, alpha=0.5, ls='--', color='grey')
    ax.legend()

fig, axs = plt.subplots(1, 4, figsize=(30, 5))
fig.suptitle('Gradient Descent Steps by Learning Rate')

for ax, lr in zip(axs.flatten(), learning_rates):
    gradient_descent_curve(ax=ax, learning_rate=lr)

plt.show()

Learning rates control gradient descent steps. (Here the cost function J(x) is a quadratic function taking one input feature x.)

Predicting Credit Card Transaction Amounts

Let us use a sample dataset on Kaggle to predict credit card transaction amounts using linear regression with Batch GD.

1. Data Preprocessing

a) Base DataFrame

First, we’ll merge these four files from the sample dataset using IDs as the key, while sanitizing the raw data:

import json

import numpy as np
import pandas as pd

# load transaction data
trx_df = pd.read_csv(f'{dir}/transactions_data.csv')

# sanitize the dataset (sanitize_df is a helper, defined elsewhere, that cleans the raw string values)
trx_df = trx_df[trx_df['errors'].isna()]
trx_df = trx_df.drop(columns=['merchant_city','merchant_state', 'date', 'mcc', 'errors'], axis='columns')
trx_df['amount'] = trx_df['amount'].apply(sanitize_df)

# merge the dataframe with fraud transaction flag.
with open(f'{dir}/train_fraud_labels.json', 'r') as fp:
    fraud_labels_json = json.load(fp=fp)

fraud_labels_dict = fraud_labels_json.get('target', {})
fraud_labels_series = pd.Series(fraud_labels_dict, name='is_fraud')
fraud_labels_series.index = fraud_labels_series.index.astype(int)

merged_df = pd.merge(trx_df, fraud_labels_series, left_on='id', right_index=True, how='left')
merged_df.fillna({'is_fraud': 'No'}, inplace=True)
merged_df['is_fraud'] = merged_df['is_fraud'].map({'Yes': 1, 'No': 0})
merged_df = merged_df.dropna()

# load card data
card_df = pd.read_csv(f'{dir}/cards_data.csv')
card_df = card_df.replace('nan', np.nan).dropna()
card_df = card_df[card_df['card_on_dark_web'] == 'No']
card_df = card_df.drop(columns=['acct_open_date', 'card_number', 'expires', 'cvv', 'card_on_dark_web'], axis='columns')
card_df['credit_limit'] = card_df['credit_limit'].apply(sanitize_df)

# load user data
user_df = pd.read_csv(f'{dir}/users_data.csv')
user_df = user_df.drop(columns=['birth_year', 'birth_month', 'address', 'latitude', 'longitude'], axis='columns')
user_df = user_df.replace('nan', np.nan).dropna()
user_df['per_capita_income'] = user_df['per_capita_income'].apply(sanitize_df)
user_df['yearly_income'] = user_df['yearly_income'].apply(sanitize_df)
user_df['total_debt'] = user_df['total_debt'].apply(sanitize_df)

# merge transaction and card data
merged_df = pd.merge(left=merged_df, right=card_df, left_on='card_id', right_on='id', how='inner')
merged_df = pd.merge(left=merged_df, right=user_df, left_on='client_id_x', right_on='id', how='inner')
merged_df = merged_df.drop(columns=['id_x', 'client_id_x', 'card_id', 'merchant_id', 'id_y', 'client_id_y', 'id'], axis='columns')
merged_df = merged_df.dropna()

# finalize the dataframe
categorical_cols = merged_df.select_dtypes(include=['object']).columns
df = merged_df.copy()
df = pd.get_dummies(df, columns=categorical_cols, dummy_na=False, dtype=float)
df = df.dropna()
print('Base data frame:\n', df.head(n=3))

b) Preprocessing

From the base DataFrame, we’ll select suitable input features that have continuous values and a seemingly linear relationship with the transaction amount.

df = df[df['is_fraud'] == 0]
df = df[['amount', 'per_capita_income', 'yearly_income', 'credit_limit', 'credit_score', 'current_age']]

Then, we’ll filter out outliers beyond 3 standard deviations from the mean:

def filter_outliers(df, column, std_threshold) -> pd.DataFrame:
    # keep only rows within std_threshold standard deviations of the column mean
    mean = df[column].mean()
    std = df[column].std()
    upper_bound = mean + std_threshold * std
    lower_bound = mean - std_threshold * std
    filtered_df = df[(df[column] <= upper_bound) & (df[column] >= lower_bound)]
    return filtered_df

df = df.replace(to_replace='NaN', value=0)
df = filter_outliers(df=df, column='amount', std_threshold=3)
df = filter_outliers(df=df, column='per_capita_income', std_threshold=3)
df = filter_outliers(df=df, column='credit_limit', std_threshold=3)

Finally, we’ll take the logarithm of the target value amount to mitigate its skewed distribution:

df['amount'] = df['amount'] + 1
df['amount_log'] = np.log(df['amount'])
df = df.drop(columns=['amount'], axis='columns')
df = df.dropna()

*Added one to amount to avoid negative infinity (the log of zero) in the amount_log column.
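As a side note, NumPy’s log1p computes log(1 + x) directly and could arguably replace the two steps above; a tiny standalone check with made-up amounts:

import numpy as np

amount = np.array([0.0, 9.0, 99.0])   # made-up transaction amounts
print(np.log1p(amount))               # approx. [0.0, 2.3026, 4.6052]
print(np.log(amount + 1))             # same values; log1p is more accurate near zero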

Final DataFrame:


c) Transformer

Now, we can split the final DataFrame into train/test datasets and transform them:

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

# separate input features and target (the 80/20 split ratio and random_state are illustrative)
X = df.drop(columns=['amount_log'])
y = df['amount_log']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

categorical_features = X.select_dtypes(include=['object']).columns.tolist()
categorical_transformer = Pipeline(steps=[('imputer', SimpleImputer(strategy='most_frequent')), ('onehot', OneHotEncoder(handle_unknown='ignore'))])

numerical_features = X.select_dtypes(include=['int64', 'float64']).columns.tolist()
numerical_transformer = Pipeline(steps=[('imputer', SimpleImputer(strategy='mean')), ('scaler', StandardScaler())])

preprocessor = ColumnTransformer(
    transformers=[
        ('num', numerical_transformer, numerical_features),
        ('cat', categorical_transformer, categorical_features)
    ]
)


X_train_processed = preprocessor.fit_transform(X_train)
X_test_processed = preprocessor.transform(X_test)

2. Defining the Batch GD Regressor

class BatchGradientDescentLinearRegressor:
    def __init__(self, learning_rate=0.01, n_iterations=1000, l2_penalty=0.01, tol=1e-4, patience=10):
        self.learning_rate = learning_rate
        self.n_iterations = n_iterations
        self.l2_penalty = l2_penalty
        self.tol = tol
        self.patience = patience
        self.weights = None
        self.bias = None
        self.history = {'loss': [], 'grad_norm': [], 'weight':[], 'bias': [], 'val_loss': []}
        self.best_weights = None
        self.best_bias = None
        self.best_val_loss = float('inf')
        self.epochs_no_improve = 0

    def _mse_loss(self, y_true, y_pred, weights):
        m = len(y_true)
        loss = (1 / (2 * m)) * np.sum((y_pred - y_true)**2)
        l2_term = (self.l2_penalty / (2 * m)) * np.sum(weights**2)
        return loss + l2_term

    def fit(self, X_train, y_train, X_val=None, y_val=None):
        n_samples, n_features = X_train.shape
        self.weights = np.zeros(n_features)
        self.bias = 0

        for i in range(self.n_iterations):
            y_pred = np.dot(X_train, self.weights) + self.bias
        
            dw = (1 / n_samples) * np.dot(X_train.T, (y_pred - y_train)) + (self.l2_penalty / n_samples) * self.weights
            db = (1 / n_samples) * np.sum(y_pred - y_train)

            loss = self._mse_loss(y_train, y_pred, self.weights)
            gradient = np.concatenate([dw, [db]])
            grad_norm = np.linalg.norm(gradient)

            # update history
            self.history['weight'].append(self.weights[0])
            self.history['loss'].append(loss)
            self.history['grad_norm'].append(grad_norm)
            self.history['bias'].append(self.bias)

            # descent
            self.weights -= self.learning_rate * dw
            self.bias -= self.learning_rate * db

            if X_val is not None and y_val is not None:
                val_y_pred = np.dot(X_val, self.weights) + self.bias
                val_loss = self._mse_loss(y_val, val_y_pred, self.weights)
                self.history['val_loss'].append(val_loss)

                if val_loss < self.best_val_loss - self.tol:
                    self.best_val_loss = val_loss
                    self.best_weights = self.weights.copy()
                    self.best_bias = self.bias
                    self.epochs_no_improve = 0
                else:
                    self.epochs_no_improve += 1
                    if self.epochs_no_improve >= self.patience:
                        print(f"Early stopping at iteration {i+1} (validation loss didn't improve for {self.patience} epochs)")
                        self.weights = self.best_weights
                        self.bias = self.best_bias
                        break

            if (i + 1) % 100 == 0:
                print(f"Iteration {i+1}/{self.n_iterations}, Loss: {loss:.4f}", end="")
                if X_val is not None:
                    print(f", Validation Loss: {val_loss:.4f}")
                else:
                    print()

    def predict(self, X_test):
        return np.dot(X_test, self.weights) + self.bias

3. Prediction & Assessment

model = BatchGradientDescentLinearRegressor(learning_rate=0.001, n_iterations=10000, l2_penalty=0, tol=1e-5, patience=5)
model.fit(X_train_processed, y_train.values)
y_pred = model.predict(X_test_processed)

Output:
Of the five input features, per_capita_income showed the strongest correlation with the transaction amount:

Mean Squared Error (MSE): 1.5752
R-squared: 0.0206
Mean Absolute Error (MAE): 1.0472

Time complexity: Training: O(k·n·m) (k: number of iterations) + Prediction: O(n)
Space complexity: O(nm)
(m: training example size, n: input feature size, assuming m >>> n)


Stochastic Gradient Descent

Batch GD uses the entire training dataset to compute the gradient in each iteration step (epoch), which is computationally expensive, especially when we have hundreds of thousands of training examples.

Stochastic Gradient Descent (SGD), on the other hand,

  1. typically shuffles the training data at the start of each epoch,
  2. randomly selects a training example in each iteration,
  3. calculates the gradient using that example, and
  4. updates the model’s weights and bias.

This results in many weight updates per epoch (equal to the number of training samples): quick, computationally cheap updates based on individual data points, allowing it to iterate through large datasets much faster.

Simulation

Similar to Batch GD, we’ll define the SGD class and run the prediction:

class StochasticGradientDescentLinearRegressor:
    def __init__(self, learning_rate=0.01, n_iterations=100, l2_penalty=0.01, random_state=None):
        self.learning_rate = learning_rate
        self.n_iterations = n_iterations
        self.l2_penalty = l2_penalty
        self.random_state = random_state
        self._rng = np.random.default_rng(seed=random_state)
        self.weights_history = []
        self.bias_history = []
        self.loss_history = []
        self.weights = None
        self.bias = None

    def _mse_loss_single(self, y_true, y_pred):
        return 0.5 * (y_pred - y_true)**2

    def fit(self, X, y):
        n_samples, n_features = X.shape
        self.weights = self._rng.random(n_features)
        self.bias = 0.0

        for epoch in range(self.n_iterations):
            permutation = self._rng.permutation(n_samples)
            X_shuffled = X[permutation]
            y_shuffled = y[permutation]

            epoch_loss = 0
            for i in range(n_samples):
                xi = X_shuffled[i]
                yi = y_shuffled[i]

                y_pred = np.dot(xi, self.weights) + self.bias
                dw = xi * (y_pred - yi) + self.l2_penalty * self.weights
                db = y_pred - yi

                self.weights -= self.learning_rate * dw
                self.bias -= self.learning_rate * db
                epoch_loss += self._mse_loss_single(yi, y_pred)

                if n_features >= 2:
                    self.weights_history.append(self.weights[:2].copy())
                elif n_features == 1:
                    self.weights_history.append(np.array([self.weights[0], 0]))
                self.bias_history.append(self.bias)
                self.loss_history.append(self._mse_loss_single(yi, y_pred) + (self.l2_penalty / (2 * n_samples)) * (np.sum(self.weights**2) + self.bias**2)) # Approx L2

            print(f"Epoch {epoch+1}/{self.n_iterations}, Loss: {epoch_loss/n_samples:.4f}")

    def predict(self, X):
        return np.dot(X, self.weights) + self.bias

model = StochasticGradientDescentLinearRegressor(learning_rate=0.001, n_iterations=200, random_state=42)
model.fit(X=X_train_processed, y=y_train.values)
y_pred = model.predict(X_test_processed)

Output:


Left: Weight by input features, Right: Cost function (α=0.001, i=200, m=50,000, n=5)

SGD introduced randomness into the optimization process (fig. right).

This can help the algorithm jump out of shallow local minima or saddle points and potentially find better regions of the parameter space.

Results:
Mean Squared Error (MSE): 1.5808
R-squared: 0.0172
Mean Absolute Error (MAE): 1.0475

Time complexity: Training: O(k·n·m) (k: number of epochs) + Prediction: O(n)
Space complexity: O(n) < BGD: O(nm)
(m: training example size, n: input feature size, assuming m >>> n)


Conclusion

While the simple linear model is computationally efficient, its inherent simplicity often prevents it from capturing complex relationships within the data.

Considering the trade-offs of various modeling approaches against specific objectives is crucial for achieving optimal results.


Reference

All images, unless otherwise noted, are by the author.

The article utilizes synthetic data, licensed under Apache 2.0 for commercial use.


Author: Kuriko IWAI

Portfolio / LinkedIn / Github
