14.1 AWS Pipelines: SageMaker Pipelines & Step Functions
In the ecosystem of Amazon Web Services (AWS), orchestrating Machine Learning workflows is a discipline that sits at the intersection of data engineering, model development, and operations. Unlike traditional software CI/CD pipelines, which focus on compiling code and running unit tests, ML pipelines—specifically Continuous Training (CT) pipelines—must manage the flow of data, the provisioning of specialized compute resources (GPUs and other accelerators), and the complex state management of probabilistic experiments.
This chapter explores the two primary engines for this orchestration on AWS: Amazon SageMaker Pipelines and AWS Step Functions. While both can technically execute a sequence of tasks, they serve different masters and shine in different operational contexts. We will dissect their architectures, dive deep into their implementation details, and provide comprehensive code examples to illustrate their practical application in a production MLOps environment.
The Role of Orchestration in Continuous Training
Continuous Training (CT) is the “Heartbeat” of a mature MLOps system (Level 2+ in maturity models). It ensures that your models do not become stagnant artifacts but are living entities that adapt to shifting data distributions.
An effective CT pipeline must solve several problems simultaneously:
- Data Lineage & Provenance: Tracking exactly which dataset version produced which model version.
- Resource Management: Spinning up transient clusters for heavy training jobs and tearing them down immediately to control costs.
- State Management: Handling failures, retries, and conditional logic (e.g., “only register this model if accuracy > 90%”).
- Reproducibility: Ensuring that a pipeline run from six months ago can be re-executed with identical results.
Amazon SageMaker Pipelines
SageMaker Pipelines is Amazon's purpose-built CI/CD service for Machine Learning. It is deeply integrated into the SageMaker ecosystem, treating ML concepts like “Models”, “Experiments”, and “Model Registry” as first-class citizens. Unlike general-purpose orchestrators, it natively understands what a “Training Job” is.
Core Architecture
A SageMaker Pipeline is a Directed Acyclic Graph (DAG) of Steps. Each step represents a distinct unit of work, such as:
- ProcessingStep: Running a data preprocessing script on a Spark or Scikit-Learn container.
- TrainingStep: Launching a training job on a GPU instance.
- TuningStep: Executing a Hyperparameter Optimization (HPO) job.
- ModelStep: Creating a SageMaker Model object.
- RegisterModel: Registering the model version in the Model Registry.
- ConditionStep: Branching logic based on step properties (e.g., evaluation metrics).
- CallbackStep: Waiting for external systems (Human-in-the-loop, approval workflows).
When you define a pipeline using the Python SDK, it compiles down to a JSON Pipeline Definition which is then submitted to the SageMaker control plane. The control plane manages the execution, handling dependencies and data movement between steps.
Implementation Guide: The Python SDK
The most common way to define SageMaker Pipelines is via the sagemaker Python SDK. Let’s build a robust, production-grade CT pipeline.
Prerequisites and Setup
First, we define our pipeline parameters. These allow us to inject variables at runtime, making the pipeline reusable across environments (Dev, Staging, Prod).
import sagemaker
from sagemaker.workflow.parameters import (
ParameterInteger,
ParameterString,
ParameterFloat,
)
# Define Pipeline Parameters
processing_instance_count = ParameterInteger(
name="ProcessingInstanceCount",
default_value=1
)
processing_instance_type = ParameterString(
name="ProcessingInstanceType",
default_value="ml.m5.large"
)
training_instance_type = ParameterString(
name="TrainingInstanceType",
default_value="ml.p3.2xlarge"
)
model_approval_status = ParameterString(
name="ModelApprovalStatus",
default_value="PendingManualApproval"
)
input_data_uri = ParameterString(
name="InputDataUrl",
default_value="s3://my-mlops-bucket/data/raw/census.csv"
)
Step 1: Data Processing
We use the SKLearnProcessor to run a preprocessing script. This step can scale out across multiple instances to handle large data transformations.
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
role = sagemaker.get_execution_role()
sklearn_processor = SKLearnProcessor(
framework_version="1.2-1",
instance_type=processing_instance_type,
instance_count=processing_instance_count,
base_job_name="census-process",
role=role,
)
step_process = ProcessingStep(
name="CensusProcess",
processor=sklearn_processor,
inputs=[
ProcessingInput(source=input_data_uri, destination="/opt/ml/processing/input"),
],
outputs=[
ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
],
code="code/preprocessing.py",
)
Crucial Detail: Note how we pass the pipeline parameter processing_instance_type directly into the processor definition. This late binding allows us to override instance types for heavy runs without changing the code.
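The pipeline references code/preprocessing.py without showing it. Below is a minimal sketch of what such a script might contain, assuming the SKLearnProcessor container (which ships pandas and scikit-learn); the income label column, its encoding, and the 70/15/15 split are illustrative assumptions rather than details taken from the pipeline definition.
# code/preprocessing.py -- illustrative sketch; label column and split ratios are assumptions
import os
import pandas as pd
from sklearn.model_selection import train_test_split

BASE = "/opt/ml/processing"

if __name__ == "__main__":
    df = pd.read_csv(os.path.join(BASE, "input", "census.csv")).dropna()

    # SageMaker's built-in XGBoost expects the label in the first column and no CSV header.
    label = (df.pop("income") == ">50K").astype(int)  # assumed binary label encoding
    features = pd.get_dummies(df)                     # one-hot encode categorical columns
    features.insert(0, "income", label)

    # 70/15/15 split into train / validation / test
    train, rest = train_test_split(features, test_size=0.30, random_state=42)
    validation, test = train_test_split(rest, test_size=0.50, random_state=42)

    for name, split in [("train", train), ("validation", validation), ("test", test)]:
        out_dir = os.path.join(BASE, name)
        os.makedirs(out_dir, exist_ok=True)
        split.to_csv(os.path.join(out_dir, f"{name}.csv"), header=False, index=False)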
Step 2: Model Training
Here we define the estimator and the training step. We connect the output of the processing step to the input of the training step using step_process.properties. This implicit dependency builds the DAG.
from sagemaker.estimator import Estimator
from sagemaker.workflow.steps import TrainingStep
from sagemaker.inputs import TrainingInput
image_uri = sagemaker.image_uris.retrieve(
framework="xgboost",
region="us-east-1",
version="1.5-1"
)
xgb_train = Estimator(
image_uri=image_uri,
instance_type=training_instance_type,
instance_count=1,
output_path="s3://my-mlops-bucket/models",
role=role,
)
xgb_train.set_hyperparameters(
objective="binary:logistic",
num_round=50,
max_depth=5,
eta=0.2,
gamma=4,
min_child_weight=6,
subsample=0.8,
)
step_train = TrainingStep(
name="CensusTrain",
estimator=xgb_train,
inputs={
"train": TrainingInput(
s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
content_type="text/csv",
),
"validation": TrainingInput(
s3_data=step_process.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
content_type="text/csv",
),
},
)
Step 3: Model Evaluation
Before registering a model, we must confirm it performs better than a baseline. We run a dedicated processing job to evaluate the model against the test set.
from sagemaker.workflow.properties import PropertyFile
# Define a PropertyFile to store evaluation metrics
# This file allows the ConditionStep to "read" the results of the evaluation.
evaluation_report = PropertyFile(
name="EvaluationReport",
output_name="evaluation",
path="evaluation.json"
)
step_eval = ProcessingStep(
name="CensusEval",
processor=sklearn_processor,
inputs=[
ProcessingInput(
source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
destination="/opt/ml/processing/model",
),
ProcessingInput(
source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
destination="/opt/ml/processing/test",
),
],
outputs=[
ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
],
code="code/evaluation.py",
property_files=[evaluation_report],
)
The evaluation.py script must write a JSON file to /opt/ml/processing/evaluation/evaluation.json that looks like this:
{
"binary_classification_metrics": {
"accuracy": {
"value": 0.92,
"standard_deviation": 0.01
},
"auc": {
"value": 0.96,
"standard_deviation": 0.005
}
}
}
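The evaluation script itself is not shown above. A hedged sketch of a code/evaluation.py that produces this report follows; it assumes the processing image has xgboost installed (the scikit-learn image used for preprocessing may not, in which case you would install it or run this step with a processor based on the XGBoost image), and it assumes the test CSV layout from the preprocessing sketch (label in column 0, no header).
# code/evaluation.py -- illustrative sketch; assumes xgboost is available in the image
import json
import os
import tarfile

import pandas as pd
import xgboost as xgb
from sklearn.metrics import accuracy_score, roc_auc_score

if __name__ == "__main__":
    # The TrainingStep artifact arrives as model.tar.gz under /opt/ml/processing/model
    with tarfile.open("/opt/ml/processing/model/model.tar.gz") as tar:
        tar.extractall(path="/opt/ml/processing/model")

    booster = xgb.Booster()
    booster.load_model("/opt/ml/processing/model/xgboost-model")  # file name assumed from built-in XGBoost

    test = pd.read_csv("/opt/ml/processing/test/test.csv", header=None)
    y_true = test.iloc[:, 0]
    y_prob = booster.predict(xgb.DMatrix(test.iloc[:, 1:]))
    y_pred = (y_prob > 0.5).astype(int)

    report = {
        "binary_classification_metrics": {
            # standard_deviation values are placeholders; a real script might bootstrap them
            "accuracy": {"value": float(accuracy_score(y_true, y_pred)), "standard_deviation": 0.0},
            "auc": {"value": float(roc_auc_score(y_true, y_prob)), "standard_deviation": 0.0},
        }
    }

    os.makedirs("/opt/ml/processing/evaluation", exist_ok=True)
    with open("/opt/ml/processing/evaluation/evaluation.json", "w") as f:
        json.dump(report, f)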
Step 4: Condition and Registration
This is the gatekeeper. We only proceed to registration if the model accuracy exceeds a threshold.
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.model import Model
# ModelStep requires a PipelineSession so that model.register() returns
# step arguments instead of calling the SageMaker API immediately
pipeline_session = PipelineSession()
# Define the Model object
model = Model(
image_uri=image_uri,
model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
sagemaker_session=pipeline_session,
role=role,
)
# Step to register model in Model Registry
step_register = ModelStep(
name="CensusRegisterModel",
step_args=model.register(
content_types=["text/csv"],
response_types=["text/csv"],
inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
transform_instances=["ml.m5.xlarge"],
model_package_group_name="CensusModelGroup",
approval_status=model_approval_status,
)
)
# Condition Step: JsonGet reads the accuracy value from the PropertyFile
# produced by the evaluation step
cond_gte = ConditionGreaterThanOrEqualTo(
left=JsonGet(
step_name=step_eval.name,
property_file=evaluation_report,
json_path="binary_classification_metrics.accuracy.value",
),
right=0.80,
)
step_cond = ConditionStep(
name="CheckAccuracyScore",
conditions=[cond_gte],
if_steps=[step_register],
else_steps=[], # You could send a notification here on failure
)
Pipeline Definition & Execution
Finally, we assemble the pipeline.
from sagemaker.workflow.pipeline import Pipeline
pipeline = Pipeline(
name="CensusPipeline",
parameters=[
processing_instance_type,
processing_instance_count,
training_instance_type,
model_approval_status,
input_data_uri,
],
steps=[step_process, step_train, step_eval, step_cond],
)
# Upsert the pipeline definition
pipeline.upsert(role_arn=role)
# Start an execution
execution = pipeline.start()
execution.wait()
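Because the infrastructure is parameterized, a single pipeline definition can serve both cheap smoke tests and full training runs. A short sketch of overriding parameters at start time (the values shown are illustrative):
# Override pipeline parameters for a quick, low-cost smoke-test run
execution = pipeline.start(
    parameters={
        "TrainingInstanceType": "ml.m5.large",
        "ProcessingInstanceType": "ml.m5.large",
    }
)
execution.wait()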
JSON-Based Definition: Under the Hood
While the Python SDK is convenient, the source of truth is the JSON definition. Understanding this is critical for debugging complex pipelines or generating pipelines programmatically from other languages (e.g., via Terraform or Go).
A typical TrainingStep in the JSON format looks like this:
{
"Name": "CensusTrain",
"Type": "Training",
"Arguments": {
"AlgorithmSpecification": {
"TrainingImage": "683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:1.5-1",
"TrainingInputMode": "File"
},
"OutputDataConfig": {
"S3OutputPath": "s3://my-mlops-bucket/models"
},
"StoppingCondition": {
"MaxRuntimeInSeconds": 86400
},
"ResourceConfig": {
"InstanceCount": 1,
"InstanceType": "ml.p3.2xlarge",
"VolumeSizeInGB": 30
},
"RoleArn": "arn:aws:iam::123456789012:role/service-role/AmazonSageMaker-ExecutionRole-20220101T000000",
"InputDataConfig": [
{
"ChannelName": "train",
"DataSource": {
"S3DataSource": {
"S3DataType": "S3Prefix",
"S3Uri": {
"Get": "Steps.CensusProcess.ProcessingOutputConfig.Outputs['train'].S3Output.S3Uri"
},
"S3DataDistributionType": "FullyReplicated"
}
},
"ContentType": "text/csv"
}
],
"HyperParameters": {
"objective": "binary:logistic",
"num_round": "50"
}
}
}
Notice the Get syntax in S3Uri. This is the JSON Path interpolation that SageMaker performs at runtime to resolve dependencies between steps.
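You rarely need to hand-write this JSON: the SDK can emit it for inspection. A small snippet for dumping the compiled definition of the pipeline object assembled above:
import json

# The SDK compiles the Python definition into the JSON document submitted to SageMaker
definition = json.loads(pipeline.definition())
for step in definition["Steps"]:
    print(step["Name"], "->", step["Type"])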
AWS Step Functions: The Generalist Orchestrator
While SageMaker Pipelines focuses on the inner loop of model creation, AWS Step Functions is a general-purpose serverless orchestrator that can coordinate any AWS service.
State Machine Definition Language (ASL)
Step Functions uses the Amazon States Language (ASL), a JSON-based structured language.
A simple MLOps workflow in ASL might look like this:
{
"Comment": "A simple MLOps pipeline using Lambda and SageMaker",
"StartAt": "PreprocessData",
"States": {
"PreprocessData": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:DataPreprocessor",
"Next": "TrainModel"
},
"TrainModel": {
"Type": "Task",
"Resource": "arn:aws:states:::sagemaker:createTrainingJob.sync",
"Parameters": {
"TrainingJobName.$": "$$.Execution.Name",
"AlgorithmSpecification": {
"TrainingImage": "...",
"TrainingInputMode": "File"
},
"OutputDataConfig": {
"S3OutputPath": "s3://my-bucket/models"
},
"ResourceConfig": {
"InstanceCount": 1,
"InstanceType": "ml.m5.xlarge",
"VolumeSizeInGB": 10
},
"RoleArn": "arn:aws:iam::123456789012:role/SageMakerRole",
"StoppingCondition": {
"MaxRuntimeInSeconds": 3600
}
},
"Next": "SaveModel"
},
"SaveModel": {
"Type": "Task",
"Resource": "arn:aws:states:::sagemaker:createModel",
"Parameters": {
"ExecutionRoleArn": "arn:aws:iam::123456789012:role/SageMakerRole",
"ModelName.$": "$.TrainingJobName",
"PrimaryContainer": {
"Image": "...",
"ModelDataUrl.$": "$.ModelArtifacts.S3ModelArtifacts"
}
},
"End": true
}
}
}
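State machines defined in ASL are deployed and executed through the Step Functions API. A minimal sketch using boto3, assuming asl_definition holds the document above as a Python dict and that the IAM role ARN shown is illustrative:
import json
import boto3

sfn = boto3.client("stepfunctions")

# Create (deploy) the state machine from the ASL document
state_machine = sfn.create_state_machine(
    name="SimpleMLOpsPipeline",
    definition=json.dumps(asl_definition),  # asl_definition: the ASL dict shown above
    roleArn="arn:aws:iam::123456789012:role/StepFunctionsExecutionRole",  # illustrative ARN
)

# Kick off one execution of the workflow
sfn.start_execution(
    stateMachineArn=state_machine["stateMachineArn"],
    input=json.dumps({}),
)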
The “Sync” Integration Pattern
One of the most powerful features of Step Functions for MLOps is the .sync service integration (e.g., arn:aws:states:::sagemaker:createTrainingJob.sync).
- Standard Call: Step Functions fires the API call to SageMaker and immediately moves to the next state. This is problematic for training jobs, because the state machine would race ahead (and potentially finish) while the training job is still running.
- Sync Call: Step Functions halts the state transition, polls the SageMaker Training Job status, and only proceeds when the job completes (Success or Failure). It even captures the output artifacts automatically.
Step Functions Data Science SDK
For Python-native data scientists who find writing ASL JSON tedious, AWS provides the stepfunctions Python SDK. It allows defining state machines similarly to SageMaker Pipelines.
import stepfunctions
from stepfunctions.steps import TrainingStep, ModelStep
from stepfunctions.workflow import Workflow
training_step = TrainingStep(
"Train Model",
estimator=xgb_train,  # the XGBoost estimator defined earlier
data={
"train": s3_input_train,  # TrainingInput objects pointing at the prepared S3 data
"validation": s3_input_validation
},
wait_for_completion=True
)
model_step = ModelStep(
"Save Model",
model=training_step.get_expected_model(),
result_path="$.ModelArtifacts"
)
workflow = Workflow(
name="MyMLOpsWorkflow",
definition=training_step.next(model_step),
role=workflow_execution_role
)
workflow.create()
workflow.execute()
SageMaker Pipelines vs. Step Functions
When should you use which? This is a common architectural decision point.
| Feature | SageMaker Pipelines | AWS Step Functions |
|---|---|---|
| Primary Audience | Data Scientists, ML Engineers | DevOps Engineers, Cloud Architects |
| Scope | Model Development Lifecycle (Train/Eval/Register) | End-to-End System Integration (Ingest -> Train -> Deploy -> Notify) |
| Visualization | Dedicated DAG UI in SageMaker Studio | General Purpose State Machine Graph in AWS Console |
| Local Testing | Supported via Local Mode | Limited (requires mocks or stepfunctions-local) |
| Integration | Deeply integrated with SageMaker Experiments & Model Registry | Integrates with 200+ AWS Services (Lambda, Glue, DynamoDB, SNS) |
| Cost | Free (no additional charge for the pipeline itself) | Charged per state transition (Standard) or duration (Express) |
| Latency | Medium (Setup overhead for containers) | Low (Instant state transitions) |
The “Dual-Pipeline” Strategy
In sophisticated enterprise setups, we often see a Dual-Pipeline Strategy that leverages the strengths of both:
- Outer Loop (Step Functions): Handles the macro-orchestration. It triggers data ingestion (Glue), checks for data quality (Deequ), and then triggers the SageMaker Pipeline. After the model is approved, it handles the deployment to production endpoints and sets up CloudWatch alarms.
- Inner Loop (SageMaker Pipelines): Handles the core ML iteration. It takes the prepared data, runs training, performs hyperparameter tuning, evaluates the model, and registers it.
This separation of concerns allows Data Scientists to own the “Inner Loop” (iterating on model architecture in SageMaker Studio) without worrying about the complex IAM roles and cross-account logic often required in the “Outer Loop” (owned by the Platform Engineering team).
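To make the hand-off between the two loops concrete, here is a hedged sketch of a Lambda handler in the outer Step Functions workflow that triggers the inner SageMaker Pipeline; the pipeline name matches the example built earlier, while the event field is an assumption:
import boto3

sm = boto3.client("sagemaker")

def handler(event, context):
    # Outer loop (Step Functions) kicks off the inner loop (SageMaker Pipeline)
    response = sm.start_pipeline_execution(
        PipelineName="CensusPipeline",
        PipelineParameters=[
            # "prepared_data_uri" is an assumed field in the state machine's input
            {"Name": "InputDataUrl", "Value": event["prepared_data_uri"]},
        ],
    )
    return {"PipelineExecutionArn": response["PipelineExecutionArn"]}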
Best Practices for AWS Pipelines
- Cache Steps Aggressively: SageMaker Pipelines supports step caching. If you change only the Training step code, the pipeline should not re-run the expensive Data Processing step if the inputs haven't changed. Enable this via CacheConfig:
  from sagemaker.workflow.steps import CacheConfig
  cache_config = CacheConfig(enable_caching=True, expire_after="P30D")
  step_process = ProcessingStep(..., cache_config=cache_config)
- Use Processing Jobs for Evaluation: Do not run evaluation logic inside the training script. Separation of concerns allows you to change evaluation metrics without retraining the model.
- Tag Everything: Propagate tags from the pipeline execution to the underlying jobs. This is vital for FinOps and cost attribution (a tagging sketch follows this list).
- Parameterize Infrastructure: Never hardcode instance types (ml.p3.2xlarge). Use Pipeline Parameters so that you can run small "smoke tests" on cheap instances (ml.m5.large) before committing to a full training run.
- Artifact Management: Use structured naming conventions for S3 paths, often leveraging the Execution.PipelineExecutionId to isolate runs.
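As a follow-up to the Tag Everything bullet, a short sketch of attaching cost-attribution tags when upserting the pipeline definition (the key/value pairs are illustrative):
# Tags propagate to the pipeline and support FinOps / cost attribution reporting
pipeline.upsert(
    role_arn=role,
    tags=[
        {"Key": "team", "Value": "ml-platform"},
        {"Key": "project", "Value": "census"},
    ],
)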
Conclusion
Mastering AWS Pipelines requires navigating the trade-offs between the specialized, data-science-friendly features of SageMaker Pipelines and the robust, integrative power of Step Functions. By employing patterns like the Dual-Pipeline strategy and adhering to strict Infrastructure-as-Code principles with the Python SDKs, organizations can build resilient, self-healing Continuous Training systems that scale with their AI ambitions.