20.3 Triggering Patterns: Event-Driven vs. Drift-Driven
Automating the execution of a pipeline is only half the battle. The other half is determining when that pipeline should execute. In the early stages of MLOps maturity, humans push buttons. In the middle stages, cron jobs run on schedules. In advanced stages, the system reacts to the world around it.
This section explores the architectural patterns for triggering Continuous Training (CT) pipelines, moving from static schedules to dynamic, event-driven, and drift-aware systems.
20.3.1. The Triggering Maturity Model
graph LR
subgraph "Level 0: Manual"
A[Data Scientist] -->|"Button Click"| B[Pipeline]
end
subgraph "Level 1: Scheduled"
C[Cron Job] -->|"Every Sunday 2AM"| D[Pipeline]
end
subgraph "Level 2: Event-Driven"
E[Data Landing] -->|"New Data Event"| F[Pipeline]
end
subgraph "Level 3: Drift-Driven"
G[Monitor] -->|"Quality Alert"| H[Pipeline]
end
subgraph "Level 4: Adaptive"
I[Cost/Quality Optimizer] -->|"Dynamic Decision"| J[Pipeline]
end
The Triggering Hierarchy
| Level | Trigger | Decision Maker | Latency | Cost Efficiency |
|---|---|---|---|---|
| 0 | Manual | Human | Days-Weeks | Very Low |
| 1 | Scheduled | Time | Fixed | Low-Medium |
| 2 | Event-Driven | Data Arrival | Minutes-Hours | Medium |
| 3 | Drift-Driven | Model Quality | Hours-Days | High |
| 4 | Adaptive | Multi-factor | Optimal | Very High |
20.3.2. Pattern 1: Scheduled Triggers (The Baseline)
Before diving into sophisticated patterns, let’s establish the baseline: cron-based scheduling.
When Scheduled Makes Sense
- Stable domains: Sales forecasting, monthly reports
- Predictable data cadence: Daily batch loads, weekly updates
- Budget constraints: Training costs are fixed and predictable
- Early maturity: Simple to implement, easy to debug
AWS: EventBridge Scheduled Rules
# scheduled_trigger.tf - AWS Implementation
resource "aws_cloudwatch_event_rule" "weekly_retrain" {
name = "weekly-model-retrain"
description = "Triggers model retraining every Sunday at 2 AM UTC"
schedule_expression = "cron(0 2 ? * SUN *)"
tags = {
Environment = var.environment
Purpose = "scheduled-retraining"
}
}
resource "aws_cloudwatch_event_target" "sagemaker_scheduled" {
rule = aws_cloudwatch_event_rule.weekly_retrain.name
target_id = "WeeklyRetraining"
arn = aws_sagemaker_pipeline.training_pipeline.arn
role_arn = aws_iam_role.eventbridge_execution.arn
sagemaker_pipeline_target {
pipeline_parameter_list {
name = "TrainingMode"
value = "scheduled"
}
pipeline_parameter_list {
name = "DataWindowDays"
value = "7"
}
}
}
# IAM Role for EventBridge
resource "aws_iam_role" "eventbridge_execution" {
name = "eventbridge-sagemaker-execution"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = {
Service = "events.amazonaws.com"
}
}]
})
}
resource "aws_iam_role_policy" "start_pipeline" {
name = "start-sagemaker-pipeline"
role = aws_iam_role.eventbridge_execution.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Effect = "Allow"
Action = [
"sagemaker:StartPipelineExecution"
]
Resource = aws_sagemaker_pipeline.training_pipeline.arn
}]
})
}
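The EventBridge target passes TrainingMode and DataWindowDays straight through as pipeline parameters. To sanity-check that wiring without waiting for Sunday, you can start the same pipeline by hand; a minimal boto3 sketch, assuming the pipeline created above is named training-pipeline:
# manual_kickoff.py - ad-hoc smoke test of the scheduled path
# (pipeline name and parameter names assumed to match the Terraform above)
import boto3

sagemaker = boto3.client("sagemaker")

response = sagemaker.start_pipeline_execution(
    PipelineName="training-pipeline",
    PipelineExecutionDisplayName="manual-smoke-test",
    PipelineParameters=[
        {"Name": "TrainingMode", "Value": "scheduled"},
        {"Name": "DataWindowDays", "Value": "7"},
    ],
)
print(response["PipelineExecutionArn"])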
GCP: Cloud Scheduler with Pub/Sub
# gcp_scheduled_trigger.tf
resource "google_cloud_scheduler_job" "weekly_retrain" {
name = "weekly-model-retrain"
description = "Triggers weekly model retraining"
schedule = "0 2 * * 0" # Every Sunday at 2 AM
time_zone = "UTC"
pubsub_target {
topic_name = google_pubsub_topic.pipeline_trigger.id
data = base64encode(jsonencode({
trigger_type = "scheduled"
data_window_days = 7
}))
}
}
resource "google_pubsub_topic" "pipeline_trigger" {
name = "ml-pipeline-trigger"
}
resource "google_cloudfunctions2_function" "trigger_vertex" {
name = "trigger-vertex-pipeline"
location = var.region
description = "Triggers Vertex AI Pipeline from Pub/Sub"
build_config {
runtime = "python311"
entry_point = "trigger_pipeline"
source {
storage_source {
bucket = google_storage_bucket.functions.name
object = google_storage_bucket_object.function_code.name
}
}
}
service_config {
max_instance_count = 1
available_memory = "256M"
timeout_seconds = 60
service_account_email = google_service_account.pipeline_trigger.email
}
event_trigger {
trigger_region = var.region
event_type = "google.cloud.pubsub.topic.v1.messagePublished"
pubsub_topic = google_pubsub_topic.pipeline_trigger.id
}
}
# Cloud Function to trigger Vertex AI Pipeline
import functions_framework
import base64
import json
from google.cloud import aiplatform
@functions_framework.cloud_event
def trigger_pipeline(cloud_event):
"""Triggered by Pub/Sub message to start Vertex AI Pipeline."""
# Decode message
message_data = base64.b64decode(cloud_event.data["message"]["data"])
params = json.loads(message_data)
# Initialize Vertex AI
aiplatform.init(
project="my-project",
location="us-central1"
)
# Create and submit pipeline job
job = aiplatform.PipelineJob(
display_name=f"scheduled-training-{params.get('trigger_type', 'manual')}",
template_path="gs://my-bucket/pipelines/training-pipeline.json",
parameter_values={
"training_mode": params.get("trigger_type", "scheduled"),
"data_window_days": params.get("data_window_days", 7)
},
enable_caching=True
)
job.submit(service_account="pipeline-runner@my-project.iam.gserviceaccount.com")
return {"status": "submitted", "job_name": job.resource_name}
Azure: Logic Apps with Azure ML
{
"$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#",
"contentVersion": "1.0.0.0",
"resources": [
{
"type": "Microsoft.Logic/workflows",
"apiVersion": "2019-05-01",
"name": "weekly-retrain-trigger",
"location": "[resourceGroup().location]",
"properties": {
"definition": {
"$schema": "https://schema.management.azure.com/providers/Microsoft.Logic/schemas/2016-06-01/workflowdefinition.json#",
"triggers": {
"Recurrence": {
"type": "Recurrence",
"recurrence": {
"frequency": "Week",
"interval": 1,
"schedule": {
"weekDays": ["Sunday"],
"hours": ["2"]
},
"timeZone": "UTC"
}
}
},
"actions": {
"Submit_Pipeline": {
"type": "Http",
"inputs": {
"method": "POST",
"uri": "[concat('https://', parameters('mlWorkspaceName'), '.api.azureml.ms/pipelines/v1.0/subscriptions/', subscription().subscriptionId, '/resourceGroups/', resourceGroup().name, '/providers/Microsoft.MachineLearningServices/workspaces/', parameters('mlWorkspaceName'), '/PipelineRuns')]",
"headers": {
"Authorization": "Bearer @{body('Get_Access_Token')?['access_token']}",
"Content-Type": "application/json"
},
"body": {
"PipelineId": "[parameters('pipelineId')]",
"RunSource": "SDK",
"ParameterAssignments": {
"training_mode": "scheduled",
"data_window_days": 7
}
}
}
}
}
}
}
}
]
}
20.3.3. Pattern 2: Event-Driven Architectures (EDA)
Event-driven triggering is ideal when model freshness is paramount and data arrives in batches or streams.
AWS: The EventBridge + S3 Pattern
In AWS, Amazon EventBridge is the central nervous system. A common pattern involves triggering a pipeline when new ground-truth labels land in S3.
graph LR
A[Labeling Job] -->|"Writes"| B[S3: labels/]
B -->|"Object Created"| C[EventBridge]
C -->|"Rule Match"| D{Lambda Buffer}
D -->|"Batch Ready"| E[SageMaker Pipeline]
D -->|"Wait"| D
Complete Implementation: Terraform + Lambda
# event_driven_trigger.tf
# S3 bucket with EventBridge notifications enabled
resource "aws_s3_bucket" "mlops_data" {
bucket = "mlops-data-${var.environment}"
}
resource "aws_s3_bucket_notification" "eventbridge" {
bucket = aws_s3_bucket.mlops_data.id
eventbridge = true
}
# EventBridge Rule for new labels
resource "aws_cloudwatch_event_rule" "new_data" {
name = "new-training-data-arrived"
description = "Triggers when new labeled data arrives in S3"
event_pattern = jsonencode({
source = ["aws.s3"]
detail-type = ["Object Created"]
detail = {
bucket = {
name = [aws_s3_bucket.mlops_data.id]
}
object = {
key = [{ prefix = "labels/" }]
}
}
})
}
# Lambda for batching/deduplication
resource "aws_lambda_function" "event_batcher" {
function_name = "training-event-batcher"
runtime = "python3.11"
handler = "handler.lambda_handler"
role = aws_iam_role.lambda_execution.arn
timeout = 60
memory_size = 256
environment {
variables = {
DYNAMODB_TABLE = aws_dynamodb_table.event_buffer.name
PIPELINE_ARN = aws_sagemaker_pipeline.training.arn
BATCH_SIZE = "1000"
BATCH_WINDOW_SECS = "3600"
}
}
filename = "lambda/event_batcher.zip"
source_code_hash = filebase64sha256("lambda/event_batcher.zip")
}
resource "aws_cloudwatch_event_target" "lambda_batcher" {
rule = aws_cloudwatch_event_rule.new_data.name
target_id = "EventBatcher"
arn = aws_lambda_function.event_batcher.arn
}
resource "aws_lambda_permission" "eventbridge" {
statement_id = "AllowEventBridge"
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.event_batcher.function_name
principal = "events.amazonaws.com"
source_arn = aws_cloudwatch_event_rule.new_data.arn
}
# DynamoDB for event batching
resource "aws_dynamodb_table" "event_buffer" {
name = "training-event-buffer"
billing_mode = "PAY_PER_REQUEST"
hash_key = "batch_id"
range_key = "event_time"
attribute {
name = "batch_id"
type = "S"
}
attribute {
name = "event_time"
type = "S"
}
ttl {
attribute_name = "expiry_time"
enabled = true
}
}
# lambda/handler.py - Event Batching Logic
import boto3
import os
import json
import time
from datetime import datetime, timedelta
from decimal import Decimal
dynamodb = boto3.resource('dynamodb')
sagemaker = boto3.client('sagemaker')
TABLE_NAME = os.environ['DYNAMODB_TABLE']
PIPELINE_ARN = os.environ['PIPELINE_ARN']
BATCH_SIZE = int(os.environ.get('BATCH_SIZE', 1000))
BATCH_WINDOW_SECS = int(os.environ.get('BATCH_WINDOW_SECS', 3600))
table = dynamodb.Table(TABLE_NAME)
def lambda_handler(event, context):
"""
Batches S3 events and triggers pipeline when threshold is reached.
Batching Logic:
1. Each event is stored in DynamoDB
2. Check total count in current batch window
3. If count >= BATCH_SIZE or window expired, trigger pipeline
"""
# Extract S3 event details
detail = event.get('detail', {})
bucket = detail.get('bucket', {}).get('name')
key = detail.get('object', {}).get('key')
if not bucket or not key:
return {'statusCode': 400, 'body': 'Invalid event'}
# Current hour as batch ID (hourly batching)
batch_id = datetime.utcnow().strftime('%Y-%m-%d-%H')
event_time = datetime.utcnow().isoformat()
# Store event
table.put_item(Item={
'batch_id': batch_id,
'event_time': event_time,
's3_path': f's3://{bucket}/{key}',
'expiry_time': int(time.time()) + 86400 # 24h TTL
})
# Count events in current batch
response = table.query(
KeyConditionExpression='batch_id = :bid',
ExpressionAttributeValues={':bid': batch_id},
Select='COUNT'
)
count = response['Count']
# Check if we should trigger
should_trigger = False
trigger_reason = None
if count >= BATCH_SIZE:
should_trigger = True
trigger_reason = f'batch_size_reached:{count}'
    # Check for window expiry (trigger at end of window even if below threshold).
    # Note: batch_id is the *current* hour, so this branch only fires when
    # BATCH_WINDOW_SECS is shorter than the hourly batch granularity. A batch that
    # stops receiving events is never flushed by this handler alone; pair it with
    # a periodic invocation (e.g. an EventBridge rate rule) that sweeps stale batches.
    window_start = datetime.strptime(batch_id, '%Y-%m-%d-%H')
    window_end = window_start + timedelta(seconds=BATCH_WINDOW_SECS)
    if datetime.utcnow() >= window_end and count > 0:
        should_trigger = True
        trigger_reason = f'window_expired:count={count}'
if should_trigger:
# Get all S3 paths in batch
items = table.query(
KeyConditionExpression='batch_id = :bid',
ExpressionAttributeValues={':bid': batch_id}
)['Items']
s3_paths = [item['s3_path'] for item in items]
# Trigger pipeline
response = sagemaker.start_pipeline_execution(
PipelineName=PIPELINE_ARN.split('/')[-1],
PipelineExecutionDisplayName=f'event-driven-{batch_id}',
PipelineParameters=[
{'Name': 'TriggerType', 'Value': 'event_driven'},
{'Name': 'TriggerReason', 'Value': trigger_reason},
{'Name': 'DataPaths', 'Value': json.dumps(s3_paths)},
{'Name': 'EventCount', 'Value': str(len(s3_paths))}
]
)
# Clear processed batch
for item in items:
table.delete_item(Key={
'batch_id': item['batch_id'],
'event_time': item['event_time']
})
return {
'statusCode': 200,
'body': json.dumps({
'triggered': True,
'reason': trigger_reason,
'pipeline_execution': response['PipelineExecutionArn']
})
}
return {
'statusCode': 200,
'body': json.dumps({
'triggered': False,
'current_count': count,
'threshold': BATCH_SIZE
})
}
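The handler expects the EventBridge envelope for S3 "Object Created" events, with the bucket name and object key nested under detail, mirroring the rule pattern above. A quick smoke test might look like the sketch below (values are placeholders; running it still requires AWS credentials, the environment variables, and the DynamoDB table provisioned above):
# test_event_batcher.py - shape of the EventBridge S3 event the handler expects
# (placeholder values; needs AWS credentials plus DYNAMODB_TABLE/PIPELINE_ARN set,
# because handler.py reads them at import time)
from handler import lambda_handler

sample_event = {
    "source": "aws.s3",
    "detail-type": "Object Created",
    "detail": {
        "bucket": {"name": "mlops-data-dev"},
        "object": {"key": "labels/2024-06-01/part-0001.jsonl"},
    },
}

result = lambda_handler(sample_event, None)
# Prints a statusCode/body payload reporting triggered: false and the current
# batch count until the size threshold or window expiry is reached.
print(result)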
GCP: Pub/Sub with Cloud Run
# cloud_run_trigger/main.py
from flask import Flask, request
from google.cloud import aiplatform, firestore
from datetime import datetime, timedelta
import base64
import json
import os
app = Flask(__name__)
db = firestore.Client()
PROJECT = os.environ['PROJECT_ID']
REGION = os.environ['REGION']
PIPELINE_TEMPLATE = os.environ['PIPELINE_TEMPLATE']
BATCH_SIZE = int(os.environ.get('BATCH_SIZE', 1000))
@app.route('/trigger', methods=['POST'])
def handle_pubsub():
"""Handle Pub/Sub push notifications from GCS."""
envelope = request.get_json()
if not envelope:
return 'Bad Request', 400
message = envelope.get('message', {})
data = json.loads(
base64.b64decode(message.get('data', '')).decode('utf-8')
)
bucket = data.get('bucket')
name = data.get('name')
if not bucket or not name:
return 'Invalid message', 400
# Store event in Firestore
batch_id = datetime.utcnow().strftime('%Y-%m-%d-%H')
doc_ref = db.collection('event_batches').document(batch_id)
# Atomic increment
doc_ref.set({
'events': firestore.ArrayUnion([{
'gcs_path': f'gs://{bucket}/{name}',
'timestamp': datetime.utcnow()
}]),
'updated_at': datetime.utcnow()
}, merge=True)
# Get current count
doc = doc_ref.get()
events = doc.to_dict().get('events', [])
if len(events) >= BATCH_SIZE:
trigger_pipeline(batch_id, events)
# Clear batch
doc_ref.delete()
return json.dumps({
'triggered': True,
'event_count': len(events)
}), 200
return json.dumps({
'triggered': False,
'current_count': len(events)
}), 200
def trigger_pipeline(batch_id: str, events: list):
"""Trigger Vertex AI Pipeline."""
aiplatform.init(project=PROJECT, location=REGION)
gcs_paths = [e['gcs_path'] for e in events]
job = aiplatform.PipelineJob(
display_name=f'event-driven-{batch_id}',
template_path=PIPELINE_TEMPLATE,
parameter_values={
'trigger_type': 'event_driven',
'data_paths': json.dumps(gcs_paths),
'event_count': len(events)
}
)
job.submit()
return job.resource_name
if __name__ == '__main__':
app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))
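Pub/Sub push delivers the GCS notification as a base64-encoded JSON document inside a message envelope. The sketch below simulates that delivery against a local run of the service; the URL, bucket, and object name are placeholders:
# simulate_push.py - post a Pub/Sub-style push envelope to a local instance
# of the service above (URL, bucket, and object name are placeholders)
import base64
import json
import requests

notification = {"bucket": "mlops-data", "name": "labels/part-0001.jsonl"}
envelope = {
    "message": {
        "data": base64.b64encode(json.dumps(notification).encode("utf-8")).decode("utf-8")
    }
}

resp = requests.post("http://localhost:8080/trigger", json=envelope)
print(resp.status_code, resp.json())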
20.3.4. Pattern 3: Drift-Driven Architectures
This is the sophisticated “Self-Healing” pattern. Instead of training on a schedule (which might be wasteful) or on data arrival (which ignores model quality), we train only when the model needs it.
Types of Drift
| Drift Type | Description | Detection Method | Example |
|---|---|---|---|
| Data Drift | Input feature distribution changes | KL Divergence, PSI | New device types in traffic |
| Concept Drift | X→Y relationship changes | Performance degradation | Inflation affects $ thresholds |
| Prediction Drift | Output distribution changes | Distribution tests | Model becoming more conservative |
| Label Drift | Ground truth distribution changes | Historical comparison | Fraud rates increasing |
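Most of these checks reduce to comparing a reference distribution against a live serving window. As a concrete example, the Population Stability Index (PSI) mentioned above can be computed per numeric feature with a few lines of NumPy; a common rule of thumb treats PSI below 0.1 as stable and above 0.25 as significant drift. A minimal sketch:
# psi.py - Population Stability Index between a reference (training) sample
# and a live (serving) sample of a single numeric feature
import numpy as np

def population_stability_index(reference, live, bins: int = 10) -> float:
    """PSI = sum((p_live - p_ref) * ln(p_live / p_ref)) over shared bins."""
    # Bin edges come from the reference distribution only; live values outside
    # the reference range are ignored by the binning.
    edges = np.histogram_bin_edges(reference, bins=bins)
    ref_counts, _ = np.histogram(reference, bins=edges)
    live_counts, _ = np.histogram(live, bins=edges)

    # Convert to proportions; a small epsilon avoids division by zero / log(0)
    eps = 1e-6
    p_ref = ref_counts / max(ref_counts.sum(), 1) + eps
    p_live = live_counts / max(live_counts.sum(), 1) + eps

    return float(np.sum((p_live - p_ref) * np.log(p_live / p_ref)))

# Example: a mean shift in the live sample shows up as a large PSI
rng = np.random.default_rng(0)
print(population_stability_index(rng.normal(0, 1, 10_000), rng.normal(0.5, 1, 10_000)))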
AWS: SageMaker Model Monitor Pipeline
graph TB
subgraph "Inference Layer"
A[SageMaker Endpoint] -->|Data Capture| B[S3: captured/]
end
subgraph "Monitoring Layer"
C[Model Monitor Schedule] -->|Hourly| D[Monitor Job]
D -->|Analyze| B
D -->|Baseline| E[S3: baseline/]
D -->|Results| F[S3: violations/]
end
subgraph "Alerting Layer"
F -->|Metric| G[CloudWatch]
G -->|Alarm| H[SNS]
H -->|Event| I[EventBridge]
end
subgraph "Response Layer"
I -->|Trigger| J[Lambda: Evaluate]
J -->|Auto-Approve?| K{Severity Check}
K -->|Low| L[SageMaker Pipeline]
K -->|High| M[Human Review]
end
Complete Implementation
# drift_driven_trigger.tf
# Model Monitor Schedule
resource "aws_sagemaker_monitoring_schedule" "drift_monitor" {
name = "model-drift-monitor"
monitoring_schedule_config {
monitoring_job_definition_name = aws_sagemaker_data_quality_job_definition.drift.name
monitoring_type = "DataQuality"
schedule_config {
schedule_expression = "cron(0 * * * ? *)" # Hourly
}
}
}
resource "aws_sagemaker_data_quality_job_definition" "drift" {
name = "drift-detection-job"
role_arn = aws_iam_role.sagemaker_execution.arn
data_quality_app_specification {
image_uri = "123456789.dkr.ecr.us-east-1.amazonaws.com/sagemaker-model-monitor-analyzer"
}
data_quality_job_input {
endpoint_input {
endpoint_name = aws_sagemaker_endpoint.production.name
local_path = "/opt/ml/processing/input"
s3_data_distribution_type = "FullyReplicated"
s3_input_mode = "File"
}
}
data_quality_job_output_config {
monitoring_outputs {
s3_output {
s3_uri = "s3://${aws_s3_bucket.monitoring.id}/violations/"
local_path = "/opt/ml/processing/output"
s3_upload_mode = "EndOfJob"
}
}
}
data_quality_baseline_config {
constraints_resource {
s3_uri = "s3://${aws_s3_bucket.monitoring.id}/baseline/constraints.json"
}
statistics_resource {
s3_uri = "s3://${aws_s3_bucket.monitoring.id}/baseline/statistics.json"
}
}
job_resources {
cluster_config {
instance_count = 1
instance_type = "ml.m5.xlarge"
volume_size_in_gb = 50
}
}
}
# CloudWatch Alarm on Drift Metrics
resource "aws_cloudwatch_metric_alarm" "drift_detected" {
alarm_name = "model-drift-detected"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = 1
metric_name = "FeatureBaseline/drift_check_fail"
namespace = "aws/sagemaker/Endpoints/data-metrics"
period = 3600
statistic = "Maximum"
threshold = 0
alarm_description = "Model drift detected by SageMaker Model Monitor"
dimensions = {
EndpointName = aws_sagemaker_endpoint.production.name
}
alarm_actions = [aws_sns_topic.drift_alerts.arn]
}
# SNS Topic for Drift Alerts
resource "aws_sns_topic" "drift_alerts" {
name = "model-drift-alerts"
}
# Lambda to evaluate drift severity and trigger response
resource "aws_lambda_function" "drift_evaluator" {
function_name = "drift-severity-evaluator"
runtime = "python3.11"
handler = "handler.evaluate_drift"
role = aws_iam_role.lambda_execution.arn
timeout = 120
memory_size = 512
environment {
variables = {
PIPELINE_ARN = aws_sagemaker_pipeline.training.arn
SEVERITY_THRESHOLD = "0.3"
AUTO_RETRAIN_ENABLED = "true"
SNS_TOPIC_ARN = aws_sns_topic.human_review.arn
}
}
filename = "lambda/drift_evaluator.zip"
source_code_hash = filebase64sha256("lambda/drift_evaluator.zip")
}
resource "aws_sns_topic_subscription" "drift_to_lambda" {
topic_arn = aws_sns_topic.drift_alerts.arn
protocol = "lambda"
endpoint = aws_lambda_function.drift_evaluator.arn
}
# lambda/drift_evaluator.py
import boto3
import json
import os
from typing import Dict, List, Tuple
from dataclasses import dataclass
sagemaker = boto3.client('sagemaker')
s3 = boto3.client('s3')
sns = boto3.client('sns')
PIPELINE_ARN = os.environ['PIPELINE_ARN']
SEVERITY_THRESHOLD = float(os.environ.get('SEVERITY_THRESHOLD', 0.3))
AUTO_RETRAIN_ENABLED = os.environ.get('AUTO_RETRAIN_ENABLED', 'false').lower() == 'true'
SNS_TOPIC_ARN = os.environ.get('SNS_TOPIC_ARN')
@dataclass
class DriftAnalysis:
severity: str # 'low', 'medium', 'high', 'critical'
score: float
drifted_features: List[str]
recommendation: str
def evaluate_drift(event, context):
"""
Evaluate drift severity and determine response action.
Severity Levels:
- Low (<0.1): Log only, no action
- Medium (0.1-0.3): Auto-retrain if enabled
- High (0.3-0.5): Trigger retraining, notify team
- Critical (>0.5): Block automated retraining, require human review
"""
# Parse SNS message
message = json.loads(event['Records'][0]['Sns']['Message'])
# Get violation report from S3
violations = get_latest_violations()
# Analyze severity
analysis = analyze_drift_severity(violations)
# Determine response
response = determine_response(analysis)
return response
def get_latest_violations() -> Dict:
"""Retrieve latest violation report from S3."""
bucket = os.environ.get('MONITORING_BUCKET', 'mlops-monitoring')
prefix = 'violations/'
    # List violation reports and pick the most recently written one.
    # (list_objects_v2 returns keys in lexicographic order, so taking the first
    # key with MaxKeys=1 would not necessarily return the latest report.)
    response = s3.list_objects_v2(
        Bucket=bucket,
        Prefix=prefix
    )
    if not response.get('Contents'):
        return {}
    latest_key = max(response['Contents'], key=lambda obj: obj['LastModified'])['Key']
obj = s3.get_object(Bucket=bucket, Key=latest_key)
return json.loads(obj['Body'].read())
def analyze_drift_severity(violations: Dict) -> DriftAnalysis:
"""Analyze drift violations and determine severity."""
if not violations:
return DriftAnalysis(
severity='none',
score=0.0,
drifted_features=[],
recommendation='No action required'
)
# Extract feature violations
feature_violations = violations.get('features', {})
drifted_features = []
max_drift_score = 0.0
for feature, stats in feature_violations.items():
if stats.get('constraint_check_status') == 'Failed':
drifted_features.append(feature)
drift_score = stats.get('drift_score', 0)
max_drift_score = max(max_drift_score, drift_score)
# Determine severity
if max_drift_score >= 0.5:
severity = 'critical'
recommendation = 'BLOCK automated retraining. Investigate data source issues.'
elif max_drift_score >= 0.3:
severity = 'high'
recommendation = 'Trigger retraining with increased monitoring. Notify ML team.'
elif max_drift_score >= 0.1:
severity = 'medium'
recommendation = 'Auto-retrain if enabled. Monitor closely.'
else:
severity = 'low'
recommendation = 'Log for tracking. No immediate action required.'
return DriftAnalysis(
severity=severity,
score=max_drift_score,
drifted_features=drifted_features,
recommendation=recommendation
)
def determine_response(analysis: DriftAnalysis) -> Dict:
"""Determine and execute response based on drift analysis."""
response = {
'analysis': {
'severity': analysis.severity,
'score': analysis.score,
'drifted_features': analysis.drifted_features,
'recommendation': analysis.recommendation
},
'action_taken': None
}
if analysis.severity == 'critical':
# Human review required
notify_human_review(analysis)
response['action_taken'] = 'human_review_requested'
elif analysis.severity == 'high':
# Retrain + notify
trigger_retraining(analysis, require_approval=True)
notify_team(analysis)
response['action_taken'] = 'retraining_triggered_with_approval'
elif analysis.severity == 'medium' and AUTO_RETRAIN_ENABLED:
# Auto-retrain
trigger_retraining(analysis, require_approval=False)
response['action_taken'] = 'auto_retraining_triggered'
else:
# Log only
log_drift_event(analysis)
response['action_taken'] = 'logged_only'
return response
def trigger_retraining(analysis: DriftAnalysis, require_approval: bool = False):
"""Trigger SageMaker Pipeline for retraining."""
sagemaker.start_pipeline_execution(
PipelineName=PIPELINE_ARN.split('/')[-1],
PipelineExecutionDisplayName=f'drift-triggered-{analysis.severity}',
PipelineParameters=[
{'Name': 'TriggerType', 'Value': 'drift_driven'},
{'Name': 'DriftSeverity', 'Value': analysis.severity},
{'Name': 'DriftScore', 'Value': str(analysis.score)},
{'Name': 'DriftedFeatures', 'Value': json.dumps(analysis.drifted_features)},
{'Name': 'RequireApproval', 'Value': str(require_approval)}
]
)
def notify_human_review(analysis: DriftAnalysis):
"""Send notification requiring human review."""
if SNS_TOPIC_ARN:
sns.publish(
TopicArn=SNS_TOPIC_ARN,
Subject=f'[CRITICAL] Model Drift Requires Review',
Message=json.dumps({
'severity': analysis.severity,
'score': analysis.score,
'drifted_features': analysis.drifted_features,
'recommendation': analysis.recommendation,
'action_required': 'Review drift report and approve/reject retraining'
}, indent=2)
)
def notify_team(analysis: DriftAnalysis):
"""Send notification to ML team."""
# Similar to notify_human_review but less urgent
pass
def log_drift_event(analysis: DriftAnalysis):
"""Log drift event for tracking."""
print(json.dumps({
'event': 'drift_detected',
'severity': analysis.severity,
'score': analysis.score,
'features': analysis.drifted_features
}))
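Because analyze_drift_severity is a pure function, it can be exercised locally against a synthetic violation report; the dictionary shape below simply mirrors what the code above expects, not an official Model Monitor schema:
# Example: local severity analysis on a synthetic violation report.
# (drift_evaluator.py reads PIPELINE_ARN from the environment at import time,
# so set that and AWS credentials before importing.)
from drift_evaluator import analyze_drift_severity

sample_violations = {
    "features": {
        "transaction_amount": {"constraint_check_status": "Failed", "drift_score": 0.34},
        "device_type": {"constraint_check_status": "Passed", "drift_score": 0.05},
    }
}

analysis = analyze_drift_severity(sample_violations)
print(analysis.severity, analysis.drifted_features)
# -> high ['transaction_amount']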
GCP: Vertex AI Model Monitoring
# vertex_drift_monitor.py
from google.cloud import aiplatform
from google.cloud.aiplatform import model_monitoring
from google.cloud import pubsub_v1
import json
def setup_model_monitoring(
project: str,
region: str,
endpoint_name: str,
email_recipients: list
):
"""Setup Vertex AI Model Monitoring with drift detection."""
aiplatform.init(project=project, location=region)
# Get endpoint
endpoint = aiplatform.Endpoint(endpoint_name)
    # Define monitoring config.
    # Training-serving skew: compare live serving data against the training data source.
    skew_config = model_monitoring.SkewDetectionConfig(
        data_source="bq://project.dataset.training_table",
        target_field="label",  # assumed name of the label column in the training table
        skew_thresholds={
            "high_risk_feature": 0.1,    # Stricter threshold for critical features
            "medium_risk_feature": 0.2
        }
    )
    # Prediction drift: compare recent serving data against earlier serving data.
    drift_config = model_monitoring.DriftDetectionConfig(
        drift_thresholds={
            "high_risk_feature": 0.1,
            "medium_risk_feature": 0.3
        }
    )
    objective_config = model_monitoring.ObjectiveConfig(
        skew_detection_config=skew_config,
        drift_detection_config=drift_config
    )
    # Alerting config
    email_config = model_monitoring.EmailAlertConfig(
        user_emails=email_recipients
    )
    # Create monitoring job
    monitoring_job = aiplatform.ModelDeploymentMonitoringJob.create(
        display_name="production-model-monitor",
        endpoint=endpoint,
        logging_sampling_strategy=model_monitoring.RandomSampleConfig(
            sample_rate=1.0  # 100% sampling
        ),
        schedule_config=model_monitoring.ScheduleConfig(
            monitor_interval=1  # hours between monitoring runs
        ),
        objective_configs=objective_config,
        alert_config=email_config
    )
return monitoring_job
def create_drift_response_pipeline():
"""Create Pub/Sub triggered Cloud Function for drift response."""
# Cloud Function code
function_code = '''
import functions_framework
from google.cloud import aiplatform
import json
import base64
@functions_framework.cloud_event
def handle_drift_alert(cloud_event):
"""Handle Model Monitoring drift alerts."""
data = json.loads(base64.b64decode(cloud_event.data["message"]["data"]))
# Parse monitoring alert
anomaly_type = data.get("anomalyType")
feature_name = data.get("featureName")
score = data.get("score", 0)
# Determine response
if score > 0.5:
        # Critical - human review (send_to_slack is a placeholder alerting helper,
        # e.g. a thin wrapper around a Slack incoming-webhook POST)
        send_to_slack("#ml-alerts", f"🚨 Critical drift: {feature_name} = {score}")
elif score > 0.3:
# High - auto retrain with approval
trigger_pipeline(
trigger_type="drift",
require_approval=True,
drift_info={"feature": feature_name, "score": score}
)
elif score > 0.1:
# Medium - auto retrain
trigger_pipeline(
trigger_type="drift",
require_approval=False,
drift_info={"feature": feature_name, "score": score}
)
def trigger_pipeline(trigger_type: str, require_approval: bool, drift_info: dict):
aiplatform.init(project="my-project", location="us-central1")
job = aiplatform.PipelineJob(
display_name=f"drift-triggered-{drift_info['feature']}",
template_path="gs://my-bucket/pipelines/training.json",
parameter_values={
"trigger_type": trigger_type,
"drift_feature": drift_info["feature"],
"drift_score": drift_info["score"],
"require_approval": require_approval
}
)
job.submit()
'''
return function_code
20.3.5. Pattern 4: Hybrid Triggering (Multi-Signal)
Production systems often combine multiple trigger types for robustness.
Multi-Signal Architecture
graph TB
subgraph "Trigger Sources"
A[Schedule: Weekly] --> D
B[Event: New Data] --> D
C[Drift: Quality Alert] --> D
end
D[Trigger Orchestrator] --> E{Evaluate Context}
E -->|Recent Training?| F[Debounce]
E -->|Cost Budget OK?| G[Cost Check]
E -->|Human Blocked?| H[Override Check]
F --> I[Training Pipeline]
G --> I
H --> I
Implementation: Smart Trigger Coordinator
# trigger_coordinator.py
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional, Dict, List
import boto3
import json
class TriggerType(Enum):
SCHEDULED = "scheduled"
EVENT_DRIVEN = "event_driven"
DRIFT_DRIVEN = "drift_driven"
MANUAL = "manual"
@dataclass
class TriggerContext:
trigger_type: TriggerType
timestamp: datetime
metadata: Dict
priority: int # 1=highest, 5=lowest
@dataclass
class TrainingDecision:
should_train: bool
reason: str
delay_until: Optional[datetime] = None
parameters: Optional[Dict] = None
class TriggerCoordinator:
"""
Coordinates multiple trigger sources and makes intelligent training decisions.
Features:
- Debouncing: Prevents training storms
- Cost management: Respects budget constraints
- Priority handling: Critical triggers override normal ones
- Cooldown periods: Minimum time between trainings
"""
def __init__(
self,
min_training_interval_hours: int = 6,
daily_training_budget: float = 1000.0,
max_daily_trainings: int = 4
):
self.min_interval = timedelta(hours=min_training_interval_hours)
self.daily_budget = daily_training_budget
self.max_daily = max_daily_trainings
self.dynamodb = boto3.resource('dynamodb')
self.state_table = self.dynamodb.Table('trigger-coordinator-state')
def evaluate(self, trigger: TriggerContext) -> TrainingDecision:
"""Evaluate whether to proceed with training."""
state = self._get_current_state()
# Check 1: Cooldown period
if state['last_training']:
last_training = datetime.fromisoformat(state['last_training'])
if datetime.utcnow() - last_training < self.min_interval:
remaining = self.min_interval - (datetime.utcnow() - last_training)
# Override for critical drift
if trigger.trigger_type == TriggerType.DRIFT_DRIVEN and trigger.priority <= 2:
pass # Allow critical drift to bypass cooldown
else:
return TrainingDecision(
should_train=False,
reason=f"In cooldown period. {remaining.total_seconds()/3600:.1f}h remaining",
delay_until=last_training + self.min_interval
)
# Check 2: Daily training limit
today_count = state.get('today_training_count', 0)
if today_count >= self.max_daily:
if trigger.priority > 2: # Non-critical
return TrainingDecision(
should_train=False,
reason=f"Daily training limit ({self.max_daily}) reached"
)
# Check 3: Budget check
today_spent = state.get('today_budget_spent', 0.0)
estimated_cost = self._estimate_training_cost(trigger)
if today_spent + estimated_cost > self.daily_budget:
if trigger.priority > 1: # Non-urgent
return TrainingDecision(
should_train=False,
reason=f"Would exceed daily budget (${today_spent:.2f} + ${estimated_cost:.2f} > ${self.daily_budget:.2f})"
)
# Check 4: Pending approvals
if state.get('pending_approval'):
return TrainingDecision(
should_train=False,
reason="Previous training pending approval"
)
# All checks passed
return TrainingDecision(
should_train=True,
reason=f"Approved: {trigger.trigger_type.value} trigger",
parameters=self._build_training_parameters(trigger, state)
)
def _get_current_state(self) -> Dict:
"""Get current coordinator state from DynamoDB."""
try:
response = self.state_table.get_item(Key={'pk': 'coordinator_state'})
return response.get('Item', {})
except Exception:
return {}
def _estimate_training_cost(self, trigger: TriggerContext) -> float:
"""Estimate training cost based on trigger context."""
base_cost = 150.0 # Base GPU cost
# Adjust based on data volume
data_size_gb = trigger.metadata.get('data_size_gb', 10)
size_factor = 1 + (data_size_gb / 100)
return base_cost * size_factor
def _build_training_parameters(self, trigger: TriggerContext, state: Dict) -> Dict:
"""Build training parameters based on trigger and state."""
return {
'trigger_type': trigger.trigger_type.value,
'trigger_timestamp': trigger.timestamp.isoformat(),
'trigger_priority': trigger.priority,
'training_sequence': state.get('total_trainings', 0) + 1,
**trigger.metadata
}
    def record_training_started(self, decision: TrainingDecision):
        """Record that training has started.

        Note: the today_* counters are assumed to be reset by a separate daily
        job, and today_budget_spent should be updated with the actual training
        cost once it is known (not shown here).
        """
self.state_table.update_item(
Key={'pk': 'coordinator_state'},
UpdateExpression='''
SET last_training = :now,
today_training_count = if_not_exists(today_training_count, :zero) + :one,
total_trainings = if_not_exists(total_trainings, :zero) + :one
''',
ExpressionAttributeValues={
':now': datetime.utcnow().isoformat(),
':zero': 0,
':one': 1
}
)
# Lambda handler
def lambda_handler(event, context):
"""Main entry point for trigger coordination."""
coordinator = TriggerCoordinator(
min_training_interval_hours=6,
daily_training_budget=1000.0,
max_daily_trainings=4
)
# Parse trigger context from event
trigger = TriggerContext(
trigger_type=TriggerType(event.get('trigger_type', 'manual')),
timestamp=datetime.utcnow(),
metadata=event.get('metadata', {}),
priority=event.get('priority', 3)
)
# Evaluate
decision = coordinator.evaluate(trigger)
if decision.should_train:
# Start pipeline
sagemaker = boto3.client('sagemaker')
sagemaker.start_pipeline_execution(
PipelineName='training-pipeline',
PipelineParameters=[
{'Name': k, 'Value': str(v)}
for k, v in decision.parameters.items()
]
)
coordinator.record_training_started(decision)
return {
'should_train': decision.should_train,
'reason': decision.reason,
'delay_until': decision.delay_until.isoformat() if decision.delay_until else None
}
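Outside of Lambda, the same coordinator can be driven directly, which is handy for exercising the policy checks. A short sketch with illustrative values; it still requires AWS credentials and the trigger-coordinator-state DynamoDB table:
# Example: evaluating a drift-driven trigger against the coordinator policy
# (requires the trigger-coordinator-state table; values are illustrative)
from datetime import datetime
from trigger_coordinator import TriggerContext, TriggerCoordinator, TriggerType

coordinator = TriggerCoordinator(
    min_training_interval_hours=6,
    daily_training_budget=1000.0,
    max_daily_trainings=4,
)

decision = coordinator.evaluate(TriggerContext(
    trigger_type=TriggerType.DRIFT_DRIVEN,
    timestamp=datetime.utcnow(),
    metadata={"data_size_gb": 25},
    priority=2,  # high priority: allowed to bypass the cooldown window
))
print(decision.should_train, decision.reason)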
20.3.6. Feedback Loop Prevention
Caution
The Silent Killer: Automated drift-driven retraining can create catastrophic feedback loops where the model accepts gradually degrading data as “normal.”
The Feedback Loop Problem
graph LR
A[Model Drifts] --> B[Auto-Retrain on Drifted Data]
B --> C[New Model Accepts Drift]
C --> D[Drift Metrics Look Normal]
D --> E[Real Performance Degrades]
E --> A
Mitigation Strategies
# feedback_loop_prevention.py
from dataclasses import dataclass
from typing import Optional, List
from datetime import datetime, timedelta
import numpy as np
@dataclass
class SafetyCheck:
passed: bool
check_name: str
details: str
class RetrainingGuardrails:
"""
Guardrails to prevent feedback loop catastrophe.
Key Principles:
1. Never retrain on purely production data
2. Always compare against immutable baseline
3. Require performance validation before promotion
4. Implement staged rollouts
"""
def __init__(
self,
min_golden_set_performance: float = 0.85,
max_baseline_drift: float = 0.4,
require_human_approval_threshold: float = 0.2
):
self.min_golden_performance = min_golden_set_performance
self.max_baseline_drift = max_baseline_drift
self.human_approval_threshold = require_human_approval_threshold
def validate_retraining_safety(
self,
new_model_metrics: dict,
baseline_metrics: dict,
golden_set_results: dict
) -> List[SafetyCheck]:
"""Run all safety checks before allowing model promotion."""
checks = []
# Check 1: Golden Set Performance
golden_accuracy = golden_set_results.get('accuracy', 0)
checks.append(SafetyCheck(
passed=golden_accuracy >= self.min_golden_performance,
check_name="golden_set_performance",
details=f"Accuracy: {golden_accuracy:.3f} (min: {self.min_golden_performance})"
))
# Check 2: Baseline Comparison
baseline_delta = abs(
new_model_metrics.get('accuracy', 0) -
baseline_metrics.get('accuracy', 0)
)
checks.append(SafetyCheck(
passed=baseline_delta <= self.max_baseline_drift,
check_name="baseline_drift",
details=f"Delta from baseline: {baseline_delta:.3f} (max: {self.max_baseline_drift})"
))
# Check 3: Prediction Distribution Sanity
pred_distribution = new_model_metrics.get('prediction_distribution', {})
baseline_distribution = baseline_metrics.get('prediction_distribution', {})
distribution_shift = self._calculate_distribution_shift(
pred_distribution, baseline_distribution
)
checks.append(SafetyCheck(
passed=distribution_shift < 0.5,
check_name="prediction_distribution",
details=f"Distribution shift: {distribution_shift:.3f}"
))
# Check 4: Error Pattern Analysis
error_patterns = new_model_metrics.get('error_patterns', [])
checks.append(SafetyCheck(
passed=not self._detect_systematic_errors(error_patterns),
check_name="systematic_errors",
details=f"Checked {len(error_patterns)} error patterns"
))
return checks
def _calculate_distribution_shift(
self,
current: dict,
baseline: dict
) -> float:
"""Calculate KL divergence between distributions."""
# Simplified implementation
all_keys = set(current.keys()) | set(baseline.keys())
current_vals = np.array([current.get(k, 0.001) for k in all_keys])
baseline_vals = np.array([baseline.get(k, 0.001) for k in all_keys])
# Normalize
current_vals = current_vals / current_vals.sum()
baseline_vals = baseline_vals / baseline_vals.sum()
# KL Divergence
return np.sum(current_vals * np.log(current_vals / baseline_vals))
def _detect_systematic_errors(self, error_patterns: List) -> bool:
"""Detect if errors are systematic (potential feedback loop)."""
if not error_patterns:
return False
# Check for clustering of errors
# (simplified - would use more sophisticated analysis in production)
error_types = [e.get('type') for e in error_patterns]
type_counts = {}
for t in error_types:
type_counts[t] = type_counts.get(t, 0) + 1
max_concentration = max(type_counts.values()) / len(error_types)
return max_concentration > 0.7 # Too concentrated = systematic
def create_safe_retraining_pipeline():
"""Example SageMaker Pipeline with safety checks."""
    from sagemaker.workflow.pipeline import Pipeline
    from sagemaker.workflow.steps import ProcessingStep, TrainingStep
    from sagemaker.workflow.condition_step import ConditionStep
    from sagemaker.workflow.conditions import ConditionGreaterThan
# Step 1: Train on mixed data (production + baseline holdout)
# Step 2: Evaluate on golden set (immutable)
# Step 3: Safety checks
# Step 4: Conditional promotion
pipeline_definition = """
Steps:
1. DataPreparation:
- Mix production data (70%) with baseline holdout (30%)
- This prevents pure drift absorption
2. Training:
- Train new candidate model
- Log all metrics
3. GoldenSetEvaluation:
- Evaluate on immutable golden set
- Golden set is NEVER updated
4. SafetyChecks:
- Run guardrails validation
- All checks must pass
5. ShadowDeployment:
- Deploy to shadow endpoint
- Run A/B against production (no user impact)
6. HumanApproval (if drift > threshold):
- Require manual review
- Present safety check results
7. GradualRollout:
- 5% -> 25% -> 50% -> 100%
- Auto-rollback if metrics degrade
"""
return pipeline_definition
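Wiring the guardrails into a promotion gate is then a matter of requiring every check to pass before the candidate model moves forward; a short sketch with illustrative metric values:
# Example: using RetrainingGuardrails as a promotion gate (numbers are illustrative)
guardrails = RetrainingGuardrails(
    min_golden_set_performance=0.85,
    max_baseline_drift=0.4,
)

checks = guardrails.validate_retraining_safety(
    new_model_metrics={
        "accuracy": 0.88,
        "prediction_distribution": {"fraud": 0.03, "legit": 0.97},
        "error_patterns": [{"type": "false_positive"}, {"type": "false_negative"}],
    },
    baseline_metrics={
        "accuracy": 0.90,
        "prediction_distribution": {"fraud": 0.02, "legit": 0.98},
    },
    golden_set_results={"accuracy": 0.87},
)

for check in checks:
    print(f"{check.check_name}: {'PASS' if check.passed else 'FAIL'} ({check.details})")

promote = all(check.passed for check in checks)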
20.3.7. Comparison: Choosing Your Trigger Pattern
| Trigger Type | Pros | Cons | Best For | AWS Service | GCP Service |
|---|---|---|---|---|---|
| Scheduled | Simple, Predictable | Can be wasteful or too slow | Stable domains | EventBridge | Cloud Scheduler |
| Event-Driven | Reactive, Fresh data | Noisy, Trigger storms | Real-time critical | EventBridge + Lambda | Pub/Sub + Cloud Functions |
| Drift-Driven | Efficient, ROI-focused | Complex, Loop risk | High-scale, Cost-sensitive | Model Monitor + CloudWatch | Vertex AI Monitoring |
| Hybrid | Robust, Flexible | Complex orchestration | Enterprise production | Step Functions | Cloud Workflows |
Decision Matrix
IF data_arrival_is_predictable AND model_is_stable:
USE scheduled_trigger
INTERVAL = business_cycle (daily/weekly/monthly)
ELIF data_is_streaming AND freshness_critical:
USE event_driven_trigger
ADD batching_layer (prevent storm)
ADD deduplication
ELIF cost_is_primary_concern AND have_monitoring:
USE drift_driven_trigger
SET conservative_thresholds
ADD human_approval_for_critical
ELSE:
USE hybrid_approach
COMBINE scheduled_baseline + drift_override
ADD central_coordinator
20.3.8. Observability for Triggers
CloudWatch Dashboard (Terraform)
resource "aws_cloudwatch_dashboard" "trigger_monitoring" {
dashboard_name = "ml-trigger-monitoring"
dashboard_body = jsonencode({
widgets = [
{
type = "metric"
x = 0
y = 0
width = 12
height = 6
properties = {
metrics = [
["MLOps", "TriggerEvents", "Type", "scheduled"],
[".", ".", ".", "event_driven"],
[".", ".", ".", "drift_driven"]
]
title = "Trigger Events by Type"
region = var.aws_region
stat = "Sum"
period = 3600
}
},
{
type = "metric"
x = 12
y = 0
width = 12
height = 6
properties = {
metrics = [
["MLOps", "TrainingCost", "Result", "completed"],
[".", ".", ".", "rejected"]
]
title = "Training Decisions"
region = var.aws_region
}
},
{
type = "metric"
x = 0
y = 6
width = 24
height = 6
properties = {
metrics = [
["MLOps", "DriftScore", "Feature", "All"]
]
title = "Drift Scores Over Time"
region = var.aws_region
view = "timeSeries"
}
}
]
})
}
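The dashboard assumes the triggering components publish custom metrics into an MLOps namespace; none of these metrics exist out of the box. A minimal sketch of that emission from the trigger coordinator or drift evaluator, with names chosen to match the widgets above:
# emit_trigger_metric.py - publish the custom metrics the dashboard widgets read
# (the MLOps namespace and metric/dimension names are this book's convention,
# not built-in AWS metrics)
from typing import Optional
import boto3

cloudwatch = boto3.client("cloudwatch")

def record_trigger_event(trigger_type: str, drift_score: Optional[float] = None) -> None:
    """Publish trigger (and optionally drift) metrics into the custom MLOps namespace."""
    metric_data = [{
        "MetricName": "TriggerEvents",
        "Dimensions": [{"Name": "Type", "Value": trigger_type}],
        "Value": 1,
        "Unit": "Count",
    }]
    if drift_score is not None:
        metric_data.append({
            "MetricName": "DriftScore",
            "Dimensions": [{"Name": "Feature", "Value": "All"}],
            "Value": drift_score,
        })
    cloudwatch.put_metric_data(Namespace="MLOps", MetricData=metric_data)

record_trigger_event("drift_driven", drift_score=0.34)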
20.3.9. Summary Checklist
For Scheduled Triggers
- Define appropriate interval based on business cycle
- Set up alerting for missed executions
- Monitor for data staleness between runs
For Event-Driven Triggers
- Implement batching to prevent trigger storms
- Add deduplication logic
- Set up dead-letter queues for failed triggers
- Monitor event processing latency
For Drift-Driven Triggers
- Establish immutable baseline dataset
- Define thresholds for each severity level
- Implement feedback loop prevention
- Require human approval for critical drift
- Set up golden set evaluation
For Hybrid Systems
- Implement central coordinator
- Define priority system for competing triggers
- Set up cost budgets and limits
- Configure cooldown periods
- Monitor trigger decision rationale
Conclusion
The choice of triggering pattern defines the “liveness” of your AI system.
- Start with Scheduled (Cron is King for a reason)
- Move to Event-Driven only if latency costs revenue
- Move to Drift-Driven only if you have robust automated evaluation and rollout safety nets in place
- Consider Hybrid for production-grade systems that need resilience
Ultimately, the goal is to close the loop between the data scientist’s code and the production environment, minimizing the “Time-to-Adapt” for the AI system while maintaining safety and cost efficiency.
[End of Section 20.3]