MLOps is a combination of software engineering, data science, and DevOps. It consists of applying these practices to promote machine learning models to production environments. Actors such as machine learning engineers, software engineers, and data scientists work together to put models into production.
Typically, an MLOps pipeline is composed of the following steps:
Feature engineering: this is where the data is generated; we combine datasets, clean the data, and split it into train, test, and validation sets. Basically, we prepare the data that will enter the training process.
Training: the process where data scientists build machine learning models, using the clean data from feature engineering to train them.
Evaluation: the process where we validate whether the model's metrics fall within the thresholds we expect.
Model artifacts generation: if our models pass evaluation, we generate artifacts to consume them, such as pickle files or other formats that store the models.
In this story I will show how to build an MLOps pipeline using AWS SageMaker Pipelines and the AWS SDK, continuing with the feature group we deployed in previous stories about wine quality.
The following picture shows the steps we are going to develop in order to train and deploy a machine learning model:
First of all, we import the libraries needed to build the pipeline:
import boto3
import sagemaker
import tarfile
from sagemaker.workflow.parameters import ParameterInteger,ParameterString, ParameterFloat
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.workflow.steps import ProcessingStep,TrainingStep, CreateModelStep
from sagemaker.inputs import CreateModelInput, TrainingInput
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.model import Model
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo, ConditionGreaterThanOrEqualTo, ConditionEquals
from sagemaker.workflow.condition_step import ConditionStep, JsonGet
from sagemaker.workflow.properties import PropertyFile
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel
We define some variables to work with SageMaker, such as the role and session:
region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket()
1. Make Dataset
This step consists of reading the data. After that, you can prepare, clean, and standardize it. We need to define a Python script that will be executed inside a container; its name is “make_dataset.py”:
import logging
import argparse
import subprocess
import sys
import boto3
try:
subprocess.check_call([sys.executable, "-m", "pip", "install", "-r",
"/opt/ml/processing/input/requirements/requirements.txt"
])
except subprocess.CalledProcessError as error:
print('requirements.txt file not found ', error.output)
import awswrangler as wr
import sagemaker
from sagemaker.feature_store.feature_group import FeatureGroup
from sklearn.model_selection import train_test_split

# define input parameters for script execution
parser = argparse.ArgumentParser()
parser.add_argument('--region', type=str, default='us-east-1')
parser.add_argument('--db-feature-store', type=str)
parser.add_argument('--feature-group-name', type=str)
parser.add_argument('--sagemaker-bucket', type=str)
class MakeDataset():
"""Class to generate mineable view and
datasets we are gonna use to train"""
def __init__(self):
"""
Define input parameters for script execution
"""
args, _ = parser.parse_known_args()
log_fmt = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=log_fmt)
self.logger = logging.getLogger(__name__)
self.region = args.region
self.boto_session = boto3.session.Session(region_name=self.region)
self.sagemaker_session = sagemaker.Session(
boto_session=self.boto_session)
self.feature_group_name = args.feature_group_name
self.feature_db = args.db_feature_store
self.sagemaker_bucket = args.sagemaker_bucket
def read_fg_tablename(self):
"""
Function to identify feature group table name from a feature group name
"""
sagemaker_fg = FeatureGroup(name=self.feature_group_name,
sagemaker_session=self.sagemaker_session)
query = sagemaker_fg.athena_query()
tb_feature_store = query.table_name
return tb_feature_store
def query_feature_store(self, features):
"""Function to get data from the last load"""
feature_aug = features.copy()
joined_features = ','.join(feature_aug)
feature_store_table = self.read_fg_tablename()
self.logger.info('Creating query definition...')
query = f'''SELECT {joined_features} FROM
(SELECT *, row_number()
OVER (PARTITION BY observation_id
ORDER BY EventTime desc) AS row_number
FROM "{feature_store_table}")
WHERE row_number = 1'''
self.logger.info(
"Feature Store database: %s, table: %s, features %s",
self.feature_db, feature_store_table, joined_features)
df_wines = self.read_sql_query(query)
self.logger.info('Feature store has %s samples', df_wines.shape[0])
self.logger.info(
"Feature store has %s features", df_wines.shape[1]-1
)
return df_wines
def read_sql_query(self, query):
"""Function to execute a query using awswrangler"""
dataframe = wr.athena.read_sql_query(
sql=query,
database=self.feature_db,
ctas_approach=False,
s3_output=f's3://{self.sagemaker_bucket}/athena/',
boto3_session=boto3.session.Session(
region_name=self.region))
return dataframe
def to_file(self, dataset, save_location, dtype):
"""Function to save a dataframe into AWS S3"""
dataset.to_csv(f'{save_location}/{dtype}.csv', index=False)
def save_file(self, dataset):
"""Function to save a file into specific path"""
save_location_train = '/opt/ml/processing/train'
save_location_test = '/opt/ml/processing/test'
save_location_val = '/opt/ml/processing/validation'
train, test, val = self.split_dataset(dataset,
train_size=0.7,
test_size=0.15,
validation_size=0.15)
self.logger.info("Save train, test and validation data")
print("Dataframes info")
print(train.info())
print(test.info())
print(val.info())
self.to_file(dataset=train, save_location=save_location_train,dtype='train')
self.to_file(dataset=test, save_location=save_location_test, dtype='test')
self.to_file(dataset=val, save_location=save_location_val, dtype='validation')
def transform_data(self, dataframe, features, classes):
"""We apply business logic to prepare data"""
features.remove('observation_id')
dataframe = dataframe[features]
print(dataframe.info())
if dataframe.empty:
raise ValueError('There is not enough information to train')  # no custom exception class is defined in this script, so we use ValueError
else:
return dataframe
def split_dataset(self, dataset, train_size, test_size, validation_size=0):
"""Function to split dataset to training"""
if validation_size == 0:
train, test = train_test_split(dataset,
train_size=train_size,
test_size=test_size,
random_state=0,
)
return train, test
else:
# In the first step we will split the data in training and
# remaining dataset
size = (test_size + validation_size)
train, rem = train_test_split(dataset,
train_size=train_size,
test_size=1-train_size,
random_state=0,
)
test, validation = train_test_split(rem,
test_size=test_size / size,
random_state=0,
)
return train, test, validation
if __name__ == "__main__":
wines_ds = MakeDataset()
features_wines = ['observation_id','fixed_acidity','volatile_acidity','citric_acid','residual_sugar','chlorides','free_sulfur_dioxide','total_sulfur_dioxide','density','pH','sulphates','alcohol','quality']
class_wines = 'quality'
data_fs = wines_ds.query_feature_store(features_wines)
print(data_fs.head())
data = wines_ds.transform_data(
data_fs,
features_wines,
class_wines)
wines_ds.logger.info('Transforming data... Done.')
print(data.columns)
wines_ds.logger.info("Save wines train, test and validation data")
wines_ds.save_file(dataset=data)
wines_ds.logger.info("Save wines train, test and validation: Done")
Note that we are passing the feature store database and the feature group name to the script. Now we configure the parameters for this first step:
framework_version='0.23-1'
model_approval_status='Approved'
processing_instance_type = ParameterString(name="ProcessingInstanceType",default_value="ml.t3.large") # instance type on which the make-dataset step will run
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount",default_value=1) # number of instances to use
processing_outputs_path = ParameterString(name="ProcessingOutputsPath") # S3 URI where the output data will be uploaded
dataset_outputs_path=f's3://{default_bucket}/tests/data/' # S3 URI where the output data will be uploaded
Now, we configure the processor for the make-dataset step:
# Processing step
make_dataset_processor = SKLearnProcessor(
framework_version=framework_version,
instance_type=processing_instance_type, #or 'local'
instance_count=processing_instance_count,
role=role,
sagemaker_session=sagemaker_session
)
We need to define a requirements file to pass to the container so it can execute our code. We also set the feature store database name and the feature group name to pass to the script (we previously deployed a feature group with the training data):
%%writefile requirements.txt
sagemaker == 2.49.0
awswrangler == 2.4.0

feature_store_database = 'sagemaker_featurestore'
feature_group_name = 'white-wines-feature-group'
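Since the pipeline depends on this feature group already existing, it can help to verify it before running anything. A minimal check, assuming the feature group from the previous stories was created in this same region:
from sagemaker.feature_store.feature_group import FeatureGroup

# Sketch: confirm the feature group deployed in the previous stories is available.
fg = FeatureGroup(name=feature_group_name, sagemaker_session=sagemaker_session)
print(fg.describe()["FeatureGroupStatus"])  # expected: "Created"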
We configure the make dataset step:
step_make_dataset= ProcessingStep(
name=f"make_dataset",
processor=make_dataset_processor,
inputs=[
ProcessingInput(
input_name = 'requirements',
source = 'requirements.txt', # or S3 URI where file exists
destination = '/opt/ml/processing/input/requirements')
],
outputs=[
ProcessingOutput(
output_name="train",
source=f"/opt/ml/processing/train/",
destination=processing_outputs_path),
ProcessingOutput(
output_name="test",
source="/opt/ml/processing/test",
destination=processing_outputs_path),
ProcessingOutput(
output_name="validation",
source="/opt/ml/processing/validation",
destination=processing_outputs_path),
],
code = 'make_dataset.py', # or S3 URI where file exists
job_arguments=[
'--region', region,
'--db-feature-store', feature_store_database,
'--feature-group-name', feature_group_name,
'--sagemaker-bucket', default_bucket,
],)
Note the inputs and outputs of the processing job: the requirements file is the input, and the train, test, and validation datasets are the outputs.
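After the pipeline runs this step, a quick way to sanity-check the split is to read one of the generated files back from S3. A minimal sketch, assuming awswrangler is installed in the notebook and the execution used dataset_outputs_path as the ProcessingOutputsPath parameter:
import awswrangler as wr

# Sketch: read the train split back from S3 to verify the processing output.
train_df = wr.s3.read_csv(f"{dataset_outputs_path}train.csv")
print(train_df.shape)
print(train_df.head())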
2. Training
We define a function to package files into a tar.gz archive, which we’ll use to pass our training script to the container:
def make_tarfile(files):
tar = tarfile.open("sourcedir.tar.gz", "w:gz")
for name in files:
tar.add(name)
tar.close()
return "file processed"
Now, we create a script with the code to train the model; its name will be “train.py”:
import os
import pandas as pd
from sklearn.linear_model import LogisticRegression
import joblib
import argparse
import cgi
import json
from io import StringIO
import numpy as np
from sagemaker_containers.beta.framework import encoders, worker

###############################################################################
# INFERENCE ###################################################################
###############################################################################
def input_fn(input_data, content_type):
"""Parse input data payload.
We currently only take csv input. Since we need to process both labelled
and unlabelled data we first determine whether the label column is present
by looking at how many columns were provided.
"""
# cgi.parse_header extracts all arguments after ';' as key-value pairs
# e.g. cgi.parse_header('text/csv;label_size=1;charset=utf8') returns
# the tuple ('text/csv', {'label_size': '1', 'charset': 'utf8'})
content_type, params = cgi.parse_header(content_type.lower())
if content_type == 'text/csv':
# Read the raw input data as CSV.
df = pd.read_csv(StringIO(input_data), header=None)
return df
else:
raise NotImplementedError(
f"content_type '{content_type}' not implemented!")
def predict_fn(input_data, model):
return model.predict(input_data)
#def predict_fn(input_data, model):
# prediction = model.predict(input_data)
# pred_prob = model.predict_proba(input_data)
# return np.array([prediction, pred_prob])
def model_fn(model_dir):
return joblib.load(os.path.join(model_dir, 'model.joblib'))
def output_fn(prediction, accept):
"""Format prediction output.
The default accept/content-type between containers for serial inference is JSON.
We also want to set the ContentType or mimetype as the same value as accept so the next
container can read the response payload correctly.
"""
# cgi.parse_header extracts all arguments after ';' as key-value pairs
# e.g. cgi.parse_header('text/csv;label_size=1;charset=utf8') returns
# the tuple ('text/csv', {'label_size': '1', 'charset': 'utf8'})
accept, params = cgi.parse_header(accept.lower())
if accept == 'text/csv':
return worker.Response(encoders.encode(prediction, accept), mimetype=accept)
elif accept == "application/json":
# https://docs.aws.amazon.com/sagemaker/latest/dg/cdf-inference.html
instances = []
for row in prediction.tolist():
instances.append({"features": row})
json_output = {"instances": instances}
return worker.Response(json.dumps(json_output), mimetype=accept)
else:
raise NotImplementedError(f"accept '{accept}' not implemented!")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--output-data-dir", type=str, default=os.environ["SM_OUTPUT_DATA_DIR"])
parser.add_argument("--model-dir", type=str, default=os.environ["SM_MODEL_DIR"])
parser.add_argument("--train", type=str, default=os.environ["SM_CHANNEL_TRAIN"])
args = parser.parse_args()
# reading the file
print('reading input file')
input_files = [os.path.join(args.train, file) for file in os.listdir(args.train)]
raw_data = [pd.read_csv(file, engine="python") for file in input_files]
df = pd.concat(raw_data)
print(df.info())
# we separate the data into features (X) and target (y)
print('assigning X and y values')
X_train = df.iloc[:,0:-1]
y_train = df.iloc[:,-1]
# training the model
model = LogisticRegression(class_weight="balanced", solver="lbfgs")
print("Training LR model")
model.fit(X_train, y_train)
# Store tar.gz file
print("Saving model")
joblib.dump(model, os.path.join(args.model_dir, "model.joblib"))
Back in the notebook, we package the training script into sourcedir.tar.gz:
make_tarfile(['train.py'])
Note that this file contains the functions input_fn, predict_fn, model_fn, and output_fn, which are necessary to serve inferences.
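To get an intuition for how these handlers are chained, here is a minimal local smoke test (an illustration, not part of the pipeline). It assumes a model.joblib already exists under ./model, for example downloaded from a previous training job, and that the sagemaker-containers package is installed locally so output_fn can build its response:
from train import input_fn, model_fn, predict_fn, output_fn

# Sample, unlabelled row with the 11 wine features (illustrative values).
csv_payload = "7.0,0.27,0.36,20.7,0.045,45.0,170.0,1.001,3.0,0.45,8.8"

model = model_fn("./model")                            # loads ./model/model.joblib
features = input_fn(csv_payload, "text/csv")           # parses the request body into a DataFrame
prediction = predict_fn(features, model)               # runs the LogisticRegression prediction
response = output_fn(prediction, "application/json")   # serializes the response payload
print(response.get_data(as_text=True))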
We define the parameters to execute the training process and the model metric definitions:
model_output_path=f's3://{default_bucket}/tests/wines/modelos/'
training_instance_count = ParameterInteger(name="TrainingInstanceCount",default_value=1)
training_instance_type = ParameterString(name="TrainingInstanceType",default_value="ml.m5.large")
training_output_path=ParameterString(name="TrainingOutputPath", default_value=f"s3://{default_bucket}/artefactos/modelos/wines")
metric_definitions = [{"Name": "f1_score",
"Regex": r"f1_score: ([0-9\.]+)"}]
Now, we configure a Scikit-Learn estimator:
# estimator
sklearn_estimator = SKLearn(
entry_point='train.py',
source_dir=f's3://{default_bucket}/tests/wines/train/sourcedir.tar.gz',
framework_version=framework_version,
instance_type=training_instance_type,
instance_count=training_instance_count,
enable_sagemaker_metrics=True,
metric_definitions=metric_definitions,
role=role,
output_path=training_output_path,
sagemaker_session=sagemaker_session,
#hyperparameters={key:value} we can set hyperparameters
)
Note that we pass entry_point and source_dir. The latter corresponds to our tar.gz file, which must be uploaded to an S3 location with the name “sourcedir.tar.gz”; inside it, the entry_point is “train.py”.
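The upload itself is not shown in the snippets above; one way to do it is with the S3Uploader helper from the SageMaker SDK (a sketch, targeting the same prefix used in source_dir):
from sagemaker.s3 import S3Uploader

# Sketch: upload the packaged training code to the S3 prefix referenced by source_dir.
source_dir_uri = S3Uploader.upload(
    local_path="sourcedir.tar.gz",
    desired_s3_uri=f"s3://{default_bucket}/tests/wines/train",
    sagemaker_session=sagemaker_session,
)
print(source_dir_uri)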
Now, we define the training step:
step_train = TrainingStep(
name='train-wines',
estimator=sklearn_estimator,
inputs={
"train": TrainingInput(
s3_data=step_make_dataset.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, #or s3 URI where data exists
content_type="text/csv"
)
},
depends_on=['make_dataset']
)
Note that we pass the output of the previous step as input, and that this step depends on the execution of the make_dataset step.
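Although the remaining steps come in the next story, the two steps defined so far can already be assembled into a pipeline and executed. A minimal sketch (the pipeline name here is illustrative, not one used in the stories):
# Sketch: assemble and run the pipeline with the steps defined so far.
pipeline = Pipeline(
    name="wines-mlops-pipeline",  # illustrative name
    parameters=[
        processing_instance_type,
        processing_instance_count,
        processing_outputs_path,
        training_instance_count,
        training_instance_type,
        training_output_path,
    ],
    steps=[step_make_dataset, step_train],
    sagemaker_session=sagemaker_session,
)

pipeline.upsert(role_arn=role)  # create or update the pipeline definition
execution = pipeline.start(parameters={
    "ProcessingOutputsPath": dataset_outputs_path,  # required: this parameter has no default value
})
execution.wait()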
In the next story, we’ll see the rest of the pipeline, and we will invoke an endpoint to run inferences with the model.
I hope you enjoy this content!