An ML tale: From notebook to production


Data scientists spend their days working in Jupyter notebooks, which are then passed to an implementation team to prepare for production. This post guides you through that process, emphasizing iterative refinement.

I will be using the scikit-learn and XGBoost libraries, but other ML libraries could be swapped in. While scikit-learn offers a comprehensive collection of algorithms and tools that simplify data analysis and model building, it doesn’t impose a strict framework for organizing projects. This flexibility can be advantageous, but it often requires users to develop their own project structure and best practices. My goal is to help establish a consistent and effective workflow for machine learning libraries that carries from training through production.

This article uses the Titanic dataset from https://www.kaggle.com/competitions/titanic. Learning ML can be tough without a goal, and Kaggle provides challenges that give you purpose while you hone your skills.

The viewpoint below is that of a data engineer. Most projects take quite a bit of data investigation before getting into the actual machine learning; I have skipped that investigation to keep the focus on getting to production.

Evolution Index:

  • Jupyter
  • Script
  • Object
  • Pipeline
  • Inheritance
  • Debugging
  • Further Iterations
  • Evolved?

Jupyter

Most machine learning projects start with a Jupyter notebook, where the data is sorted out and different models are tried to determine a starting point.

(Figure: Jupyter Notebook)

The notebook has some disadvantages though:

  • Hard to track changes when tuning features and hyperparameters
  • Modularization is possible, but keeping track of what is active code is difficult
  • Running on a remote machine needs more configuration and maintenance

Some data scientists are wizards in Jupyter notebooks, so they may be able to overcome these challenges. I am not one.

Script

Once I reached my limits in the notebook, I started to develop the code into a Python script. The script gives up the interactive computing the notebook offers, but increases the reproducibility of the experiment. The code is now executed as a single unit, eliminating the need to run each code block individually after changes.
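If you want a head start on the conversion, the nbconvert tool that ships with Jupyter can dump a notebook's code cells to a script in cell order (the notebook filename below is a placeholder):

% jupyter nbconvert --to script titanic_notebook.ipynb

The generated script still needs cleanup, but it gives you the active code to build on.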

# titanic_prediction_step_1.py
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier
from sklearn.model_selection import cross_val_score

dataset = pd.read_csv('data/train.csv')

# Training segment
X = dataset.iloc[:, [2, 4, 5, 6, 7, 9, 10, 11]]
y = dataset.iloc[:, 1:2]

# Apply the fit_transform method on the instance of ColumnTransformer
ct = ColumnTransformer(transformers=[('encoder', OneHotEncoder(
    handle_unknown='ignore'), [1, 6, 7])], remainder='passthrough')
X_encoded = ct.fit_transform(X.values).toarray()

# placeholder if encoding is ever needed
y_encoded = y.values

X_train_encoded, X_test_encoded, y_train_encoded, y_test_encoded = train_test_split(
    X_encoded, y_encoded, test_size=0.2, random_state=0)

classifier = XGBClassifier(n_estimators=500, learning_rate=0.01)
classifier.fit(X=X_train_encoded, y=y_train_encoded)

# Accuracy Scoring
accuracies = cross_val_score(
    estimator=classifier, X=X_test_encoded, y=y_test_encoded, cv=10)
print("Accuracy: {:.2f} %".format(accuracies.mean()*100))
print("Standard Deviation: {:.2f} %".format(accuracies.std()*100))

# Prediction
# column indexes shift because test.csv has no Survived column
dataset = pd.read_csv('data/test.csv')
X = dataset.iloc[:, [1, 3, 4, 5, 6, 8, 9, 10]]
X_encoded = ct.transform(X.values).toarray()
pd.DataFrame(
    {
        'PassengerId': dataset.iloc[:, 0],
        'Survived': classifier.predict(X_encoded)
    }
).to_csv('predictions/submission.csv', index=False)


For all subsequent iterations, I highly recommend beginning to track your project using Git or another version control system. Implementing version control will significantly streamline the debugging process, allowing you to easily trace and understand changes made to the model over time. Version control features, such as commit hashes and tags, can be integrated to effectively track and manage saved models.
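As a minimal sketch of that idea (my own addition, not part of the project code; the file names are assumptions), the current commit hash can be captured at save time and written next to the model:

# stamp_model.py: a hedged sketch that tags saved artifacts with the Git
# commit that produced them, so a model can be traced back to exact code.
import json
import subprocess

def current_commit() -> str:
    # Full hash of the checked-out commit; must run inside a Git repository
    return subprocess.check_output(
        ['git', 'rev-parse', 'HEAD'], text=True).strip()

with open('models/model.meta.json', 'w') as file:
    json.dump({'git_commit': current_commit(),
               'model_file': 'models/model.pkl.compressed'}, file, indent=4)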

Object

The Python script can be hard to understand as written; let’s reduce the main script to just the essentials and move everything else into an object.

# titanic_prediction_step_2.py
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from model_base import model_base

# Training
dataset = pd.read_csv('data/train.csv')
dependent_variable = 'Survived'
excluded_fields = ['PassengerId', 'Ticket', 'Name']
category_fields = ['Sex', 'Cabin', 'Embarked']
titanic_model = model_base(
    dependent_variable, excluded_fields, category_fields)
titanic_model.train(dataset)

# Prediction
test_dataset = pd.read_csv('data/test.csv')

titanic_model_predict = model_base(
    dependent_variable, excluded_fields, category_fields)
pd.DataFrame(
    {
        'PassengerId': test_dataset.iloc[:, 0],
        'Survived': titanic_model_predict.predict_dataset(test_dataset)['y']
    }
).to_csv('predictions/submission_2.csv', index=False)

The main script now contains configuration variables that are easier to manage and track. Multiple versions of the model can be run with minimal redundant code.

By leveraging the object-oriented strategy, we can decompose each step of the model into distinct object methods. These methods can then be developed and tested independently, providing greater transparency into the specific operations each step performs on the data.

Methods in model_base:

  • training_prepare: ingests the dataset and divides it into the independent and dependent features
  • exclude_fields: dynamically filters the features instead of manipulating the original dataset
  • split_training_and_test: splits the dataset into training and test sets; this can also host specialized strategies such as time-based splitting
  • transform: performs any ML-specific transformations, such as one-hot encoding or label encoding
  • fit: configures the ML model and runs the training (fitting)
  • predict: splitting prediction away from training allows this method to be reused later for standalone prediction on a different dataset
  • analytics: could grow into many methods and report generation; for simplicity it currently just reports the accuracy score
  • train and predict_dataset: the command methods that call the other methods in sequence; using command methods allows different sequences of operations without modifying the other methods
  • save_model and load_model: utility methods that save the model and accessory data structures (the one-hot encoder)

# model_base.py
import os
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import LabelEncoder
import joblib


class model_base():
    def __init__(self, dependent_variable: str = None,
                 excluded_fields: list[str] = None,
                 category_fields: list[str] = None):
        # Avoid mutable default arguments; fall back to empty lists here
        self.dependent_field = dependent_variable
        self.excluded_fields = excluded_fields or []
        self.category_fields = category_fields or []

    def training_prepare(self, dataset: pd.DataFrame) -> bool:
        if self.dependent_field is None:
            return False
        self.dataset = {}
        self.dataset['X'] = dataset.loc[:, ~dataset.columns.isin([self.dependent_field])]
        self.dataset['y'] = dataset[[self.dependent_field]]
        return True

    def exclude_fields(self) -> None:
        self.dataset['X'] = self.dataset['X'].loc[
            :, ~self.dataset['X'].columns.isin(self.excluded_fields)]

    def split_training_and_test(self, ratio: float = 0.2) -> None:
        self.training_set = {}
        self.test_set = {}
        self.training_set['X'], self.test_set['X'], self.training_set['y'], self.test_set['y'] = train_test_split(
            self.transformed_set['X'], self.transformed_set['y'], test_size=ratio, random_state=0)

    def transform(self) -> None:
        self.transformed_set = {}
        if not hasattr(self, 'onehot_encoder'):
            self.onehot_encoder = ColumnTransformer(transformers=[('encoder', OneHotEncoder(
                handle_unknown='ignore'), self.category_fields)], remainder='passthrough')
            self.transformed_set['X'] = self.onehot_encoder.fit_transform(
                self.dataset['X']).toarray()
        else:  # need to use the original encoder without fitting
            self.transformed_set['X'] = self.onehot_encoder.transform(
                self.dataset['X']).toarray()

        # do not run on predictions since dependent variable is not there
        if 'y' in self.dataset:
            # placeholder if encoding is ever needed
            self.transformed_set['y'] = self.dataset['y'].values

    def fit(self) -> None:
        if not hasattr(self, 'model'):
            self.model = XGBClassifier(n_estimators=500, learning_rate=0.01)
        self.model.fit(X=self.training_set['X'], y=self.training_set['y'])

    def predict(self, test_set: list = None) -> None:
        self.predictions = {}
        if test_set is None:
            test_set = self.transformed_set['X']
        if not hasattr(self, 'model'):
            raise Exception('No model has been loaded')
        self.predictions['y'] = self.model.predict(test_set).tolist()

    def analytics(self) -> None:
        accuracies = cross_val_score(
            estimator=self.model, X=self.test_set['X'], y=self.test_set['y'], cv=10)
        print("Accuracy: {:.2f} %".format(accuracies.mean()*100))
        print("Standard Deviation: {:.2f} %".format(accuracies.std()*100))

    def train(self, dataset: pd.DataFrame) -> None:
        self.training_prepare(dataset)
        self.exclude_fields()
        self.transform()
        self.split_training_and_test()
        self.fit()
        self.analytics()
        self.save_model()

    def predict_dataset(self, dataset: pd.DataFrame) -> dict:
        self.load_model()
        self.dataset = {'X': dataset}
        self.exclude_fields()
        self.transform()
        self.predict()
        return self.predictions

    def save_model(self) -> None:
        os.makedirs('models', exist_ok=True)
        joblib.dump(self.model, 'models/model.pkl.compressed', compress=True)
        joblib.dump(self.onehot_encoder,
                    'models/onehotencoder.pkl.compressed', compress=True)

    def load_model(self) -> None:
        self.model = joblib.load('models/model.pkl.compressed')
        self.onehot_encoder = joblib.load(
            'models/onehotencoder.pkl.compressed')

The object strategy allows more structure and control to be introduced compared to the script. It also allows different models and methods to be introduced relatively easily by reusing code through inheritance, which is covered later.

You may notice that the main script instantiates the model class twice, once for training and once for prediction. Saving the model to disk and reloading it for prediction is not needed when only testing locally, but it is good practice to start saving it out for future production use. Currently the model and the one-hot encoder both need to be saved, and any additional models or transformers that need to be persisted will have to be added to the save_model and load_model methods. Another code iteration is needed to simplify this operation.

Pipeline

Similar to breaking down our script into methods, the scikit-learn pipeline is a feature that facilitates chaining together multiple processing steps into a single, cohesive model. Pipelines can then be saved as a single, all-in-one file for easier management and deployment. They allow chaining of transformations and models together which can help when ensemble models are used or many transformations on the dataset are needed. Pipelines also help prevent data leakage by ensuring that all transformations are applied consistently during both training and prediction operations.

If other projects are to use the features, additional steps may be needed to decouple the feature transformations from the model, as sketched below. Pipelines are still useful, though, as you may start to add more models and transformations for this specific project.
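As a minimal sketch of that decoupling (my own variation, not code from this project), the feature steps can live in their own nested pipeline that other projects could import independently of the classifier:

# A hedged sketch: keep the feature transformations in a standalone pipeline
# so they can be reused apart from the model.
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from xgboost import XGBClassifier

feature_pipeline = Pipeline(steps=[
    ('encode', ColumnTransformer(
        transformers=[('cat', OneHotEncoder(handle_unknown='ignore'),
                       ['Sex', 'Cabin', 'Embarked'])],
        remainder='passthrough')),
])

# The full model nests the shared feature pipeline in front of the classifier.
model_pipeline = Pipeline(steps=[
    ('features', feature_pipeline),
    ('classifier', XGBClassifier(n_estimators=500, learning_rate=0.01)),
])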

To change our existing code to use pipelines, we only need to swap out a couple of methods so that they append steps to the pipeline instead of calling methods directly.

def add_exclusion(self) -> None:
    self.pipeline.steps.append(('exclusion', custom_transformers.ExcludeFields(
        fields_to_exclude=self.excluded_fields)))

def add_encoding(self) -> None:
    preprocessor = ColumnTransformer(
        transformers=[
            ('cat', OneHotEncoder(handle_unknown='ignore'), self.category_fields)
        ],
        remainder='passthrough'  # Keep the rest of the features as is
    )
    self.pipeline.steps.append(('preprocessor', preprocessor))

def add_model(self) -> None:
    self.pipeline.steps.append(
        ('classifier', XGBClassifier(n_estimators=500, learning_rate=0.01)))

We cannot add methods such as training_prepare and split_training_and_test to the pipeline, as these are training-specific methods and should not be used when predicting.

Another feature of the pipeline is integrating custom transformers instead of using direct methods. Here is a custom transformer that is used to exclude certain fields:

from sklearn.base import BaseEstimator, TransformerMixin

# Custom Transformers
class ExcludeFields(BaseEstimator, TransformerMixin):
    def __init__(self, fields_to_exclude):
        self.fields_to_exclude = fields_to_exclude

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        return X.drop(columns=self.fields_to_exclude)

The last step is to change our class code to integrate the pipeline, which includes simplifying the command methods train and predict_dataset. Here is the final code for the class:

# model_base_pipeline.py
import os
import pandas as pd
import json
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import LabelEncoder
from sklearn.pipeline import Pipeline
import joblib

import custom_transformers


class model_base_pipeline():
    def __init__(self, dependent_variable: str = None,
                 excluded_fields: list[str] = None,
                 category_fields: list[str] = None):
        # Avoid mutable default arguments; fall back to empty lists here
        self.dependent_field = dependent_variable
        self.excluded_fields = excluded_fields or []
        self.category_fields = category_fields or []

        self.pipeline = Pipeline(steps=[])

    def training_prepare(self, dataset: pd.DataFrame) -> bool:
        if self.dependent_field is None:
            return False
        self.dataset = {}
        self.dataset['X'] = dataset.loc[:, ~dataset.columns.isin([self.dependent_field])]
        self.dataset['y'] = dataset[[self.dependent_field]]
        return True

    def split_training_and_test(self, ratio: float = 0.2) -> None:
        self.training_set = {}
        self.test_set = {}
        self.training_set['X'], self.test_set['X'], self.training_set['y'], self.test_set['y'] = train_test_split(
            self.dataset['X'], self.dataset['y'], test_size=ratio, random_state=0)

    def add_exclusion(self) -> None:
        self.pipeline.steps.append(('exclusion', custom_transformers.ExcludeFields(
            fields_to_exclude=self.excluded_fields)))

    def add_encoding(self) -> None:
        preprocessor = ColumnTransformer(
            transformers=[
                ('cat', OneHotEncoder(handle_unknown='ignore'), self.category_fields)
            ],
            remainder='passthrough'  # Keep the rest of the features as is
        )
        self.pipeline.steps.append(('preprocessor', preprocessor))

    def add_model(self) -> None:
        self.pipeline.steps.append(
            ('classifier', XGBClassifier(n_estimators=500, learning_rate=0.01)))

    def predict(self, test_set: list = None) -> None:
        self.predictions = {}
        if test_set is None:
            test_set = self.dataset['X']
        if not hasattr(self, 'pipeline'):
            raise Exception('No pipeline has been loaded')
        self.predictions['y'] = self.pipeline.predict(test_set).tolist()

    def analytics(self) -> None:
        accuracies = cross_val_score(
            estimator=self.pipeline, X=self.test_set['X'], y=self.test_set['y'], cv=10)
        print("Accuracy: {:.2f} %".format(accuracies.mean()*100))
        print("Standard Deviation: {:.2f} %".format(accuracies.std()*100))

    def train(self, dataset: pd.DataFrame) -> None:
        self.training_prepare(dataset)
        self.add_exclusion()
        self.add_encoding()
        self.add_model()
        self.split_training_and_test()
        self.pipeline.fit(self.training_set['X'], self.training_set['y'])
        self.analytics()
        self.save_model()

    def predict_dataset(self, dataset: pd.DataFrame) -> dict:
        self.load_model()
        self.dataset = {'X': dataset}
        self.predict()
        return self.predictions

    def output_config(self) -> dict:
        return {
            'steps': [f'{step[0]}: {str(step[1])}' for step in self.pipeline.steps],
            'columns': self.pipeline.named_steps['exclusion'].get_feature_names_out()
        }

    def save_model(self) -> None:
        os.makedirs('models', exist_ok=True)
        joblib.dump(self.pipeline,
                    'models/pipeline.pkl.compressed', compress=True)
        with open('models/pipeline.config.json', 'w') as file:
            json.dump(self.output_config(), file, indent=4)

    def load_model(self) -> None:
        self.pipeline = joblib.load('models/pipeline.pkl.compressed')

The only modification to the main script is to instantiate the new class, model_base_pipeline, instead of model_base.
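For illustration, the swap would look like this (the step-3 filename is my assumption, following the naming of the other scripts):

# titanic_prediction_step_3.py (assumed name): identical to step 2 except
# for the class that is imported and instantiated.
import pandas as pd
from model_base_pipeline import model_base_pipeline

dataset = pd.read_csv('data/train.csv')
dependent_variable = 'Survived'
excluded_fields = ['PassengerId', 'Ticket', 'Name']
category_fields = ['Sex', 'Cabin', 'Embarked']

titanic_model = model_base_pipeline(
    dependent_variable, excluded_fields, category_fields)
titanic_model.train(dataset)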

Inheritance

Inheritance is a further enhancement that keeps redundant code to a minimum while exploring other models and transformers. Here is an example replacing the XGBoost classifier with scikit-learn’s HistGradientBoostingClassifier.

# model_histgradientboosting.py
from sklearn.ensemble import HistGradientBoostingClassifier
from model_base_pipeline import model_base_pipeline
import custom_transformers

class model_histgradientboosting(model_base_pipeline):
    def add_model(self):
        self.pipeline.steps.append(('denser', custom_transformers.ArrayTransformer()))
        self.pipeline.steps.append(('classifier', HistGradientBoostingClassifier()))

The first appended step above is a custom transformer that converts the data to a dense array, since HistGradientBoostingClassifier does not accept the sparse matrices that OneHotEncoder produces. Let’s add the transformation to our custom transformers file:

# custom_transformers.py
from sklearn.base import BaseEstimator, TransformerMixin

class ExcludeFields(BaseEstimator, TransformerMixin):
    def __init__(self, fields_to_exclude):
        self.fields_to_exclude = fields_to_exclude

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        return X.drop(columns=self.fields_to_exclude)
    
class ArrayTransformer(BaseEstimator, TransformerMixin):

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        # Densify the sparse matrix produced by OneHotEncoder
        return X.toarray()

Now our main script can be changed to replace the existing model, or to run a second model for comparison. I modified it to run both and output both sets of results.

# titanic_prediction_step_4.py
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from model_base_pipeline import model_base_pipeline
from model_histgradientboosting import model_histgradientboosting

# Training
dataset = pd.read_csv('data/train.csv')
dependent_variable = 'Survived'
excluded_fields = ['PassengerId', 'Ticket', 'Name']
category_fields = ['Sex', 'Cabin', 'Embarked']

test_dataset = pd.read_csv('data/test.csv')

# first model to test
print('XGBoost Results:')
titanic_model_xgboost = model_base_pipeline(
    dependent_variable, excluded_fields, category_fields)
titanic_model_xgboost.train(dataset)

titanic_model_xgboost_predict = model_base_pipeline(
    dependent_variable, excluded_fields, category_fields)
pd.DataFrame(
    {
        'PassengerId': test_dataset.iloc[:, 0],
        'Survived': titanic_model_xgboost_predict.predict_dataset(test_dataset)['y']
    }
).to_csv('predictions/submission_4_1.csv', index=False)

# second model to test
print('HistGradientBoostingClassifier Results:')
titanic_model_histgradient = model_histgradientboosting(
    dependent_variable, excluded_fields, category_fields)
titanic_model_histgradient.train(dataset)

# Prediction
titanic_model_histgradient_predict = model_histgradientboosting(
    dependent_variable, excluded_fields, category_fields)
pd.DataFrame(
    {
        'PassengerId': test_dataset.iloc[:, 0],
        'Survived': titanic_model_histgradient_predict.predict_dataset(test_dataset)['y']
    }
).to_csv('predictions/submission_4_2.csv', index=False)

% python titanic_prediction_step_4.py
XGBoost Results:
Accuracy: 82.71 %
Standard Deviation: 7.59 %

HistGradientBoostingClassifier Results:
Accuracy: 82.16 %
Standard Deviation: 6.41 %

Debugging

The ease of adding steps to the scikit-learn pipeline simplifies the process of saving the entire model, eliminating the need to individually save each persistent component. Additionally, the pipeline structure enhances debugging by providing a clear, step-by-step framework. Below is an example of a custom transformer that outputs information before or after a pipeline step:

# Custom transformer for debugging
class DebugTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, message):
        self.message = message

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        print(f"{self.message} - Shape: {X.shape}")
        if hasattr(X, 'head'):  # only DataFrames have head(); encoded arrays do not
            print(X.head())  # print the first few rows for a quick check
        return X

Example of calling the custom transformer:

self.pipeline.steps.append(('exclusion', custom_transformers.ExcludeFields(
    fields_to_exclude=self.excluded_fields)))
self.pipeline.steps.append(('post_exclusion', custom_transformers.DebugTransformer(
    message="Post exclusion data dump")))

Further debugging and model clarification can be done by outputting the pipeline steps to the screen, or to a file that accompanies the saved pipeline. A few code changes accomplish this.

Modifying save_model to write out a config file allows us to tag each model with any data that pertains to that particular model. I further extended the modularization by creating an output_config method that can be overridden in other classes.

def output_config(self):
    return {
        'steps': [f'{step[0]}: {str(step[1])}' for step in self.pipeline.steps],
        'columns': self.pipeline.named_steps['exclusion'].get_feature_names_out()
    }

def save_model(self):
    os.makedirs('models', exist_ok=True)
    joblib.dump(self.pipeline, 'models/pipeline.pkl.compressed', compress=True)
    with open('models/pipeline.config.json', 'w') as file:
        json.dump(self.output_config(), file, indent=4)

Notice that our custom transformer now needs the get_feature_names_out method. We can add that quite easily:

class ExcludeFields(BaseEstimator, TransformerMixin):
    def __init__(self, fields_to_exclude):
        self.fields_to_exclude = fields_to_exclude

    def fit(self, X, y=None):
        self.feature_names_in_ = X.columns.tolist()
        self.feature_names_out_ = [
            col for col in self.feature_names_in_ if col not in self.fields_to_exclude]
        return self

    def transform(self, X):
        return X.drop(columns=self.fields_to_exclude)

    def get_feature_names_out(self, input_features=None):
        return self.feature_names_out_

Here are the contents of the JSON file, where we can see the steps entries need some formatting help. The output_config method could be modified to structure this better; one possible refinement is sketched after the listing.

{
    "steps": [
        "exclusion: ExcludeFields(fields_to_exclude=['PassengerId', 'Ticket', 'Name'])",
        "preprocessor: ColumnTransformer(remainder='passthrough',\n                  transformers=[('cat', OneHotEncoder(handle_unknown='ignore'),\n                                 ['Sex', 'Cabin', 'Embarked'])])",
        "classifier: XGBClassifier(base_score=None, booster=None, callbacks=None,\n              colsample_bylevel=None, colsample_bynode=None,\n              colsample_bytree=None, device=None, early_stopping_rounds=None,\n              enable_categorical=False, eval_metric=None, feature_types=None,\n              gamma=None, grow_policy=None, importance_type=None,\n              interaction_constraints=None, learning_rate=0.01, max_bin=None,\n              max_cat_threshold=None, max_cat_to_onehot=None,\n              max_delta_step=None, max_depth=None, max_leaves=None,\n              min_child_weight=None, missing=nan, monotone_constraints=None,\n              multi_strategy=None, n_estimators=500, n_jobs=None,\n              num_parallel_tree=None, random_state=None, ...)"
    ],
    "columns": [
        "Pclass",
        "Sex",
        "Age",
        "SibSp",
        "Parch",
        "Fare",
        "Cabin",
        "Embarked"
    ]
}
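As a hedged sketch of one such refinement (my own variation, not from the post’s repository), output_config could record each step’s class name and stringified parameters instead of relying on the repr:

def output_config(self) -> dict:
    # Store structured metadata per step; parameter values are stringified
    # so the result stays JSON-serializable.
    return {
        'steps': [
            {
                'name': name,
                'class': type(step).__name__,
                'params': {key: str(value)
                           for key, value in step.get_params(deep=False).items()
                           if value is not None},
            }
            for name, step in self.pipeline.steps
        ],
        'columns': list(self.pipeline.named_steps['exclusion'].get_feature_names_out()),
    }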

The Jupyter notebook we started with has now evolved into a structured template that is easy to extend with additional functionality, traceable, and adaptable for various purposes. Using the same code base for both training and prediction ensures consistency, making it easier to trace and address any anomalies that are introduced. Since production use can take many forms, I leave room for future enhancements.

Further Iterations

  • Create a library of the model classes so that they can be pulled into other projects and uploaded to a package repository
  • Create a script to batch predict, or create an API using a library like FastAPI to do on-the-fly predictions (a sketch of the API idea follows this list)
  • Add the Git commit hash or tag to the saved model config and uniquely name the model for better tracking
  • Use a framework like Data Version Control (DVC) to further enhance tracking of model performance
  • Create a docker image that contains the library and saved pipeline along with a script to perform the action needed for easy deployment.
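As a minimal sketch of the API idea above (my own, with assumed file and route names; not from the post’s repository):

# serve_predictions.py (assumed name): load the saved pipeline once at startup
# and serve on-the-fly predictions over HTTP.
import joblib
import pandas as pd
from fastapi import FastAPI

app = FastAPI()
pipeline = joblib.load('models/pipeline.pkl.compressed')

@app.post('/predict')
def predict(passengers: list[dict]) -> list[int]:
    # Each record should carry the same feature columns as the training data,
    # including the fields the pipeline's exclusion step drops
    frame = pd.DataFrame(passengers)
    return [int(p) for p in pipeline.predict(frame)]

Run it with uvicorn serve_predictions:app and POST a JSON list of passenger records to /predict.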

Evolved?

After all these code modifications, what have we achieved?

Enhancement → What this means for production

  • Several models can be run without copying or modifying many lines of code → Minimal changes to active production code
  • Pipeline persistence with metadata → Saved pipelines can be stored in a different repository and recalled using the attached metadata
  • Tracking of models and code changes → Experimentation is key to finding the optimal model and configuration; keeping track of each experiment gives a broader view of change
  • Standardization of metadata and analytical methods → Consistent analytics and metrics allow for easier research into which models perform best

All of the code used in this post is available on GitHub at https://github.com/chariotsolutions/examples/tree/main/20240624-an-ml-tale-from-notebook-to-production