Creating Sagemaker Pipelines for MLOps — Part 1 | by Santiago Arboleda Quiroz | Jan, 2023

January 10, 2023


MLOps is a combination of software engineering, data science, and DevOps. It applies these practices to promote machine learning models to production environments. Actors such as machine learning engineers, software engineers, and data scientists work together to put models into production.

Typically, an MLOps pipeline is composed of the following steps:

Feature engineering: where the data is prepared; we combine datasets, clean the data, and split it into train, test, and validation sets. Basically, we prepare the data that enters the training process.

Training: the process where data scientists build machine learning models. The clean data from feature engineering is used to train the models.

Evaluation: the process where we validate that the model's metrics fall within the thresholds we expect.

Model artifact generation: if our models pass the evaluation, we generate the artifacts used to consume them, such as pickle files or other formats that store the models.

In this story I will show how to build an MLOps pipeline using AWS SageMaker Pipelines and the AWS SDK, continuing with the feature group we deployed in previous stories about wine quality.

The following picture shows the steps we are going to develop in order to train and deploy a machine learning model:

AWS SageMaker Training Pipeline

First of all, we import libraries to build the pipeline:

import boto3
import sagemaker
import tarfile
from sagemaker.workflow.parameters import ParameterInteger,ParameterString, ParameterFloat
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.workflow.steps import ProcessingStep,TrainingStep, CreateModelStep
from sagemaker.inputs import CreateModelInput, TrainingInput
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.model import Model
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo, ConditionGreaterThanOrEqualTo, ConditionEquals
from sagemaker.workflow.condition_step import ConditionStep, JsonGet
from sagemaker.workflow.properties import PropertyFile
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel

We define some variables to work with SageMaker, such as the role and session:

region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket()

1. Make Dataset

This step consists of reading the data. After that, you can prepare, clean, and standardize it. We need to define a .py script that will be executed inside a container; its name is “make_dataset.py”:

import logging
import argparse
import subprocess
import sys
import boto3

try:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-r",
                           "/opt/ml/processing/input/requirements/requirements.txt"])
except subprocess.CalledProcessError as error:
    print('requirements.txt file not found ', error.output)

import awswrangler as wr
import sagemaker
from sagemaker.feature_store.feature_group import FeatureGroup
from sklearn.model_selection import train_test_split

# define input parameters for script execution
parser = argparse.ArgumentParser()
parser.add_argument('--region', type=str, default='us-east-1')
parser.add_argument('--db-feature-store', type=str)
parser.add_argument('--feature-group-name', type=str)
parser.add_argument('--sagemaker-bucket', type=str)
class CustomError(Exception):
    """Exception raised when the dataset cannot be built"""


class MakeDataset():
    """Class to generate the mineable view and
    the datasets we are going to use to train"""

    def __init__(self):
        """
        Define input parameters for script execution
        """
        args, _ = parser.parse_known_args()
        log_fmt = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        logging.basicConfig(level=logging.INFO, format=log_fmt)
        self.logger = logging.getLogger(__name__)
        self.region = args.region
        self.boto_session = boto3.session.Session(region_name=self.region)
        self.sagemaker_session = sagemaker.Session(
            boto_session=self.boto_session)
        self.feature_group_name = args.feature_group_name
        self.feature_db = args.db_feature_store
        self.sagemaker_bucket = args.sagemaker_bucket

    def read_fg_tablename(self):
        """
        Function to identify feature group table name from a feature group name
        """
        sagemaker_fg = FeatureGroup(name=self.feature_group_name,
                                    sagemaker_session=self.sagemaker_session)
        query = sagemaker_fg.athena_query()
        tb_feature_store = query.table_name

        return tb_feature_store

    def query_feature_store(self, features):
        """Function to get data from the last load"""

        feature_aug = features.copy()

        joined_features = ','.join(feature_aug)
        feature_store_table = self.read_fg_tablename()

        self.logger.info('Creating query definition...')
        query = f'''SELECT {joined_features} FROM
                    (SELECT *, row_number()
                    OVER (PARTITION BY observation_id
                    ORDER BY EventTime desc) AS row_number
                    FROM "{feature_store_table}")
                    WHERE row_number = 1'''

        self.logger.info(
            "Feature Store database: %s, table: %s, features %s",
            self.feature_db, feature_store_table, joined_features)

        df_wines = self.read_sql_query(query)

        self.logger.info('Feature store has %s samples', df_wines.shape[0])
        self.logger.info(
            "Feature store has %s features", df_wines.shape[1] - 1
        )

        return df_wines

    def read_sql_query(self, query):
        """Function to execute a query using awswrangler"""
        dataframe = wr.athena.read_sql_query(
            sql=query,
            database=self.feature_db,
            ctas_approach=False,
            s3_output=f's3://{self.sagemaker_bucket}/athena/',
            boto3_session=boto3.session.Session(
                region_name=self.region))
        return dataframe

    def to_file(self, dataset, save_location, dtype):
        """Function to save a dataframe into AWS S3"""
        dataset.to_csv(f'{save_location}/{dtype}.csv', index=False)

    def save_file(self, dataset):
        """Function to save a file into a specific path"""
        save_location_train = '/opt/ml/processing/train'
        save_location_test = '/opt/ml/processing/test'
        save_location_val = '/opt/ml/processing/validation'

        train, test, val = self.split_dataset(dataset,
                                              train_size=0.7,
                                              test_size=0.15,
                                              validation_size=0.15)
        self.logger.info("Save train, test and validation data")
        print("Dataframes info")
        print(train.info())
        print(test.info())
        print(val.info())
        self.to_file(dataset=train, save_location=save_location_train, dtype='train')
        self.to_file(dataset=test, save_location=save_location_test, dtype='test')
        self.to_file(dataset=val, save_location=save_location_val, dtype='validation')

    def transform_data(self, dataframe, features, classes):
        """We apply business logic to prepare data"""

        features.remove('observation_id')
        dataframe = dataframe[features]
        print(dataframe.info())
        if dataframe.empty:
            raise CustomError('There is not enough information to train ')
        else:
            return dataframe

    def split_dataset(self, dataset, train_size, test_size, validation_size=0):
        """Function to split the dataset for training"""

        if validation_size == 0:
            train, test = train_test_split(dataset,
                                           train_size=train_size,
                                           test_size=test_size,
                                           random_state=0,
                                           )

            return train, test
        else:
            # In the first step we will split the data in training and
            # remaining dataset
            size = (test_size + validation_size)
            train, rem = train_test_split(dataset,
                                          train_size=train_size,
                                          test_size=1 - train_size,
                                          random_state=0,
                                          )
            test, validation = train_test_split(rem,
                                                test_size=test_size / size,
                                                random_state=0,
                                                )

            return train, test, validation

if __name__ == "__main__":
    wines_ds = MakeDataset()
    features_wines = ['observation_id', 'fixed_acidity', 'volatile_acidity',
                      'citric_acid', 'residual_sugar', 'chlorides',
                      'free_sulfur_dioxide', 'total_sulfur_dioxide', 'density',
                      'pH', 'sulphates', 'alcohol', 'quality']
    class_wines = 'quality'
    data_fs = wines_ds.query_feature_store(features_wines)
    print(data_fs.head())
    data = wines_ds.transform_data(
        data_fs,
        features_wines,
        class_wines)
    wines_ds.logger.info('Transforming data... Done.')
    print(data.columns)
    wines_ds.logger.info("Save wines train, test and validation data")
    wines_ds.save_file(dataset=data)
    wines_ds.logger.info("Save wines train, test and validation: Done")

Note that we are passing the feature store database and the feature group name to the script. Now we configure the parameters for this first step:

framework_version = '0.23-1'
model_approval_status = 'Approved'
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.t3.large")  # instance type where the make dataset step will run
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)  # number of instances to use
processing_outputs_path = ParameterString(name="ProcessingOutputsPath")  # S3 URI where the data will be uploaded
dataset_outputs_path = f's3://{default_bucket}/tests/data/'  # S3 URI where the data will be uploaded

Now, we configure the processor for the make dataset step:

# Processing step
make_dataset_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=processing_instance_type,  # or 'local'
    instance_count=processing_instance_count,
    role=role,
    sagemaker_session=sagemaker_session
)

We need to define a requirements file to pass to the container so it can execute our code. We also set the feature store database name and the feature group name to pass to the script (we previously deployed a feature group with the training data):

%%writefile requirements.txt
sagemaker == 2.49.0
awswrangler == 2.4.0

In a separate cell, we set the feature store database and feature group names:

feature_store_database = 'sagemaker_featurestore'
feature_group_name = 'white-wines-feature-group'

We configure the make dataset step:

step_make_dataset = ProcessingStep(
    name="make_dataset",
    processor=make_dataset_processor,
    inputs=[
        ProcessingInput(
            input_name='requirements',
            source='requirements.txt',  # or S3 URI where the file exists
            destination='/opt/ml/processing/input/requirements')
    ],
    outputs=[
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/train/",
            destination=processing_outputs_path),
        ProcessingOutput(
            output_name="test",
            source="/opt/ml/processing/test",
            destination=processing_outputs_path),
        ProcessingOutput(
            output_name="validation",
            source="/opt/ml/processing/validation",
            destination=processing_outputs_path),
    ],
    code='make_dataset.py',  # or S3 URI where the file exists
    job_arguments=[
        '--region', region,
        '--db-feature-store', feature_store_database,
        '--feature-group-name', feature_group_name,
        '--sagemaker-bucket', default_bucket,
    ],
)

Note the inputs and outputs of the processing job: we have a requirements file as input and the train, test, and validation datasets as outputs.
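If you want to test the processing script on its own before wiring up the whole pipeline, you can launch the job directly with the processor's run method. This is only a minimal sketch under two assumptions: make_dataset.py and requirements.txt sit next to the notebook, and we build a throwaway processor (debug_processor, a name introduced here) with concrete values, because pipeline parameters such as processing_instance_type only resolve when a pipeline executes:

# Standalone run of the processing job, outside the pipeline definition (a sketch).
debug_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type='ml.t3.large',  # concrete value instead of the pipeline parameter
    instance_count=1,
    role=role,
    sagemaker_session=sagemaker_session,
)
debug_processor.run(
    code='make_dataset.py',
    inputs=[ProcessingInput(input_name='requirements',
                            source='requirements.txt',
                            destination='/opt/ml/processing/input/requirements')],
    outputs=[ProcessingOutput(output_name='train',
                              source='/opt/ml/processing/train',
                              destination=dataset_outputs_path),
             ProcessingOutput(output_name='test',
                              source='/opt/ml/processing/test',
                              destination=dataset_outputs_path),
             ProcessingOutput(output_name='validation',
                              source='/opt/ml/processing/validation',
                              destination=dataset_outputs_path)],
    arguments=['--region', region,
               '--db-feature-store', feature_store_database,
               '--feature-group-name', feature_group_name,
               '--sagemaker-bucket', default_bucket],
)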

2. Training

We define a function to package files into a tar.gz archive, which we will use to pass our training script to the container:

def make_tarfile(files):
    tar = tarfile.open("sourcedir.tar.gz", "w:gz")
    for name in files:
        tar.add(name)
    tar.close()
    return "file processed"

Now, we create a script with the code to train the model; its name will be “train.py”:

import os
import pandas as pd
from sklearn.linear_model import LogisticRegression
import joblib
import argparse
import cgi
import json
from io import StringIO
import numpy as np
from sagemaker_containers.beta.framework import encoders, worker

###############################################################################
# INFERENCE ###################################################################
###############################################################################
def input_fn(input_data, content_type):
    """Parse input data payload.

    We currently only take csv input. Since we need to process both labelled
    and unlabelled data we first determine whether the label column is present
    by looking at how many columns were provided.
    """
    # cgi.parse_header extracts all arguments after ';' as key-value pairs
    # e.g. cgi.parse_header('text/csv;label_size=1;charset=utf8') returns
    # the tuple ('text/csv', {'label_size': '1', 'charset': 'utf8'})
    content_type, params = cgi.parse_header(content_type.lower())

    if content_type == 'text/csv':
        # Read the raw input data as CSV.
        df = pd.read_csv(StringIO(input_data), header=None)
        return df
    else:
        raise NotImplementedError(
            f"content_type '{content_type}' not implemented!")

def predict_fn(input_data, model):

    return model.predict(input_data)

# def predict_fn(input_data, model):
#     prediction = model.predict(input_data)
#     pred_prob = model.predict_proba(input_data)
#     return np.array([prediction, pred_prob])

def model_fn(model_dir):

    return joblib.load(os.path.join(model_dir, 'model.joblib'))

def output_fn(prediction, accept):
    """Format prediction output.

    The default accept/content-type between containers for serial inference is JSON.
    We also want to set the ContentType or mimetype as the same value as accept so the next
    container can read the response payload correctly.
    """
    # cgi.parse_header extracts all arguments after ';' as key-value pairs
    # e.g. cgi.parse_header('text/csv;label_size=1;charset=utf8') returns
    # the tuple ('text/csv', {'label_size': '1', 'charset': 'utf8'})
    accept, params = cgi.parse_header(accept.lower())

    if accept == 'text/csv':
        return worker.Response(encoders.encode(prediction, accept), mimetype=accept)

    elif accept == "application/json":
        # https://docs.aws.amazon.com/sagemaker/latest/dg/cdf-inference.html
        instances = []
        for row in prediction.tolist():
            instances.append({"features": row})

        json_output = {"instances": instances}

        return worker.Response(json.dumps(json_output), mimetype=accept)

    else:
        raise NotImplementedError(f"accept '{accept}' not implemented!")

if __name__ == "__main__":

    parser = argparse.ArgumentParser()
    parser.add_argument("--output-data-dir", type=str, default=os.environ["SM_OUTPUT_DATA_DIR"])
    parser.add_argument("--model-dir", type=str, default=os.environ["SM_MODEL_DIR"])
    parser.add_argument("--train", type=str, default=os.environ["SM_CHANNEL_TRAIN"])

    args = parser.parse_args()

    # reading the file
    print('reading file')
    input_files = [os.path.join(args.train, file) for file in os.listdir(args.train)]
    raw_data = [pd.read_csv(file, engine="python") for file in input_files]
    df = pd.concat(raw_data)
    print(df.info())

    # we separate data into x and y
    print('we assign values')

    X_train = df.iloc[:, 0:-1]
    y_train = df.iloc[:, -1]

    # training the model
    model = LogisticRegression(class_weight="balanced", solver="lbfgs")
    print("Training LR model")
    model.fit(X_train, y_train)

    # save the trained model
    print("Saving model")
    joblib.dump(model, os.path.join(args.model_dir, "model.joblib"))

Back in the notebook, we package the training script:

make_tarfile(['train.py'])

Note that this file contains the functions input_fn, predict_fn, model_fn, and output_fn, which are necessary to serve inferences.
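To make the hand-off between these handlers concrete, here is a minimal local sketch (hypothetical test code, not part of train.py) that emulates what the serving container does: model_fn loads the artifact once, then input_fn parses each CSV payload and predict_fn returns predictions. It uses a toy model and made-up numbers purely for illustration:

import os
import tempfile
from io import StringIO

import joblib
import pandas as pd
from sklearn.linear_model import LogisticRegression

# Train and persist a toy model the same way train.py does (made-up data).
X = pd.DataFrame([[9.0, 0.4], [10.5, 0.5], [12.0, 0.6], [13.5, 0.7]])
y = [5, 5, 6, 6]
model_dir = tempfile.mkdtemp()
joblib.dump(LogisticRegression().fit(X, y), os.path.join(model_dir, "model.joblib"))

# Emulate the serving flow.
model = joblib.load(os.path.join(model_dir, "model.joblib"))           # model_fn
payload = pd.read_csv(StringIO("11.0,0.55\n12.5,0.65"), header=None)   # input_fn ('text/csv')
print(model.predict(payload))                                          # predict_fn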

We define the parameters to execute the training process and the model metric definitions:

model_output_path=f's3://{default_bucket}/tests/wines/modelos/'
training_instance_count = ParameterInteger(name="TrainingInstanceCount",default_value=1)
training_instance_type = ParameterString(name="TrainingInstanceType",default_value="ml.m5.large")
training_output_path=ParameterString(name="TrainingOutputPath", default_value=f"s3://{default_bucket}/artefactos/modelos/wines")
metric_definitions = [{"Name": "f1_score",
                       "Regex": "f1_score: ([0-9\.]+)"}]

Now, we configure a Scikit-Learn estimator:

# estimator
sklearn_estimator = SKLearn(
    entry_point='train.py',
    source_dir=f's3://{default_bucket}/tests/wines/train/sourcedir.tar.gz',
    framework_version=framework_version,
    instance_type=training_instance_type,
    enable_sagemaker_metrics=True,
    metric_definitions=metric_definitions,
    role=role,
    output_path=training_output_path,
    sagemaker_session=sagemaker_session,
    # hyperparameters={key: value}  # we can set hyperparameters
)

Note that we pass entry_point and source_dir. The latter corresponds to our tar.gz file, which must be uploaded to an S3 location with the name “sourcedir.tar.gz”; inside it, the entry point is “train.py”.
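The upload itself is not shown as a cell here; a minimal sketch using the SageMaker session, assuming sourcedir.tar.gz was just created locally by make_tarfile:

# Upload the packaged training code to the S3 prefix that source_dir points to.
sagemaker_session.upload_data(
    path='sourcedir.tar.gz',
    bucket=default_bucket,
    key_prefix='tests/wines/train',
)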

Now, we define the training step:

step_train = TrainingStep(
    name='train-wines',
    estimator=sklearn_estimator,
    inputs={
        "train": TrainingInput(
            s3_data=step_make_dataset.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,  # or S3 URI where the data exists
            content_type="text/csv"
        )
    },
    depends_on=['make_dataset']
)

Note that we pass the output of the previous step as input, and that this step depends on the execution of the make_dataset step.
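Although the remaining steps arrive in the next part, the two steps defined so far can already be assembled into a pipeline and executed; a minimal sketch, with a hypothetical pipeline name and reusing dataset_outputs_path as the value for the ProcessingOutputsPath parameter:

# Assemble and run the (partial) pipeline with the steps defined so far.
pipeline = Pipeline(
    name='wines-mlops-pipeline',  # hypothetical name
    parameters=[processing_instance_type,
                processing_instance_count,
                processing_outputs_path,
                training_instance_count,
                training_instance_type,
                training_output_path],
    steps=[step_make_dataset, step_train],
    sagemaker_session=sagemaker_session,
)
pipeline.upsert(role_arn=role)  # create or update the pipeline definition
execution = pipeline.start(parameters={'ProcessingOutputsPath': dataset_outputs_path})
execution.wait()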

In the next story, we’ll see the rest of the pipeline, and we will invoke an endpoint to run inferences with the model.

I hope you enjoy this content!


