20.3 Triggering Patterns: Event-Driven vs. Drift-Driven

Automating the execution of a pipeline is only half the battle. The other half is determining when that pipeline should execute. In the early stages of MLOps maturity, humans push buttons. In the middle stages, cron jobs run on schedules. In advanced stages, the system reacts to the world around it.

This section explores the architectural patterns for triggering Continuous Training (CT) pipelines, moving from static schedules to dynamic, event-driven, and drift-aware systems.


20.3.1. The Triggering Maturity Model

graph LR
    subgraph "Level 0: Manual"
        A[Data Scientist] -->|"Button Click"| B[Pipeline]
    end
    
    subgraph "Level 1: Scheduled"
        C[Cron Job] -->|"Every Sunday 2AM"| D[Pipeline]
    end
    
    subgraph "Level 2: Event-Driven"
        E[Data Landing] -->|"New Data Event"| F[Pipeline]
    end
    
    subgraph "Level 3: Drift-Driven"
        G[Monitor] -->|"Quality Alert"| H[Pipeline]
    end
    
    subgraph "Level 4: Adaptive"
        I[Cost/Quality Optimizer] -->|"Dynamic Decision"| J[Pipeline]
    end

The Triggering Hierarchy

Level | Trigger      | Decision Maker | Latency       | Cost Efficiency
------|--------------|----------------|---------------|----------------
0     | Manual       | Human          | Days-Weeks    | Very Low
1     | Scheduled    | Time           | Fixed         | Low-Medium
2     | Event-Driven | Data Arrival   | Minutes-Hours | Medium
3     | Drift-Driven | Model Quality  | Hours-Days    | High
4     | Adaptive     | Multi-factor   | Optimal       | Very High

20.3.2. Pattern 1: Scheduled Triggers (The Baseline)

Before diving into sophisticated patterns, let’s establish the baseline: cron-based scheduling.

When Scheduled Makes Sense

  • Stable domains: Sales forecasting, monthly reports
  • Predictable data cadence: Daily batch loads, weekly updates
  • Budget constraints: Training costs are fixed and predictable
  • Early maturity: Simple to implement, easy to debug

AWS: EventBridge Scheduled Rules

# scheduled_trigger.tf - AWS Implementation

resource "aws_cloudwatch_event_rule" "weekly_retrain" {
  name                = "weekly-model-retrain"
  description         = "Triggers model retraining every Sunday at 2 AM UTC"
  schedule_expression = "cron(0 2 ? * SUN *)"
  
  tags = {
    Environment = var.environment
    Purpose     = "scheduled-retraining"
  }
}

resource "aws_cloudwatch_event_target" "sagemaker_scheduled" {
  rule      = aws_cloudwatch_event_rule.weekly_retrain.name
  target_id = "WeeklyRetraining"
  arn       = aws_sagemaker_pipeline.training_pipeline.arn
  role_arn  = aws_iam_role.eventbridge_execution.arn

  sagemaker_pipeline_target {
    pipeline_parameter_list {
      name  = "TrainingMode"
      value = "scheduled"
    }
    pipeline_parameter_list {
      name  = "DataWindowDays"
      value = "7"
    }
  }
}

# IAM Role for EventBridge
resource "aws_iam_role" "eventbridge_execution" {
  name = "eventbridge-sagemaker-execution"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Principal = {
        Service = "events.amazonaws.com"
      }
    }]
  })
}

resource "aws_iam_role_policy" "start_pipeline" {
  name = "start-sagemaker-pipeline"
  role = aws_iam_role.eventbridge_execution.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect = "Allow"
      Action = [
        "sagemaker:StartPipelineExecution"
      ]
      Resource = aws_sagemaker_pipeline.training_pipeline.arn
    }]
  })
}

GCP: Cloud Scheduler with Pub/Sub

# gcp_scheduled_trigger.tf

resource "google_cloud_scheduler_job" "weekly_retrain" {
  name        = "weekly-model-retrain"
  description = "Triggers weekly model retraining"
  schedule    = "0 2 * * 0"  # Every Sunday at 2 AM
  time_zone   = "UTC"
  
  pubsub_target {
    topic_name = google_pubsub_topic.pipeline_trigger.id
    data       = base64encode(jsonencode({
      trigger_type    = "scheduled"
      data_window_days = 7
    }))
  }
}

resource "google_pubsub_topic" "pipeline_trigger" {
  name = "ml-pipeline-trigger"
}

resource "google_cloudfunctions2_function" "trigger_vertex" {
  name        = "trigger-vertex-pipeline"
  location    = var.region
  description = "Triggers Vertex AI Pipeline from Pub/Sub"

  build_config {
    runtime     = "python311"
    entry_point = "trigger_pipeline"
    source {
      storage_source {
        bucket = google_storage_bucket.functions.name
        object = google_storage_bucket_object.function_code.name
      }
    }
  }

  service_config {
    max_instance_count = 1
    available_memory   = "256M"
    timeout_seconds    = 60
    service_account_email = google_service_account.pipeline_trigger.email
  }

  event_trigger {
    trigger_region = var.region
    event_type     = "google.cloud.pubsub.topic.v1.messagePublished"
    pubsub_topic   = google_pubsub_topic.pipeline_trigger.id
  }
}

# Cloud Function to trigger Vertex AI Pipeline
import functions_framework
import base64
import json
from google.cloud import aiplatform

@functions_framework.cloud_event
def trigger_pipeline(cloud_event):
    """Triggered by Pub/Sub message to start Vertex AI Pipeline."""
    
    # Decode message
    message_data = base64.b64decode(cloud_event.data["message"]["data"])
    params = json.loads(message_data)
    
    # Initialize Vertex AI
    aiplatform.init(
        project="my-project",
        location="us-central1"
    )
    
    # Create and submit pipeline job
    job = aiplatform.PipelineJob(
        display_name=f"scheduled-training-{params.get('trigger_type', 'manual')}",
        template_path="gs://my-bucket/pipelines/training-pipeline.json",
        parameter_values={
            "training_mode": params.get("trigger_type", "scheduled"),
            "data_window_days": params.get("data_window_days", 7)
        },
        enable_caching=True
    )
    
    job.submit(service_account="pipeline-runner@my-project.iam.gserviceaccount.com")
    
    return {"status": "submitted", "job_name": job.resource_name}

Azure: Logic Apps with Azure ML

The HTTP action in the template below references a Get_Access_Token action (omitted for brevity) that is expected to obtain an Azure AD bearer token for the workspace before the pipeline run is submitted.

{
  "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#",
  "contentVersion": "1.0.0.0",
  "resources": [
    {
      "type": "Microsoft.Logic/workflows",
      "apiVersion": "2019-05-01",
      "name": "weekly-retrain-trigger",
      "location": "[resourceGroup().location]",
      "properties": {
        "definition": {
          "$schema": "https://schema.management.azure.com/providers/Microsoft.Logic/schemas/2016-06-01/workflowdefinition.json#",
          "triggers": {
            "Recurrence": {
              "type": "Recurrence",
              "recurrence": {
                "frequency": "Week",
                "interval": 1,
                "schedule": {
                  "weekDays": ["Sunday"],
                  "hours": ["2"]
                },
                "timeZone": "UTC"
              }
            }
          },
          "actions": {
            "Submit_Pipeline": {
              "type": "Http",
              "inputs": {
                "method": "POST",
                "uri": "[concat('https://', parameters('mlWorkspaceName'), '.api.azureml.ms/pipelines/v1.0/subscriptions/', subscription().subscriptionId, '/resourceGroups/', resourceGroup().name, '/providers/Microsoft.MachineLearningServices/workspaces/', parameters('mlWorkspaceName'), '/PipelineRuns')]",
                "headers": {
                  "Authorization": "Bearer @{body('Get_Access_Token')?['access_token']}",
                  "Content-Type": "application/json"
                },
                "body": {
                  "PipelineId": "[parameters('pipelineId')]",
                  "RunSource": "SDK",
                  "ParameterAssignments": {
                    "training_mode": "scheduled",
                    "data_window_days": 7
                  }
                }
              }
            }
          }
        }
      }
    }
  ]
}
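Where the Logic App feels heavyweight, the same weekly recurrence can be attached directly to an Azure ML pipeline job with the v2 Python SDK (azure-ai-ml). A minimal sketch, assuming a pipeline definition in pipeline.yml and placeholder subscription, resource group, and workspace identifiers:

# azure_scheduled_trigger.py - sketch using the Azure ML v2 SDK (azure-ai-ml)

from azure.identity import DefaultAzureCredential
from azure.ai.ml import MLClient, load_job
from azure.ai.ml.entities import JobSchedule, CronTrigger

ml_client = MLClient(
    credential=DefaultAzureCredential(),
    subscription_id="<subscription-id>",      # placeholder
    resource_group_name="<resource-group>",   # placeholder
    workspace_name="<ml-workspace>"           # placeholder
)

# Pipeline job loaded from a YAML definition (assumed to exist alongside this script)
pipeline_job = load_job(source="pipeline.yml")

schedule = JobSchedule(
    name="weekly-model-retrain",
    trigger=CronTrigger(expression="0 2 * * 0", time_zone="UTC"),  # Sundays at 2 AM
    create_job=pipeline_job
)

ml_client.schedules.begin_create_or_update(schedule).result()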

20.3.3. Pattern 2: Event-Driven Architectures (EDA)

Event-driven triggering is ideal when model freshness is paramount and data arrives in batches or streams.

AWS: The EventBridge + S3 Pattern

In AWS, Amazon EventBridge is the central nervous system. A common pattern involves triggering a pipeline when new ground-truth labels land in S3.

graph LR
    A[Labeling Job] -->|"Writes"| B[S3: labels/]
    B -->|"Object Created"| C[EventBridge]
    C -->|"Rule Match"| D{Lambda Buffer}
    D -->|"Batch Ready"| E[SageMaker Pipeline]
    D -->|"Wait"| D

Complete Implementation: Terraform + Lambda

# event_driven_trigger.tf

# S3 bucket with EventBridge notifications enabled
resource "aws_s3_bucket" "mlops_data" {
  bucket = "mlops-data-${var.environment}"
}

resource "aws_s3_bucket_notification" "eventbridge" {
  bucket      = aws_s3_bucket.mlops_data.id
  eventbridge = true
}

# EventBridge Rule for new labels
resource "aws_cloudwatch_event_rule" "new_data" {
  name        = "new-training-data-arrived"
  description = "Triggers when new labeled data arrives in S3"

  event_pattern = jsonencode({
    source      = ["aws.s3"]
    detail-type = ["Object Created"]
    detail = {
      bucket = {
        name = [aws_s3_bucket.mlops_data.id]
      }
      object = {
        key = [{ prefix = "labels/" }]
      }
    }
  })
}

# Lambda for batching/deduplication
resource "aws_lambda_function" "event_batcher" {
  function_name = "training-event-batcher"
  runtime       = "python3.11"
  handler       = "handler.lambda_handler"
  role          = aws_iam_role.lambda_execution.arn
  timeout       = 60
  memory_size   = 256

  environment {
    variables = {
      DYNAMODB_TABLE    = aws_dynamodb_table.event_buffer.name
      PIPELINE_ARN      = aws_sagemaker_pipeline.training.arn
      BATCH_SIZE        = "1000"
      BATCH_WINDOW_SECS = "3600"
    }
  }

  filename         = "lambda/event_batcher.zip"
  source_code_hash = filebase64sha256("lambda/event_batcher.zip")
}

resource "aws_cloudwatch_event_target" "lambda_batcher" {
  rule      = aws_cloudwatch_event_rule.new_data.name
  target_id = "EventBatcher"
  arn       = aws_lambda_function.event_batcher.arn
}

resource "aws_lambda_permission" "eventbridge" {
  statement_id  = "AllowEventBridge"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.event_batcher.function_name
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.new_data.arn
}

# DynamoDB for event batching
resource "aws_dynamodb_table" "event_buffer" {
  name         = "training-event-buffer"
  billing_mode = "PAY_PER_REQUEST"
  hash_key     = "batch_id"
  range_key    = "event_time"

  attribute {
    name = "batch_id"
    type = "S"
  }

  attribute {
    name = "event_time"
    type = "S"
  }

  ttl {
    attribute_name = "expiry_time"
    enabled        = true
  }
}

# lambda/handler.py - Event Batching Logic

import boto3
import os
import json
import time
from datetime import datetime, timedelta
from decimal import Decimal

dynamodb = boto3.resource('dynamodb')
sagemaker = boto3.client('sagemaker')

TABLE_NAME = os.environ['DYNAMODB_TABLE']
PIPELINE_ARN = os.environ['PIPELINE_ARN']
BATCH_SIZE = int(os.environ.get('BATCH_SIZE', 1000))
BATCH_WINDOW_SECS = int(os.environ.get('BATCH_WINDOW_SECS', 3600))

table = dynamodb.Table(TABLE_NAME)


def lambda_handler(event, context):
    """
    Batches S3 events and triggers pipeline when threshold is reached.
    
    Batching Logic:
    1. Each event is stored in DynamoDB
    2. Check total count in current batch window
    3. If count >= BATCH_SIZE or window expired, trigger pipeline
    """
    
    # Extract S3 event details
    detail = event.get('detail', {})
    bucket = detail.get('bucket', {}).get('name')
    key = detail.get('object', {}).get('key')
    
    if not bucket or not key:
        return {'statusCode': 400, 'body': 'Invalid event'}
    
    # Current hour as batch ID (hourly batching)
    batch_id = datetime.utcnow().strftime('%Y-%m-%d-%H')
    event_time = datetime.utcnow().isoformat()
    
    # Store event
    table.put_item(Item={
        'batch_id': batch_id,
        'event_time': event_time,
        's3_path': f's3://{bucket}/{key}',
        'expiry_time': int(time.time()) + 86400  # 24h TTL
    })
    
    # Count events in current batch
    response = table.query(
        KeyConditionExpression='batch_id = :bid',
        ExpressionAttributeValues={':bid': batch_id},
        Select='COUNT'
    )
    count = response['Count']
    
    # Check if we should trigger
    should_trigger = False
    trigger_reason = None
    
    if count >= BATCH_SIZE:
        should_trigger = True
        trigger_reason = f'batch_size_reached:{count}'
    
    # Check for window expiry (trigger at end of window even if below threshold)
    window_start = datetime.strptime(batch_id, '%Y-%m-%d-%H')
    window_end = window_start + timedelta(seconds=BATCH_WINDOW_SECS)
    
    if datetime.utcnow() >= window_end and count > 0:
        should_trigger = True
        trigger_reason = f'window_expired:count={count}'
    
    if should_trigger:
        # Get all S3 paths in batch
        items = table.query(
            KeyConditionExpression='batch_id = :bid',
            ExpressionAttributeValues={':bid': batch_id}
        )['Items']
        
        s3_paths = [item['s3_path'] for item in items]
        
        # Trigger pipeline
        response = sagemaker.start_pipeline_execution(
            PipelineName=PIPELINE_ARN.split('/')[-1],
            PipelineExecutionDisplayName=f'event-driven-{batch_id}',
            PipelineParameters=[
                {'Name': 'TriggerType', 'Value': 'event_driven'},
                {'Name': 'TriggerReason', 'Value': trigger_reason},
                {'Name': 'DataPaths', 'Value': json.dumps(s3_paths)},
                {'Name': 'EventCount', 'Value': str(len(s3_paths))}
            ]
        )
        
        # Clear processed batch
        for item in items:
            table.delete_item(Key={
                'batch_id': item['batch_id'],
                'event_time': item['event_time']
            })
        
        return {
            'statusCode': 200,
            'body': json.dumps({
                'triggered': True,
                'reason': trigger_reason,
                'pipeline_execution': response['PipelineExecutionArn']
            })
        }
    
    return {
        'statusCode': 200,
        'body': json.dumps({
            'triggered': False,
            'current_count': count,
            'threshold': BATCH_SIZE
        })
    }

GCP: Pub/Sub with Cloud Run

# cloud_run_trigger/main.py

from flask import Flask, request
from google.cloud import aiplatform, firestore
from datetime import datetime, timedelta
import base64
import json
import os

app = Flask(__name__)
db = firestore.Client()

PROJECT = os.environ['PROJECT_ID']
REGION = os.environ['REGION']
PIPELINE_TEMPLATE = os.environ['PIPELINE_TEMPLATE']
BATCH_SIZE = int(os.environ.get('BATCH_SIZE', 1000))


@app.route('/trigger', methods=['POST'])
def handle_pubsub():
    """Handle Pub/Sub push notifications from GCS."""
    
    envelope = request.get_json()
    if not envelope:
        return 'Bad Request', 400
    
    message = envelope.get('message', {})
    data = json.loads(
        base64.b64decode(message.get('data', '')).decode('utf-8')
    )
    
    bucket = data.get('bucket')
    name = data.get('name')
    
    if not bucket or not name:
        return 'Invalid message', 400
    
    # Store event in Firestore
    batch_id = datetime.utcnow().strftime('%Y-%m-%d-%H')
    doc_ref = db.collection('event_batches').document(batch_id)
    
    # Atomic increment
    doc_ref.set({
        'events': firestore.ArrayUnion([{
            'gcs_path': f'gs://{bucket}/{name}',
            'timestamp': datetime.utcnow()
        }]),
        'updated_at': datetime.utcnow()
    }, merge=True)
    
    # Get current count
    doc = doc_ref.get()
    events = doc.to_dict().get('events', [])
    
    if len(events) >= BATCH_SIZE:
        trigger_pipeline(batch_id, events)
        # Clear batch
        doc_ref.delete()
        
        return json.dumps({
            'triggered': True,
            'event_count': len(events)
        }), 200
    
    return json.dumps({
        'triggered': False,
        'current_count': len(events)
    }), 200


def trigger_pipeline(batch_id: str, events: list):
    """Trigger Vertex AI Pipeline."""
    
    aiplatform.init(project=PROJECT, location=REGION)
    
    gcs_paths = [e['gcs_path'] for e in events]
    
    job = aiplatform.PipelineJob(
        display_name=f'event-driven-{batch_id}',
        template_path=PIPELINE_TEMPLATE,
        parameter_values={
            'trigger_type': 'event_driven',
            'data_paths': json.dumps(gcs_paths),
            'event_count': len(events)
        }
    )
    
    job.submit()
    return job.resource_name


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))

20.3.4. Pattern 3: Drift-Driven Architectures

This is the sophisticated “Self-Healing” pattern. Instead of training on a schedule (which might be wasteful) or on data arrival (which ignores model quality), we train only when the model needs it.

Types of Drift

Drift Type       | Description                        | Detection Method        | Example
-----------------|------------------------------------|-------------------------|--------
Data Drift       | Input feature distribution changes | KL Divergence, PSI      | New device types in traffic
Concept Drift    | X→Y relationship changes           | Performance degradation | Inflation affects $ thresholds
Prediction Drift | Output distribution changes        | Distribution tests      | Model becoming more conservative
Label Drift      | Ground truth distribution changes  | Historical comparison   | Fraud rates increasing
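The detection methods listed above reduce to a few lines of NumPy. Below is a minimal, illustrative sketch of the Population Stability Index (PSI) for a single numeric feature; the bucket count, the epsilon smoothing, and the thresholds in the comments are conventional assumptions rather than values mandated by any particular monitoring service.

# psi_sketch.py - illustrative Population Stability Index for one feature

import numpy as np

def population_stability_index(expected: np.ndarray, actual: np.ndarray, bins: int = 10) -> float:
    """Compute PSI between a baseline sample and a production sample.

    Bucket edges come from the baseline distribution; production values outside
    that range fall out of the histogram (real monitors usually add open-ended
    buckets). A small epsilon avoids division by zero and log(0) on empty buckets.
    """
    edges = np.histogram_bin_edges(expected, bins=bins)
    expected_counts, _ = np.histogram(expected, bins=edges)
    actual_counts, _ = np.histogram(actual, bins=edges)

    expected_pct = expected_counts / expected_counts.sum() + 1e-6
    actual_pct = actual_counts / actual_counts.sum() + 1e-6

    return float(np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct)))


if __name__ == "__main__":
    rng = np.random.default_rng(42)
    baseline = rng.normal(0.0, 1.0, 10_000)     # training-time feature values
    production = rng.normal(0.3, 1.2, 10_000)   # shifted production values

    # Common rule of thumb: <0.1 stable, 0.1-0.2 moderate shift, >0.2 significant shift
    print(f"PSI = {population_stability_index(baseline, production):.3f}")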

AWS: SageMaker Model Monitor Pipeline

graph TB
    subgraph "Inference Layer"
        A[SageMaker Endpoint] -->|Data Capture| B[S3: captured/]
    end
    
    subgraph "Monitoring Layer"
        C[Model Monitor Schedule] -->|Hourly| D[Monitor Job]
        D -->|Analyze| B
        D -->|Baseline| E[S3: baseline/]
        D -->|Results| F[S3: violations/]
    end
    
    subgraph "Alerting Layer"
        F -->|Metric| G[CloudWatch]
        G -->|Alarm| H[SNS]
        H -->|Event| I[EventBridge]
    end
    
    subgraph "Response Layer"
        I -->|Trigger| J[Lambda: Evaluate]
        J -->|Auto-Approve?| K{Severity Check}
        K -->|Low| L[SageMaker Pipeline]
        K -->|High| M[Human Review]
    end

Complete Implementation

# drift_driven_trigger.tf

# Model Monitor Schedule
resource "aws_sagemaker_monitoring_schedule" "drift_monitor" {
  name = "model-drift-monitor"

  monitoring_schedule_config {
    monitoring_job_definition_name = aws_sagemaker_data_quality_job_definition.drift.name
    monitoring_type                = "DataQuality"

    schedule_config {
      schedule_expression = "cron(0 * * * ? *)"  # Hourly
    }
  }
}

resource "aws_sagemaker_data_quality_job_definition" "drift" {
  name     = "drift-detection-job"
  role_arn = aws_iam_role.sagemaker_execution.arn

  data_quality_app_specification {
    image_uri = "123456789.dkr.ecr.us-east-1.amazonaws.com/sagemaker-model-monitor-analyzer"
  }

  data_quality_job_input {
    endpoint_input {
      endpoint_name          = aws_sagemaker_endpoint.production.name
      local_path             = "/opt/ml/processing/input"
      s3_data_distribution_type = "FullyReplicated"
      s3_input_mode          = "File"
    }
  }

  data_quality_job_output_config {
    monitoring_outputs {
      s3_output {
        s3_uri        = "s3://${aws_s3_bucket.monitoring.id}/violations/"
        local_path    = "/opt/ml/processing/output"
        s3_upload_mode = "EndOfJob"
      }
    }
  }

  data_quality_baseline_config {
    constraints_resource {
      s3_uri = "s3://${aws_s3_bucket.monitoring.id}/baseline/constraints.json"
    }
    statistics_resource {
      s3_uri = "s3://${aws_s3_bucket.monitoring.id}/baseline/statistics.json"
    }
  }

  job_resources {
    cluster_config {
      instance_count    = 1
      instance_type     = "ml.m5.xlarge"
      volume_size_in_gb = 50
    }
  }
}

# CloudWatch Alarm on Drift Metrics
resource "aws_cloudwatch_metric_alarm" "drift_detected" {
  alarm_name          = "model-drift-detected"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 1
  metric_name         = "FeatureBaseline/drift_check_fail"
  namespace           = "/aws/sagemaker/Endpoints/data-metrics"
  period              = 3600
  statistic           = "Maximum"
  threshold           = 0
  alarm_description   = "Model drift detected by SageMaker Model Monitor"
  
  dimensions = {
    EndpointName = aws_sagemaker_endpoint.production.name
  }

  alarm_actions = [aws_sns_topic.drift_alerts.arn]
}

# SNS Topic for Drift Alerts
resource "aws_sns_topic" "drift_alerts" {
  name = "model-drift-alerts"
}

# Lambda to evaluate drift severity and trigger response
resource "aws_lambda_function" "drift_evaluator" {
  function_name = "drift-severity-evaluator"
  runtime       = "python3.11"
  handler       = "handler.evaluate_drift"
  role          = aws_iam_role.lambda_execution.arn
  timeout       = 120
  memory_size   = 512

  environment {
    variables = {
      PIPELINE_ARN          = aws_sagemaker_pipeline.training.arn
      SEVERITY_THRESHOLD    = "0.3"
      AUTO_RETRAIN_ENABLED  = "true"
      SNS_TOPIC_ARN         = aws_sns_topic.human_review.arn
    }
  }

  filename         = "lambda/drift_evaluator.zip"
  source_code_hash = filebase64sha256("lambda/drift_evaluator.zip")
}

resource "aws_sns_topic_subscription" "drift_to_lambda" {
  topic_arn = aws_sns_topic.drift_alerts.arn
  protocol  = "lambda"
  endpoint  = aws_lambda_function.drift_evaluator.arn
}

# lambda/drift_evaluator.py

import boto3
import json
import os
from typing import Dict, List, Tuple
from dataclasses import dataclass

sagemaker = boto3.client('sagemaker')
s3 = boto3.client('s3')
sns = boto3.client('sns')

PIPELINE_ARN = os.environ['PIPELINE_ARN']
SEVERITY_THRESHOLD = float(os.environ.get('SEVERITY_THRESHOLD', 0.3))
AUTO_RETRAIN_ENABLED = os.environ.get('AUTO_RETRAIN_ENABLED', 'false').lower() == 'true'
SNS_TOPIC_ARN = os.environ.get('SNS_TOPIC_ARN')


@dataclass
class DriftAnalysis:
    severity: str  # 'low', 'medium', 'high', 'critical'
    score: float
    drifted_features: List[str]
    recommendation: str


def evaluate_drift(event, context):
    """
    Evaluate drift severity and determine response action.
    
    Severity Levels:
    - Low (<0.1): Log only, no action
    - Medium (0.1-0.3): Auto-retrain if enabled
    - High (0.3-0.5): Trigger retraining, notify team
    - Critical (>0.5): Block automated retraining, require human review
    """
    
    # Parse SNS message
    message = json.loads(event['Records'][0]['Sns']['Message'])
    
    # Get violation report from S3
    violations = get_latest_violations()
    
    # Analyze severity
    analysis = analyze_drift_severity(violations)
    
    # Determine response
    response = determine_response(analysis)
    
    return response


def get_latest_violations() -> Dict:
    """Retrieve latest violation report from S3."""
    
    bucket = os.environ.get('MONITORING_BUCKET', 'mlops-monitoring')
    prefix = 'violations/'
    
    # List violation reports and pick the most recently modified one
    # (MaxKeys=1 would return only the lexicographically first key, not the latest)
    response = s3.list_objects_v2(
        Bucket=bucket,
        Prefix=prefix
    )
    
    if not response.get('Contents'):
        return {}
    
    latest = max(response['Contents'], key=lambda o: o['LastModified'])
    obj = s3.get_object(Bucket=bucket, Key=latest['Key'])
    return json.loads(obj['Body'].read())


def analyze_drift_severity(violations: Dict) -> DriftAnalysis:
    """Analyze drift violations and determine severity."""
    
    if not violations:
        return DriftAnalysis(
            severity='none',
            score=0.0,
            drifted_features=[],
            recommendation='No action required'
        )
    
    # Extract feature violations
    feature_violations = violations.get('features', {})
    drifted_features = []
    max_drift_score = 0.0
    
    for feature, stats in feature_violations.items():
        if stats.get('constraint_check_status') == 'Failed':
            drifted_features.append(feature)
            drift_score = stats.get('drift_score', 0)
            max_drift_score = max(max_drift_score, drift_score)
    
    # Determine severity
    if max_drift_score >= 0.5:
        severity = 'critical'
        recommendation = 'BLOCK automated retraining. Investigate data source issues.'
    elif max_drift_score >= 0.3:
        severity = 'high'
        recommendation = 'Trigger retraining with increased monitoring. Notify ML team.'
    elif max_drift_score >= 0.1:
        severity = 'medium'
        recommendation = 'Auto-retrain if enabled. Monitor closely.'
    else:
        severity = 'low'
        recommendation = 'Log for tracking. No immediate action required.'
    
    return DriftAnalysis(
        severity=severity,
        score=max_drift_score,
        drifted_features=drifted_features,
        recommendation=recommendation
    )


def determine_response(analysis: DriftAnalysis) -> Dict:
    """Determine and execute response based on drift analysis."""
    
    response = {
        'analysis': {
            'severity': analysis.severity,
            'score': analysis.score,
            'drifted_features': analysis.drifted_features,
            'recommendation': analysis.recommendation
        },
        'action_taken': None
    }
    
    if analysis.severity == 'critical':
        # Human review required
        notify_human_review(analysis)
        response['action_taken'] = 'human_review_requested'
        
    elif analysis.severity == 'high':
        # Retrain + notify
        trigger_retraining(analysis, require_approval=True)
        notify_team(analysis)
        response['action_taken'] = 'retraining_triggered_with_approval'
        
    elif analysis.severity == 'medium' and AUTO_RETRAIN_ENABLED:
        # Auto-retrain
        trigger_retraining(analysis, require_approval=False)
        response['action_taken'] = 'auto_retraining_triggered'
        
    else:
        # Log only
        log_drift_event(analysis)
        response['action_taken'] = 'logged_only'
    
    return response


def trigger_retraining(analysis: DriftAnalysis, require_approval: bool = False):
    """Trigger SageMaker Pipeline for retraining."""
    
    sagemaker.start_pipeline_execution(
        PipelineName=PIPELINE_ARN.split('/')[-1],
        PipelineExecutionDisplayName=f'drift-triggered-{analysis.severity}',
        PipelineParameters=[
            {'Name': 'TriggerType', 'Value': 'drift_driven'},
            {'Name': 'DriftSeverity', 'Value': analysis.severity},
            {'Name': 'DriftScore', 'Value': str(analysis.score)},
            {'Name': 'DriftedFeatures', 'Value': json.dumps(analysis.drifted_features)},
            {'Name': 'RequireApproval', 'Value': str(require_approval)}
        ]
    )


def notify_human_review(analysis: DriftAnalysis):
    """Send notification requiring human review."""
    
    if SNS_TOPIC_ARN:
        sns.publish(
            TopicArn=SNS_TOPIC_ARN,
            Subject=f'[CRITICAL] Model Drift Requires Review',
            Message=json.dumps({
                'severity': analysis.severity,
                'score': analysis.score,
                'drifted_features': analysis.drifted_features,
                'recommendation': analysis.recommendation,
                'action_required': 'Review drift report and approve/reject retraining'
            }, indent=2)
        )


def notify_team(analysis: DriftAnalysis):
    """Send notification to ML team."""
    # Similar to notify_human_review but less urgent
    pass


def log_drift_event(analysis: DriftAnalysis):
    """Log drift event for tracking."""
    print(json.dumps({
        'event': 'drift_detected',
        'severity': analysis.severity,
        'score': analysis.score,
        'features': analysis.drifted_features
    }))

GCP: Vertex AI Model Monitoring

# vertex_drift_monitor.py

from google.cloud import aiplatform
from google.cloud.aiplatform import model_monitoring
from google.cloud import pubsub_v1
import json

def setup_model_monitoring(
    project: str,
    region: str,
    endpoint_name: str,
    email_recipients: list
):
    """Setup Vertex AI Model Monitoring with drift detection."""
    
    aiplatform.init(project=project, location=region)
    
    # Get endpoint
    endpoint = aiplatform.Endpoint(endpoint_name)
    
    # Define monitoring config
    skew_config = model_monitoring.SkewDetectionConfig(
        data_source="bq://project.dataset.training_table",
        default_skew_threshold=0.3,
        attribute_skew_thresholds={
            "high_risk_feature": 0.1,  # Stricter threshold for critical features
            "medium_risk_feature": 0.2
        }
    )
    
    drift_config = model_monitoring.DriftDetectionConfig(
        default_drift_threshold=0.3,
        attribute_drift_thresholds={
            "high_risk_feature": 0.1
        }
    )
    
    # Alerting config
    email_config = model_monitoring.EmailAlertConfig(
        user_emails=email_recipients
    )
    
    # Create monitoring job
    monitoring_job = aiplatform.ModelDeploymentMonitoringJob.create(
        display_name="production-model-monitor",
        endpoint=endpoint,
        logging_sampling_strategy=model_monitoring.RandomSampleConfig(
            sample_rate=1.0  # 100% sampling
        ),
        schedule_config=model_monitoring.ScheduleConfig(
            monitor_interval_hours=1
        ),
        skew_detection_config=skew_config,
        drift_detection_config=drift_config,
        alert_config=email_config
    )
    
    return monitoring_job


def create_drift_response_pipeline():
    """Create Pub/Sub triggered Cloud Function for drift response."""
    
    # Cloud Function code
    function_code = '''
import functions_framework
from google.cloud import aiplatform
import json
import base64

@functions_framework.cloud_event
def handle_drift_alert(cloud_event):
    """Handle Model Monitoring drift alerts."""
    
    data = json.loads(base64.b64decode(cloud_event.data["message"]["data"]))
    
    # Parse monitoring alert
    anomaly_type = data.get("anomalyType")
    feature_name = data.get("featureName")
    score = data.get("score", 0)
    
    # Determine response
    if score > 0.5:
        # Critical - human review (send_to_slack is a placeholder for your alerting hook)
        send_to_slack("#ml-alerts", f"🚨 Critical drift: {feature_name} = {score}")
    elif score > 0.3:
        # High - auto retrain with approval
        trigger_pipeline(
            trigger_type="drift",
            require_approval=True,
            drift_info={"feature": feature_name, "score": score}
        )
    elif score > 0.1:
        # Medium - auto retrain
        trigger_pipeline(
            trigger_type="drift",
            require_approval=False,
            drift_info={"feature": feature_name, "score": score}
        )


def trigger_pipeline(trigger_type: str, require_approval: bool, drift_info: dict):
    aiplatform.init(project="my-project", location="us-central1")
    
    job = aiplatform.PipelineJob(
        display_name=f"drift-triggered-{drift_info['feature']}",
        template_path="gs://my-bucket/pipelines/training.json",
        parameter_values={
            "trigger_type": trigger_type,
            "drift_feature": drift_info["feature"],
            "drift_score": drift_info["score"],
            "require_approval": require_approval
        }
    )
    job.submit()
'''
    return function_code

20.3.5. Pattern 4: Hybrid Triggering (Multi-Signal)

Production systems often combine multiple trigger types for robustness.

Multi-Signal Architecture

graph TB
    subgraph "Trigger Sources"
        A[Schedule: Weekly] --> D
        B[Event: New Data] --> D
        C[Drift: Quality Alert] --> D
    end
    
    D[Trigger Orchestrator] --> E{Evaluate Context}
    
    E -->|Recent Training?| F[Debounce]
    E -->|Cost Budget OK?| G[Cost Check]
    E -->|Human Blocked?| H[Override Check]
    
    F --> I[Training Pipeline]
    G --> I
    H --> I

Implementation: Smart Trigger Coordinator

# trigger_coordinator.py

from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional, Dict, List
import boto3
import json


class TriggerType(Enum):
    SCHEDULED = "scheduled"
    EVENT_DRIVEN = "event_driven"
    DRIFT_DRIVEN = "drift_driven"
    MANUAL = "manual"


@dataclass
class TriggerContext:
    trigger_type: TriggerType
    timestamp: datetime
    metadata: Dict
    priority: int  # 1=highest, 5=lowest


@dataclass
class TrainingDecision:
    should_train: bool
    reason: str
    delay_until: Optional[datetime] = None
    parameters: Optional[Dict] = None


class TriggerCoordinator:
    """
    Coordinates multiple trigger sources and makes intelligent training decisions.
    
    Features:
    - Debouncing: Prevents training storms
    - Cost management: Respects budget constraints
    - Priority handling: Critical triggers override normal ones
    - Cooldown periods: Minimum time between trainings
    """
    
    def __init__(
        self,
        min_training_interval_hours: int = 6,
        daily_training_budget: float = 1000.0,
        max_daily_trainings: int = 4
    ):
        self.min_interval = timedelta(hours=min_training_interval_hours)
        self.daily_budget = daily_training_budget
        self.max_daily = max_daily_trainings
        
        self.dynamodb = boto3.resource('dynamodb')
        self.state_table = self.dynamodb.Table('trigger-coordinator-state')
    
    def evaluate(self, trigger: TriggerContext) -> TrainingDecision:
        """Evaluate whether to proceed with training."""
        
        state = self._get_current_state()
        
        # Check 1: Cooldown period
        if state.get('last_training'):
            last_training = datetime.fromisoformat(state['last_training'])
            if datetime.utcnow() - last_training < self.min_interval:
                remaining = self.min_interval - (datetime.utcnow() - last_training)
                
                # Override for critical drift
                if trigger.trigger_type == TriggerType.DRIFT_DRIVEN and trigger.priority <= 2:
                    pass  # Allow critical drift to bypass cooldown
                else:
                    return TrainingDecision(
                        should_train=False,
                        reason=f"In cooldown period. {remaining.total_seconds()/3600:.1f}h remaining",
                        delay_until=last_training + self.min_interval
                    )
        
        # Check 2: Daily training limit
        today_count = state.get('today_training_count', 0)
        if today_count >= self.max_daily:
            if trigger.priority > 2:  # Non-critical
                return TrainingDecision(
                    should_train=False,
                    reason=f"Daily training limit ({self.max_daily}) reached"
                )
        
        # Check 3: Budget check
        today_spent = state.get('today_budget_spent', 0.0)
        estimated_cost = self._estimate_training_cost(trigger)
        
        if today_spent + estimated_cost > self.daily_budget:
            if trigger.priority > 1:  # Non-urgent
                return TrainingDecision(
                    should_train=False,
                    reason=f"Would exceed daily budget (${today_spent:.2f} + ${estimated_cost:.2f} > ${self.daily_budget:.2f})"
                )
        
        # Check 4: Pending approvals
        if state.get('pending_approval'):
            return TrainingDecision(
                should_train=False,
                reason="Previous training pending approval"
            )
        
        # All checks passed
        return TrainingDecision(
            should_train=True,
            reason=f"Approved: {trigger.trigger_type.value} trigger",
            parameters=self._build_training_parameters(trigger, state)
        )
    
    def _get_current_state(self) -> Dict:
        """Get current coordinator state from DynamoDB."""
        try:
            response = self.state_table.get_item(Key={'pk': 'coordinator_state'})
            return response.get('Item', {})
        except Exception:
            return {}
    
    def _estimate_training_cost(self, trigger: TriggerContext) -> float:
        """Estimate training cost based on trigger context."""
        base_cost = 150.0  # Base GPU cost
        
        # Adjust based on data volume
        data_size_gb = trigger.metadata.get('data_size_gb', 10)
        size_factor = 1 + (data_size_gb / 100)
        
        return base_cost * size_factor
    
    def _build_training_parameters(self, trigger: TriggerContext, state: Dict) -> Dict:
        """Build training parameters based on trigger and state."""
        return {
            'trigger_type': trigger.trigger_type.value,
            'trigger_timestamp': trigger.timestamp.isoformat(),
            'trigger_priority': trigger.priority,
            'training_sequence': state.get('total_trainings', 0) + 1,
            **trigger.metadata
        }
    
    def record_training_started(self, decision: TrainingDecision):
        """Record that training has started."""
        self.state_table.update_item(
            Key={'pk': 'coordinator_state'},
            UpdateExpression='''
                SET last_training = :now,
                    today_training_count = if_not_exists(today_training_count, :zero) + :one,
                    total_trainings = if_not_exists(total_trainings, :zero) + :one
            ''',
            ExpressionAttributeValues={
                ':now': datetime.utcnow().isoformat(),
                ':zero': 0,
                ':one': 1
            }
        )


# Lambda handler
def lambda_handler(event, context):
    """Main entry point for trigger coordination."""
    
    coordinator = TriggerCoordinator(
        min_training_interval_hours=6,
        daily_training_budget=1000.0,
        max_daily_trainings=4
    )
    
    # Parse trigger context from event
    trigger = TriggerContext(
        trigger_type=TriggerType(event.get('trigger_type', 'manual')),
        timestamp=datetime.utcnow(),
        metadata=event.get('metadata', {}),
        priority=event.get('priority', 3)
    )
    
    # Evaluate
    decision = coordinator.evaluate(trigger)
    
    if decision.should_train:
        # Start pipeline
        sagemaker = boto3.client('sagemaker')
        
        sagemaker.start_pipeline_execution(
            PipelineName='training-pipeline',
            PipelineParameters=[
                {'Name': k, 'Value': str(v)}
                for k, v in decision.parameters.items()
            ]
        )
        
        coordinator.record_training_started(decision)
    
    return {
        'should_train': decision.should_train,
        'reason': decision.reason,
        'delay_until': decision.delay_until.isoformat() if decision.delay_until else None
    }

20.3.6. Feedback Loop Prevention

Caution

The Silent Killer: Automated drift-driven retraining can create catastrophic feedback loops where the model accepts gradually degrading data as “normal.”

The Feedback Loop Problem

graph LR
    A[Model Drifts] --> B[Auto-Retrain on Drifted Data]
    B --> C[New Model Accepts Drift]
    C --> D[Drift Metrics Look Normal]
    D --> E[Real Performance Degrades]
    E --> A

Mitigation Strategies

# feedback_loop_prevention.py

from dataclasses import dataclass
from typing import Optional, List
from datetime import datetime, timedelta
import numpy as np


@dataclass
class SafetyCheck:
    passed: bool
    check_name: str
    details: str


class RetrainingGuardrails:
    """
    Guardrails to prevent feedback loop catastrophe.
    
    Key Principles:
    1. Never retrain on purely production data
    2. Always compare against immutable baseline
    3. Require performance validation before promotion
    4. Implement staged rollouts
    """
    
    def __init__(
        self,
        min_golden_set_performance: float = 0.85,
        max_baseline_drift: float = 0.4,
        require_human_approval_threshold: float = 0.2
    ):
        self.min_golden_performance = min_golden_set_performance
        self.max_baseline_drift = max_baseline_drift
        self.human_approval_threshold = require_human_approval_threshold
    
    def validate_retraining_safety(
        self,
        new_model_metrics: dict,
        baseline_metrics: dict,
        golden_set_results: dict
    ) -> List[SafetyCheck]:
        """Run all safety checks before allowing model promotion."""
        
        checks = []
        
        # Check 1: Golden Set Performance
        golden_accuracy = golden_set_results.get('accuracy', 0)
        checks.append(SafetyCheck(
            passed=golden_accuracy >= self.min_golden_performance,
            check_name="golden_set_performance",
            details=f"Accuracy: {golden_accuracy:.3f} (min: {self.min_golden_performance})"
        ))
        
        # Check 2: Baseline Comparison
        baseline_delta = abs(
            new_model_metrics.get('accuracy', 0) - 
            baseline_metrics.get('accuracy', 0)
        )
        checks.append(SafetyCheck(
            passed=baseline_delta <= self.max_baseline_drift,
            check_name="baseline_drift",
            details=f"Delta from baseline: {baseline_delta:.3f} (max: {self.max_baseline_drift})"
        ))
        
        # Check 3: Prediction Distribution Sanity
        pred_distribution = new_model_metrics.get('prediction_distribution', {})
        baseline_distribution = baseline_metrics.get('prediction_distribution', {})
        
        distribution_shift = self._calculate_distribution_shift(
            pred_distribution, baseline_distribution
        )
        checks.append(SafetyCheck(
            passed=distribution_shift < 0.5,
            check_name="prediction_distribution",
            details=f"Distribution shift: {distribution_shift:.3f}"
        ))
        
        # Check 4: Error Pattern Analysis
        error_patterns = new_model_metrics.get('error_patterns', [])
        checks.append(SafetyCheck(
            passed=not self._detect_systematic_errors(error_patterns),
            check_name="systematic_errors",
            details=f"Checked {len(error_patterns)} error patterns"
        ))
        
        return checks
    
    def _calculate_distribution_shift(
        self, 
        current: dict, 
        baseline: dict
    ) -> float:
        """Calculate KL divergence between distributions."""
        # Simplified implementation
        all_keys = set(current.keys()) | set(baseline.keys())
        
        current_vals = np.array([current.get(k, 0.001) for k in all_keys])
        baseline_vals = np.array([baseline.get(k, 0.001) for k in all_keys])
        
        # Normalize
        current_vals = current_vals / current_vals.sum()
        baseline_vals = baseline_vals / baseline_vals.sum()
        
        # KL Divergence
        return np.sum(current_vals * np.log(current_vals / baseline_vals))
    
    def _detect_systematic_errors(self, error_patterns: List) -> bool:
        """Detect if errors are systematic (potential feedback loop)."""
        if not error_patterns:
            return False
        
        # Check for clustering of errors
        # (simplified - would use more sophisticated analysis in production)
        error_types = [e.get('type') for e in error_patterns]
        type_counts = {}
        for t in error_types:
            type_counts[t] = type_counts.get(t, 0) + 1
        
        max_concentration = max(type_counts.values()) / len(error_types)
        return max_concentration > 0.7  # Too concentrated = systematic


def create_safe_retraining_pipeline():
    """Example SageMaker Pipeline with safety checks."""
    
    from sagemaker.workflow.pipeline import Pipeline
    from sagemaker.workflow.steps import ProcessingStep, TrainingStep, ConditionStep
    from sagemaker.workflow.conditions import ConditionGreaterThan
    
    # Step 1: Train on mixed data (production + baseline holdout)
    # Step 2: Evaluate on golden set (immutable)
    # Step 3: Safety checks
    # Step 4: Conditional promotion
    
    pipeline_definition = """
    Steps:
    1. DataPreparation:
       - Mix production data (70%) with baseline holdout (30%)
       - This prevents pure drift absorption
       
    2. Training:
       - Train new candidate model
       - Log all metrics
       
    3. GoldenSetEvaluation:
       - Evaluate on immutable golden set
       - Golden set is NEVER updated
       
    4. SafetyChecks:
       - Run guardrails validation
       - All checks must pass
       
    5. ShadowDeployment:
       - Deploy to shadow endpoint
       - Run A/B against production (no user impact)
       
    6. HumanApproval (if drift > threshold):
       - Require manual review
       - Present safety check results
       
    7. GradualRollout:
       - 5% -> 25% -> 50% -> 100%
       - Auto-rollback if metrics degrade
    """
    
    return pipeline_definition
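A hedged usage sketch for the guardrails above. The metric dictionaries are illustrative shapes only; in a real pipeline they would come from the evaluation and golden-set steps, and the module name feedback_loop_prevention simply mirrors the file name used in this listing.

# guardrails_usage.py - illustrative pre-promotion check

from feedback_loop_prevention import RetrainingGuardrails

guardrails = RetrainingGuardrails(
    min_golden_set_performance=0.85,
    max_baseline_drift=0.4
)

checks = guardrails.validate_retraining_safety(
    new_model_metrics={
        'accuracy': 0.88,
        'prediction_distribution': {'approve': 0.62, 'reject': 0.38},
        'error_patterns': [{'type': 'false_positive'}, {'type': 'false_negative'}]
    },
    baseline_metrics={
        'accuracy': 0.90,
        'prediction_distribution': {'approve': 0.58, 'reject': 0.42}
    },
    golden_set_results={'accuracy': 0.87}
)

for check in checks:
    print(f"{check.check_name}: {'PASS' if check.passed else 'FAIL'} ({check.details})")

# Promote only if every guardrail passes; otherwise route to human review
if not all(c.passed for c in checks):
    raise SystemExit("Candidate blocked: at least one safety check failed")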

20.3.7. Comparison: Choosing Your Trigger Pattern

Trigger Type | Pros                   | Cons                        | Best For                   | AWS Service                | GCP Service
-------------|------------------------|-----------------------------|----------------------------|----------------------------|------------
Scheduled    | Simple, predictable    | Can be wasteful or too slow | Stable domains             | EventBridge                | Cloud Scheduler
Event-Driven | Reactive, fresh data   | Noisy, trigger storms       | Real-time critical         | EventBridge + Lambda       | Pub/Sub + Cloud Functions
Drift-Driven | Efficient, ROI-focused | Complex, loop risk          | High-scale, cost-sensitive | Model Monitor + CloudWatch | Vertex AI Monitoring
Hybrid       | Robust, flexible       | Complex orchestration       | Enterprise production      | Step Functions             | Cloud Workflows

Decision Matrix

IF data_arrival_is_predictable AND model_is_stable:
    USE scheduled_trigger
    INTERVAL = business_cycle (daily/weekly/monthly)

ELIF data_is_streaming AND freshness_critical:
    USE event_driven_trigger
    ADD batching_layer (prevent storm)
    ADD deduplication

ELIF cost_is_primary_concern AND have_monitoring:
    USE drift_driven_trigger
    SET conservative_thresholds
    ADD human_approval_for_critical

ELSE:
    USE hybrid_approach
    COMBINE scheduled_baseline + drift_override
    ADD central_coordinator
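The matrix above can also be captured as a small, testable function. This is a sketch only: the boolean fields are placeholders for whichever signals your platform actually exposes, and the returned labels map onto the patterns in this section.

# trigger_selection.py - sketch of the decision matrix as code

from dataclasses import dataclass


@dataclass
class SystemProfile:
    data_arrival_is_predictable: bool
    model_is_stable: bool
    data_is_streaming: bool
    freshness_critical: bool
    cost_is_primary_concern: bool
    has_monitoring: bool


def choose_trigger_pattern(profile: SystemProfile) -> str:
    """Map a system profile to a triggering pattern, mirroring the decision matrix."""
    if profile.data_arrival_is_predictable and profile.model_is_stable:
        return "scheduled"       # interval follows the business cycle
    if profile.data_is_streaming and profile.freshness_critical:
        return "event_driven"    # add a batching layer and deduplication
    if profile.cost_is_primary_concern and profile.has_monitoring:
        return "drift_driven"    # conservative thresholds, human approval for critical drift
    return "hybrid"              # scheduled baseline + drift override + central coordinator


if __name__ == "__main__":
    profile = SystemProfile(
        data_arrival_is_predictable=False,
        model_is_stable=False,
        data_is_streaming=True,
        freshness_critical=True,
        cost_is_primary_concern=False,
        has_monitoring=True
    )
    print(choose_trigger_pattern(profile))  # -> event_driven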

20.3.8. Observability for Triggers

CloudWatch Dashboard (Terraform)

resource "aws_cloudwatch_dashboard" "trigger_monitoring" {
  dashboard_name = "ml-trigger-monitoring"

  dashboard_body = jsonencode({
    widgets = [
      {
        type   = "metric"
        x      = 0
        y      = 0
        width  = 12
        height = 6
        properties = {
          metrics = [
            ["MLOps", "TriggerEvents", "Type", "scheduled"],
            [".", ".", ".", "event_driven"],
            [".", ".", ".", "drift_driven"]
          ]
          title  = "Trigger Events by Type"
          region = var.aws_region
          stat   = "Sum"
          period = 3600
        }
      },
      {
        type   = "metric"
        x      = 12
        y      = 0
        width  = 12
        height = 6
        properties = {
          metrics = [
            ["MLOps", "TrainingCost", "Result", "completed"],
            [".", ".", ".", "rejected"]
          ]
          title  = "Training Decisions"
          region = var.aws_region
        }
      },
      {
        type   = "metric"
        x      = 0
        y      = 6
        width  = 24
        height = 6
        properties = {
          metrics = [
            ["MLOps", "DriftScore", "Feature", "All"]
          ]
          title  = "Drift Scores Over Time"
          region = var.aws_region
          view   = "timeSeries"
        }
      }
    ]
  })
}
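The dashboard reads custom metrics in the MLOps namespace (TriggerEvents, TrainingCost, DriftScore); nothing publishes them by default. Below is a minimal sketch of emitting two of them from the trigger coordinator Lambda. The namespace and dimension names simply follow the dashboard definition above and are conventions of this section, not AWS defaults.

# emit_trigger_metrics.py - sketch of publishing the custom metrics the dashboard expects

import boto3
from typing import Optional

cloudwatch = boto3.client('cloudwatch')


def record_trigger_event(trigger_type: str, drift_score: Optional[float] = None) -> None:
    """Publish one TriggerEvents data point, and optionally a DriftScore, to CloudWatch."""
    metric_data = [{
        'MetricName': 'TriggerEvents',
        'Dimensions': [{'Name': 'Type', 'Value': trigger_type}],
        'Value': 1,
        'Unit': 'Count'
    }]
    if drift_score is not None:
        metric_data.append({
            'MetricName': 'DriftScore',
            'Dimensions': [{'Name': 'Feature', 'Value': 'All'}],
            'Value': drift_score,
            'Unit': 'None'
        })
    cloudwatch.put_metric_data(Namespace='MLOps', MetricData=metric_data)


# Example: called after a drift-driven decision in the coordinator
# record_trigger_event('drift_driven', drift_score=0.35)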

20.3.9. Summary Checklist

For Scheduled Triggers

  • Define appropriate interval based on business cycle
  • Set up alerting for missed executions
  • Monitor for data staleness between runs

For Event-Driven Triggers

  • Implement batching to prevent trigger storms
  • Add deduplication logic
  • Set up dead-letter queues for failed triggers
  • Monitor event processing latency

For Drift-Driven Triggers

  • Establish immutable baseline dataset
  • Define thresholds for each severity level
  • Implement feedback loop prevention
  • Require human approval for critical drift
  • Set up golden set evaluation

For Hybrid Systems

  • Implement central coordinator
  • Define priority system for competing triggers
  • Set up cost budgets and limits
  • Configure cooldown periods
  • Monitor trigger decision rationale

Conclusion

The choice of triggering pattern defines the “liveness” of your AI system.

  1. Start with Scheduled (Cron is King for a reason)
  2. Move to Event-Driven only if latency costs revenue
  3. Move to Drift-Driven only if you have robust automated evaluation and rollout safety nets in place
  4. Consider Hybrid for production-grade systems that need resilience

Ultimately, the goal is to close the loop between the data scientist’s code and the production environment, minimizing the “Time-to-Adapt” for the AI system while maintaining safety and cost efficiency.

[End of Section 20.3]