Data scientists spend their days working in Jupyter notebooks, which are then handed to an implementation team to prepare for production. This post guides you through that process, emphasizing iterative refinement.
I will be using the scikit-learn and XGBoost libraries, but other ML libraries could be swapped in. While scikit-learn offers a comprehensive library of algorithms and tools that simplify data analysis and model building, it doesn't impose a strict framework for organizing projects. This flexibility can be advantageous, but it often requires users to develop their own project structure and best practices. My goal is to help establish a consistent, effective workflow for machine learning libraries that carries a project from training through to production.
This article uses the Titanic dataset from https://www.kaggle.com/competitions/titanic. Learning ML can be tough without a goal, and Kaggle provides challenges that give you purpose while you hone your skills.
The viewpoint below is that of a data engineer. Most projects require quite a bit of data investigation before the actual machine learning begins; I have skipped that investigation to keep the focus on getting to production.
Evolution Index:
- Jupyter
- Script
- Object
- Pipeline
- Inheritance
- Debugging
- Further Iterations
- Evolved?
Jupyter
Most machine learning projects start with a Jupyter notebook, where the data is sorted out and different models are tried to determine a starting point.
The notebook has some disadvantages though:
- Hard to track changes when tuning features and hyperparameters
- Modularization is possible, but keeping track of what is active code is difficult
- Running on a remote machine needs more configuration and maintenance
Some data scientists are wizards in Jupyter notebooks so they may be able to overcome these challenges. I am not one.
Script
Once I reached my limits in the notebook, I developed the code into a Python script. The script gives up the notebook's interactive computing but increases the reproducibility of the experiment: the code now executes as a single unit, eliminating the need to rerun each code block individually after changes.
```python
# titanic_prediction_step_1.py
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier
from sklearn.model_selection import cross_val_score

dataset = pd.read_csv('data/train.csv')

# Training segment
X = dataset.iloc[:, [2, 4, 5, 6, 7, 9, 10, 11]]
y = dataset.iloc[:, 1:2]

# Apply the fit_transform method on the instance of ColumnTransformer
ct = ColumnTransformer(transformers=[('encoder', OneHotEncoder(
    handle_unknown='ignore'), [1, 6, 7])], remainder='passthrough')
X_encoded = ct.fit_transform(X.values).toarray()

# placeholder if encoding is ever needed
y_encoded = y.values

X_train_encoded, X_test_encoded, y_train_encoded, y_test_encoded = train_test_split(
    X_encoded, y_encoded, test_size=0.2, random_state=0)

classifier = XGBClassifier(n_estimators=500, learning_rate=0.01)
classifier.fit(X=X_train_encoded, y=y_train_encoded)

# Accuracy Scoring
accuracies = cross_val_score(
    estimator=classifier, X=X_test_encoded, y=y_test_encoded, cv=10)
print("Accuracy: {:.2f} %".format(accuracies.mean()*100))
print("Standard Deviation: {:.2f} %".format(accuracies.std()*100))

# Prediction
# need to change column indexes because of column index selector
dataset = pd.read_csv('data/test.csv')
X = dataset.iloc[:, [1, 3, 4, 5, 6, 8, 9, 10]]
X_encoded = ct.transform(X.values).toarray()

pd.DataFrame(
    {
        'PassengerId': dataset.iloc[:, 0],
        'Survived': classifier.predict(X_encoded)
    }
).to_csv('predictions/submission.csv', index=False)
```
For all subsequent iterations, I highly recommend beginning to track your project using Git or another version control system. Implementing version control will significantly streamline the debugging process, allowing you to easily trace and understand changes made to the model over time. Version control features, such as commit hashes and tags, can be integrated to effectively track and manage saved models.
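For example, here is a minimal sketch of capturing the current commit hash at save time so a model can be traced back to the code that produced it. It assumes the project directory is a Git repository, and `save_metadata` is a hypothetical helper rather than part of the code in this post:

```python
import json
import subprocess


def current_commit() -> str:
    # Short hash of HEAD; assumes we are running inside a git repository
    return subprocess.check_output(
        ['git', 'rev-parse', '--short', 'HEAD'], text=True).strip()


def save_metadata(path: str = 'models/model.meta.json') -> None:
    # Hypothetical helper: tag the saved model with the commit it was built from
    with open(path, 'w') as file:
        json.dump({'git_commit': current_commit()}, file, indent=4)
```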
Object
The Python script can be hard to understand as written; let's reduce the main script to just the essentials and move everything else into an object.
```python
# titanic_prediction_step_2.py
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from model_base import model_base

# Training
dataset = pd.read_csv('data/train.csv')
dependent_variable = 'Survived'
excluded_fields = ['PassengerId', 'Ticket', 'Name']
category_fields = ['Sex', 'Cabin', 'Embarked']

titanic_model = model_base(
    dependent_variable, excluded_fields, category_fields)
titanic_model.train(dataset)

# Prediction
test_dataset = pd.read_csv('data/test.csv')
titanic_model_predict = model_base(
    dependent_variable, excluded_fields, category_fields)
pd.DataFrame(
    {
        'PassengerId': test_dataset.iloc[:, 0],
        'Survived': titanic_model_predict.predict_dataset(test_dataset)['y']
    }
).to_csv('predictions/submission_2.csv', index=False)
```
The main script now contains configuration variables that are easier to manage and track. Multiple versions of the models can be run with minimal redundant code.
By leveraging the object-oriented strategy, we can decompose each step of the model into distinct object methods. These methods can then be developed and tested independently, providing greater transparency into the specific operations each step performs on the data.
Methods in model_base:

| Method | Purpose |
|---|---|
| training_prepare | Ingests the dataset and divides it into the independent and dependent features |
| exclude_fields | Dynamically filters the features instead of manipulating the original dataset |
| split_training_and_test | Splits the dataset into training and test sets; can also be used for specific splitting, such as time-based splitting |
| transform | Performs any ML-specific transformations, such as one-hot encoding or label encoding |
| fit | Allows configuration of the ML model and training (fitting) |
| predict | Separating prediction from model training allows this method to be reused later for standalone prediction with a different dataset |
| analytics | Could develop into many methods and report generation; currently it just reports the accuracy score for simplicity |
| train and predict_dataset | The command methods that call the other methods in sequence. Using command methods allows different sequences of operations without manipulating the other methods |
| save_model and load_model | Utility functions to save all the models and accessory data structures (one-hot encodings) |
```python
# model_base.py
import os
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier
from sklearn.model_selection import cross_val_score
import joblib


class model_base():
    def __init__(self, dependent_variable: str = None,
                 excluded_fields: list[str] = [],
                 category_fields: list[str] = []):
        self.dependent_field = dependent_variable
        self.excluded_fields = excluded_fields
        self.category_fields = category_fields

    def training_prepare(self, dataset: pd.DataFrame) -> bool:
        if self.dependent_field is None:
            return False
        self.dataset = {}
        self.dataset['X'] = dataset.loc[:, ~dataset.columns.isin(
            [self.dependent_field])]
        self.dataset['y'] = dataset[[self.dependent_field]]

    def exclude_fields(self) -> None:
        self.dataset['X'] = self.dataset['X'].loc[:, ~self.dataset['X'].columns.isin(
            self.excluded_fields)]

    def split_training_and_test(self, ratio: float = 0.2) -> None:
        self.training_set = {}
        self.test_set = {}
        self.training_set['X'], self.test_set['X'], self.training_set['y'], self.test_set['y'] = train_test_split(
            self.transformed_set['X'], self.transformed_set['y'],
            test_size=ratio, random_state=0)

    def transform(self) -> None:
        self.transformed_set = {}
        if hasattr(self, 'onehot_encoder') is False:
            self.onehot_encoder = ColumnTransformer(transformers=[('encoder', OneHotEncoder(
                handle_unknown='ignore'), self.category_fields)], remainder='passthrough')
            self.transformed_set['X'] = self.onehot_encoder.fit_transform(
                self.dataset['X']).toarray()
        else:
            # need to use the original encoder without fitting
            self.transformed_set['X'] = self.onehot_encoder.transform(
                self.dataset['X']).toarray()
        # do not run on predictions since dependent variable is not there
        if 'y' in self.dataset:
            # placeholder if encoding is ever needed
            self.transformed_set['y'] = self.dataset['y'].values

    def fit(self) -> None:
        if hasattr(self, 'model') is False:
            self.model = XGBClassifier(n_estimators=500, learning_rate=0.01)
        self.model.fit(X=self.training_set['X'], y=self.training_set['y'])

    def predict(self, test_set: list = None) -> list:
        self.predictions = {}
        if test_set is None:
            test_set = self.transformed_set['X']
        if hasattr(self, 'model') is False:
            raise Exception('No model has been loaded')
        self.predictions['y'] = self.model.predict(test_set).tolist()

    def analytics(self) -> None:
        accuracies = cross_val_score(
            estimator=self.model, X=self.test_set['X'], y=self.test_set['y'], cv=10)
        print("Accuracy: {:.2f} %".format(accuracies.mean()*100))
        print("Standard Deviation: {:.2f} %".format(accuracies.std()*100))

    def train(self, dataset: pd.DataFrame) -> None:
        self.training_prepare(dataset)
        self.exclude_fields()
        self.transform()
        self.split_training_and_test()
        self.fit()
        self.analytics()
        self.save_model()

    def predict_dataset(self, dataset: pd.DataFrame) -> list:
        self.load_model()
        self.dataset = {'X': dataset}
        self.exclude_fields()
        self.transform()
        self.predict()
        return self.predictions

    def save_model(self) -> None:
        os.makedirs('models', exist_ok=True)
        joblib.dump(self.model, 'models/model.pkl.compressed', compress=True)
        joblib.dump(self.onehot_encoder,
                    'models/onehotencoder.pkl.compressed', compress=True)

    def load_model(self) -> None:
        self.model = joblib.load('models/model.pkl.compressed')
        self.onehot_encoder = joblib.load(
            'models/onehotencoder.pkl.compressed')
```
The object strategy allows for more structure and control compared to the script. It also allows different models and methods to be introduced relatively easily by reusing code through inheritance, which is covered later.
You may notice in the main script that the model class is instantiated twice, once for training and once for prediction. Saving the model to disk and reloading it for prediction is not needed when only testing locally, but it is good practice to start saving it for future production use. Currently, both the model and the one-hot encoder need to be saved; any additional models or transformers that need to be persisted would have to be added to the save_model and load_model methods. Another code iteration is needed to simplify this operation.
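To make that bookkeeping concrete, here is a sketch of how the two methods would grow if, hypothetically, a LabelEncoder also had to be persisted; every new artifact means another matched pair of dump and load calls:

```python
def save_model(self) -> None:
    os.makedirs('models', exist_ok=True)
    joblib.dump(self.model, 'models/model.pkl.compressed', compress=True)
    joblib.dump(self.onehot_encoder,
                'models/onehotencoder.pkl.compressed', compress=True)
    # hypothetical extra artifact: forget either of its lines and
    # standalone prediction breaks
    joblib.dump(self.label_encoder,
                'models/labelencoder.pkl.compressed', compress=True)


def load_model(self) -> None:
    self.model = joblib.load('models/model.pkl.compressed')
    self.onehot_encoder = joblib.load('models/onehotencoder.pkl.compressed')
    self.label_encoder = joblib.load('models/labelencoder.pkl.compressed')
```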
Pipeline
Similar to breaking our script down into methods, the scikit-learn pipeline chains multiple processing steps into a single, cohesive model. A pipeline can be saved as one all-in-one file for easier management and deployment, and chaining transformations and models together helps when ensemble models are used or many transformations on the dataset are needed. Pipelines also help prevent data leakage by ensuring that all transformations are applied consistently during both training and prediction.
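As a minimal standalone sketch (using this project's column names; the saved file name is my own), fitting, predicting, and persisting all happen through one object:

```python
import os

import joblib
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from xgboost import XGBClassifier

dataset = pd.read_csv('data/train.csv')
X = dataset.drop(columns=['Survived', 'PassengerId', 'Ticket', 'Name'])
y = dataset['Survived']
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=0)

# One object holds both the preprocessing and the classifier
pipe = Pipeline(steps=[
    ('preprocessor', ColumnTransformer(
        transformers=[('cat', OneHotEncoder(handle_unknown='ignore'),
                       ['Sex', 'Cabin', 'Embarked'])],
        remainder='passthrough')),
    ('classifier', XGBClassifier(n_estimators=500, learning_rate=0.01)),
])
pipe.fit(X_train, y_train)          # encoder is fit on training data only
predictions = pipe.predict(X_test)  # the same fitted encoder is reused here

os.makedirs('models', exist_ok=True)
joblib.dump(pipe, 'models/pipeline_sketch.pkl.compressed', compress=True)  # one file
```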
If other projects are to use the features, additional steps may be needed to decouple the feature transformations from the model. Pipelines are still useful though as you may start to add additional models and transformations for this specific project.
To change our existing code to use pipelines, we only need to swap out a couple of methods so that they append steps to the pipeline instead of calling methods directly.
```python
def add_exclusion(self) -> None:
    self.pipeline.steps.append(('exclusion', custom_transformers.ExcludeFields(
        fields_to_exclude=self.excluded_fields)))


def add_encoding(self) -> None:
    preprocessor = ColumnTransformer(
        transformers=[
            ('cat', OneHotEncoder(handle_unknown='ignore'), self.category_fields)
        ],
        remainder='passthrough'  # Keep the rest of the features as is
    )
    self.pipeline.steps.append(('preprocessor', preprocessor))


def add_model(self) -> None:
    self.pipeline.steps.append(
        ('classifier', XGBClassifier(n_estimators=500, learning_rate=0.01)))
```
We cannot add methods such as training_prepare and split_training_and_test to the pipeline, as these are training-specific methods and should not be used when predicting.
Another feature of the pipeline is the ability to integrate custom transformers instead of calling methods directly. Here is a custom transformer used to exclude certain fields:
```python
from sklearn.base import BaseEstimator, TransformerMixin


# Custom Transformers
class ExcludeFields(BaseEstimator, TransformerMixin):
    def __init__(self, fields_to_exclude):
        self.fields_to_exclude = fields_to_exclude

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        return X.drop(columns=self.fields_to_exclude)
```
The last step is to change our class code to integrate the pipeline, which includes simplifying the command methods train and predict_dataset. Here is the final code for the class:
```python
# model_base_pipeline.py
import os
import pandas as pd
import json
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import Pipeline
import joblib
import custom_transformers


class model_base_pipeline():
    def __init__(self, dependent_variable: str = None,
                 excluded_fields: list[str] = [],
                 category_fields: list[str] = []):
        self.dependent_field = dependent_variable
        self.excluded_fields = excluded_fields
        self.category_fields = category_fields
        self.pipeline = Pipeline(steps=[])

    def training_prepare(self, dataset: pd.DataFrame) -> bool:
        if self.dependent_field is None:
            return False
        self.dataset = {}
        self.dataset['X'] = dataset.loc[:, ~dataset.columns.isin(
            [self.dependent_field])]
        self.dataset['y'] = dataset[[self.dependent_field]]

    def split_training_and_test(self, ratio: float = 0.2) -> None:
        self.training_set = {}
        self.test_set = {}
        self.training_set['X'], self.test_set['X'], self.training_set['y'], self.test_set['y'] = train_test_split(
            self.dataset['X'], self.dataset['y'], test_size=ratio, random_state=0)

    def add_exclusion(self) -> None:
        self.pipeline.steps.append(('exclusion', custom_transformers.ExcludeFields(
            fields_to_exclude=self.excluded_fields)))

    def add_encoding(self) -> None:
        preprocessor = ColumnTransformer(
            transformers=[
                ('cat', OneHotEncoder(handle_unknown='ignore'), self.category_fields)
            ],
            remainder='passthrough'  # Keep the rest of the features as is
        )
        self.pipeline.steps.append(('preprocessor', preprocessor))

    def add_model(self) -> None:
        self.pipeline.steps.append(
            ('classifier', XGBClassifier(n_estimators=500, learning_rate=0.01)))

    def predict(self, test_set: list = None) -> list:
        self.predictions = {}
        if test_set is None:
            test_set = self.dataset['X']
        if hasattr(self, 'pipeline') is False:
            raise Exception('No pipeline has been loaded')
        self.predictions['y'] = self.pipeline.predict(test_set).tolist()

    def analytics(self) -> None:
        accuracies = cross_val_score(
            estimator=self.pipeline, X=self.test_set['X'], y=self.test_set['y'], cv=10)
        print("Accuracy: {:.2f} %".format(accuracies.mean()*100))
        print("Standard Deviation: {:.2f} %".format(accuracies.std()*100))

    def train(self, dataset: pd.DataFrame) -> None:
        self.training_prepare(dataset)
        self.add_exclusion()
        self.add_encoding()
        self.add_model()
        self.split_training_and_test()
        self.pipeline.fit(self.training_set['X'], self.training_set['y'])
        self.analytics()
        self.save_model()

    def predict_dataset(self, dataset: pd.DataFrame) -> list:
        self.load_model()
        self.dataset = {'X': dataset}
        self.predict()
        return self.predictions

    def output_config(self) -> dict:
        return {
            'steps': [f'{step[0]}: {str(step[1])}' for step in self.pipeline.steps],
            'columns': self.pipeline.named_steps['exclusion'].get_feature_names_out()
        }

    def save_model(self) -> None:
        os.makedirs('models', exist_ok=True)
        joblib.dump(self.pipeline, 'models/pipeline.pkl.compressed', compress=True)
        with open('models/pipeline.config.json', 'w') as file:
            json.dump(self.output_config(), file, indent=4)

    def load_model(self) -> None:
        self.pipeline = joblib.load('models/pipeline.pkl.compressed')
```
The only modification to the main script is to instantiate model_base_pipeline instead of model_base.
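For reference, here is a sketch of the only lines that change (the step-3 file name is my assumption):

```python
# titanic_prediction_step_3.py (only the lines that differ from step 2)
from model_base_pipeline import model_base_pipeline

titanic_model = model_base_pipeline(
    dependent_variable, excluded_fields, category_fields)
titanic_model.train(dataset)
```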
Inheritance
Inheritance is a further enhancement to keep redundant code to a minimum while exploring other models and transformers. Here is an example replacing the XGBoost classifier with the HistGradientBoostingClassifier model.
```python
# model_histgradientboosting.py
from sklearn.ensemble import HistGradientBoostingClassifier
from model_base_pipeline import model_base_pipeline
import custom_transformers


class model_histgradientboosting(model_base_pipeline):
    def add_model(self):
        # densify the encoder's output before handing it to the classifier
        self.pipeline.steps.append(
            ('denser', custom_transformers.ArrayTransformer()))
        self.pipeline.steps.append(
            ('classifier', HistGradientBoostingClassifier()))
```
The first appended step above is a custom transformer needed to convert the encoder's sparse output into a dense array. Let's add the transformation to our custom transformers file:
```python
# custom_transformers.py
from sklearn.base import BaseEstimator, TransformerMixin


class ExcludeFields(BaseEstimator, TransformerMixin):
    def __init__(self, fields_to_exclude):
        self.fields_to_exclude = fields_to_exclude

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        return X.drop(columns=self.fields_to_exclude)


class ArrayTransformer(TransformerMixin):
    def fit(self, X, y=None, **fit_params):
        return self

    def transform(self, X, y=None, **fit_params):
        return X.toarray()
```
Now our main script can be changed to replace the existing model or to run a second model for comparison. I modified it to run both, compare their performance, and output both sets of predictions.
```python
# titanic_prediction_step_4.py
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from model_base_pipeline import model_base_pipeline
from model_histgradientboosting import model_histgradientboosting

# Training
dataset = pd.read_csv('data/train.csv')
dependent_variable = 'Survived'
excluded_fields = ['PassengerId', 'Ticket', 'Name']
category_fields = ['Sex', 'Cabin', 'Embarked']
test_dataset = pd.read_csv('data/test.csv')

# first model to test
print('XGBoost Results:')
titanic_model_xgboost = model_base_pipeline(
    dependent_variable, excluded_fields, category_fields)
titanic_model_xgboost.train(dataset)

titanic_model_xgboost_predict = model_base_pipeline(
    dependent_variable, excluded_fields, category_fields)
pd.DataFrame(
    {
        'PassengerId': test_dataset.iloc[:, 0],
        'Survived': titanic_model_xgboost_predict.predict_dataset(test_dataset)['y']
    }
).to_csv('predictions/submission_4_1.csv', index=False)

# second model to test
print('HistGradientBoostingClassifier Results:')
titanic_model_histgradient = model_histgradientboosting(
    dependent_variable, excluded_fields, category_fields)
titanic_model_histgradient.train(dataset)

# Prediction
titanic_model_histgradient_predict = model_histgradientboosting(
    dependent_variable, excluded_fields, category_fields)
pd.DataFrame(
    {
        'PassengerId': test_dataset.iloc[:, 0],
        'Survived': titanic_model_histgradient_predict.predict_dataset(test_dataset)['y']
    }
).to_csv('predictions/submission_4_2.csv', index=False)
```
```
% python titanic_prediction_step_4.py
XGBoost Results:
Accuracy: 82.71 %
Standard Deviation: 7.59 %
HistGradientBoostingClassifier Results:
Accuracy: 82.16 %
Standard Deviation: 6.41 %
```
Debugging
The ease of adding steps to the scikit-learn pipeline simplifies the process of saving the entire model, eliminating the need to individually save each persistent component. Additionally, the pipeline structure enhances debugging by providing a clear, step-by-step framework. Below is an example of a custom transformer that outputs information before or after a pipeline step:
```python
# Custom transformer for debugging
class DebugTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, message):
        self.message = message

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        print(f"{self.message} - Shape: {X.shape}")
        print(X.head())  # Print the first few rows for a quick check
        return X
```
Example of calling the custom transformer:
```python
self.pipeline.steps.append(('exclusion', custom_transformers.ExcludeFields(
    fields_to_exclude=self.excluded_fields)))
self.pipeline.steps.append(('post_exclusion', custom_transformers.DebugTransformer(
    message="Post exclusion data dump")))
```
Further debugging and model clarification can be done by outputting the pipeline steps to the screen or a file to accompany the pipeline save file. Several code changes can be made to accomplish this.
Modifying save_model to output a config file alongside the saved pipeline allows us to tag each model with any data that pertains to that particular model. I further extended the modularization by creating an output_config method that can be overridden in other classes.
```python
def output_config(self):
    return {
        'steps': [f'{step[0]}: {str(step[1])}' for step in self.pipeline.steps],
        'columns': self.pipeline.named_steps['exclusion'].get_feature_names_out()
    }


def save_model(self):
    os.makedirs('models', exist_ok=True)
    joblib.dump(self.pipeline, 'models/pipeline.pkl.compressed', compress=True)
    with open('models/pipeline.config.json', 'w') as file:
        json.dump(self.output_config(), file, indent=4)
```
You may notice that our custom transformer now needs a get_feature_names_out method. We can add that quite easily:
```python
class ExcludeFields(BaseEstimator, TransformerMixin):
    def __init__(self, fields_to_exclude):
        self.fields_to_exclude = fields_to_exclude

    def fit(self, X, y=None):
        self.feature_names_in_ = X.columns.tolist()
        self.feature_names_out_ = [
            col for col in self.feature_names_in_
            if col not in self.fields_to_exclude]
        return self

    def transform(self, X):
        return X.drop(columns=self.fields_to_exclude)

    def get_feature_names_out(self, input_features=None):
        return self.feature_names_out_
```
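A quick check on a toy frame (hypothetical columns, not the Titanic data) shows what the new method reports after fitting:

```python
import pandas as pd

# ExcludeFields is the class defined above
df = pd.DataFrame({'PassengerId': [1], 'Name': ['Smith'], 'Age': [22.0]})
ex = ExcludeFields(fields_to_exclude=['PassengerId', 'Name']).fit(df)
print(ex.get_feature_names_out())  # ['Age']
```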
Here are the contents of the JSON file; we can see the steps entry needs some formatting help. The output_config method could be modified to structure this better (a sketch follows the listing).
{ "steps": [ "exclusion: ExcludeFields(fields_to_exclude=['PassengerId', 'Ticket', 'Name'])", "preprocessor: ColumnTransformer(remainder='passthrough',\n transformers=[('cat', OneHotEncoder(handle_unknown='ignore'),\n ['Sex', 'Cabin', 'Embarked'])])", "classifier: XGBClassifier(base_score=None, booster=None, callbacks=None,\n colsample_bylevel=None, colsample_bynode=None,\n colsample_bytree=None, device=None, early_stopping_rounds=None,\n enable_categorical=False, eval_metric=None, feature_types=None,\n gamma=None, grow_policy=None, importance_type=None,\n interaction_constraints=None, learning_rate=0.01, max_bin=None,\n max_cat_threshold=None, max_cat_to_onehot=None,\n max_delta_step=None, max_depth=None, max_leaves=None,\n min_child_weight=None, missing=nan, monotone_constraints=None,\n multi_strategy=None, n_estimators=500, n_jobs=None,\n num_parallel_tree=None, random_state=None, ...)" ], "columns": [ "Pclass", "Sex", "Age", "SibSp", "Parch", "Fare", "Cabin", "Embarked" ] }
The Jupyter notebook we initially started with has now evolved into a structured template that is easy to extend with additional functionality, traceable, and adaptable for various purposes. Using the same code base for both training and prediction ensures consistency, making it easier to trace and address any introduced anomalies. Since production use can take many forms, I leave room for future enhancements.
Further Iterations
- Create a library of the model classes so that they can be pulled into other projects and uploaded to a package repository
- Create a script to batch predict, or create an API using a library like FastAPI to do on-the-fly predictions (see the sketch after this list)
- Add the git commit tag to the model's saved config and uniquely name the model for better tracking
- Use a framework like Data Version Control to further enhance tracking of model performance
- Create a Docker image that contains the library and saved pipeline, along with a script to perform the action needed, for easy deployment
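As a taste of the API item above, here is a minimal sketch of an on-the-fly prediction endpoint built with FastAPI; the endpoint path, the free-form list[dict] request body, and the file name are my assumptions, and a real service would define a proper request schema:

```python
# prediction_api.py -- hypothetical FastAPI wrapper around the saved pipeline
import pandas as pd
from fastapi import FastAPI

from model_base_pipeline import model_base_pipeline

app = FastAPI()
model = model_base_pipeline()  # predict_dataset() loads the saved pipeline


@app.post('/predict')
def predict(passengers: list[dict]) -> dict:
    # Records must be shaped like rows of test.csv (PassengerId, Pclass, ...)
    dataset = pd.DataFrame(passengers)
    return model.predict_dataset(dataset)
```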
Evolved?
After all these code modifications, what have we achieved?
| Enhancement | What this means for production |
|---|---|
| Several models can be run without copying or modifying many lines of code | Minimal changes to production-active code |
| Pipeline persistence with metadata | Saved pipelines can be stored in a separate repository and recalled using the attached metadata |
| Tracking of models and code changes | Experimentation is key to finding the optimal model and configuration; keeping track of each experiment gives a broader view of change |
| Standardization of metadata and analytical methods | Consistent analytics and metrics make it easier to research which models perform best |
All of the code used in this post is available on Github at https://github.com/chariotsolutions/examples/tree/main/20240624-an-ml-tale-from-notebook-to-production