32.8. Cross-Functional Contracts: The Human API
Tip
Conway’s Law applied to ML: “Organizations which design systems are constrained to produce designs which are copies of the communication structures of these organizations.” If your Data Scientists don’t talk to your SREs, your Model Registry will be a dumpster fire.
While technical contracts (APIs) are rigidly enforced by compilers, social contracts (SLAs) are loosely enforced by managers. This enforcement gap is where most MLOps initiatives fail. We need to formalize the relationship between the Creators (Data Science) and the Custodians (Platform Engineering).
32.8.1. The Operating Models: Who Owns What?
There are two primary operating models (plus a hybrid that blends them). You must explicitly choose one.
Model A: “You Build It, You Run It” (Spotify Model)
graph TB
subgraph "Data Science Squad"
A[Build Model] --> B[Package Container]
B --> C[Deploy to K8s]
C --> D[Monitor & On-Call]
end
subgraph "Platform Team"
E[Provide EKS Cluster]
F[Provide Monitoring Stack]
G[Provide CI/CD Templates]
end
E --> C
F --> D
G --> B
- Philosophy: The Data Science Squad owns the model from Jupyter to Production.
- Platform Role: Provides the “Golden Path” (Self-service infrastructure). SREs manage the Kubernetes cluster, not the pods running on it.
- Contract: “Platform guarantees the EKS Control Plane uptime. DS guarantees the Python inference service logic.”
Advantages:
- Faster iteration (no handoffs)
- Clear ownership
- Teams learn Ops skills
Disadvantages:
- Requires skilled DS teams
- Inconsistent standards across squads
- Can lead to reinventing wheels
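In practice, the Model A contract reduces to a small self-service descriptor that the squad owns and the platform's templates consume. A minimal sketch of what that "Golden Path" input might look like (field names and thresholds are illustrative, not any specific platform's schema):
# golden_path_spec.py - hypothetical self-service descriptor for the Golden Path
from dataclasses import dataclass
from typing import List

@dataclass
class GoldenPathSpec:
    """What the DS squad declares; everything else comes from platform templates."""
    service_name: str
    image: str                 # Container the squad built and owns
    team_slack_channel: str    # Where alerts for *their* pods go
    cpu_request: str = "500m"
    memory_request: str = "1Gi"
    min_replicas: int = 2
    max_replicas: int = 10

    def validate(self) -> List[str]:
        """Checks the platform team enforces before stamping out manifests."""
        errors = []
        if self.min_replicas < 2:
            errors.append("min_replicas must be >= 2 for zero-downtime deploys")
        if not self.team_slack_channel.startswith("#"):
            errors.append("team_slack_channel must be a Slack channel, e.g. #fraud-ds")
        if ":" not in self.image:
            errors.append("image must be pinned to an explicit tag or digest")
        return errors

if __name__ == "__main__":
    spec = GoldenPathSpec(
        service_name="fraud-model",
        image="registry.internal/fraud-model:1.4.2",
        team_slack_channel="#fraud-ds",
    )
    print(spec.validate() or "Spec OK - platform templates can render the K8s manifests")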
Model B: “The Handover” (Traditional Enterprise)
graph LR
subgraph "Data Science"
A[Research] --> B[Prototype]
B --> C[Model Artifact]
end
subgraph "ML Engineering"
D[Production Code] --> E[Container]
E --> F[Deploy]
end
subgraph "Platform/SRE"
G[Infrastructure]
H[Monitoring]
I[On-Call]
end
C -->|PRR| D
F --> G
F --> H
H --> I
- Philosophy: DS builds a prototype; ML Engineers rewrite it for Prod.
- Contract: The Production Readiness Review (PRR).
- No model crosses the “Air Gap” from Dev to Prod without passing the PRR Checklist.
Advantages:
- Clear separation of concerns
- Specialists at each stage
- Consistent production quality
Disadvantages:
- Slow handoffs
- Translation errors
- DS frustration (“they changed my model!”)
Hybrid Model: The Best of Both
# ownership_matrix.py - Define clear boundaries
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Optional
class Responsibility(Enum):
OWNS = "Owns" # Primary responsibility
SUPPORTS = "Supports" # Secondary/consulting
INFORMED = "Informed" # Keep in loop only
@dataclass
class OwnershipMatrix:
"""Define team responsibilities for each phase."""
    activities: Optional[Dict[str, Dict[str, Responsibility]]] = None
def __post_init__(self):
self.activities = {
# Development Phase
"Research & Experimentation": {
"Data Science": Responsibility.OWNS,
"ML Engineering": Responsibility.INFORMED,
"Platform": Responsibility.INFORMED
},
"Feature Engineering": {
"Data Science": Responsibility.OWNS,
"ML Engineering": Responsibility.SUPPORTS,
"Platform": Responsibility.INFORMED
},
"Model Training": {
"Data Science": Responsibility.OWNS,
"ML Engineering": Responsibility.SUPPORTS,
"Platform": Responsibility.INFORMED
},
# Productionization Phase
"Code Optimization": {
"Data Science": Responsibility.SUPPORTS,
"ML Engineering": Responsibility.OWNS,
"Platform": Responsibility.INFORMED
},
"Containerization": {
"Data Science": Responsibility.INFORMED,
"ML Engineering": Responsibility.OWNS,
"Platform": Responsibility.SUPPORTS
},
"CI/CD Pipeline": {
"Data Science": Responsibility.INFORMED,
"ML Engineering": Responsibility.OWNS,
"Platform": Responsibility.SUPPORTS
},
# Operations Phase
"Infrastructure Management": {
"Data Science": Responsibility.INFORMED,
"ML Engineering": Responsibility.INFORMED,
"Platform": Responsibility.OWNS
},
"Model Monitoring": {
"Data Science": Responsibility.OWNS,
"ML Engineering": Responsibility.SUPPORTS,
"Platform": Responsibility.INFORMED
},
"Incident Response (Model)": {
"Data Science": Responsibility.OWNS,
"ML Engineering": Responsibility.SUPPORTS,
"Platform": Responsibility.INFORMED
},
"Incident Response (Infra)": {
"Data Science": Responsibility.INFORMED,
"ML Engineering": Responsibility.INFORMED,
"Platform": Responsibility.OWNS
}
}
def get_owner(self, activity: str) -> str:
"""Get primary owner for an activity."""
if activity not in self.activities:
raise ValueError(f"Unknown activity: {activity}")
for team, resp in self.activities[activity].items():
if resp == Responsibility.OWNS:
return team
return "Undefined"
def generate_raci_matrix(self) -> str:
"""Generate RACI matrix as markdown table."""
header = "| Activity | Data Science | ML Engineering | Platform |"
separator = "|:---------|:-------------|:---------------|:---------|"
rows = [header, separator]
for activity, responsibilities in self.activities.items():
row = f"| {activity} |"
for team in ["Data Science", "ML Engineering", "Platform"]:
resp = responsibilities.get(team, Responsibility.INFORMED)
symbol = {
Responsibility.OWNS: "**A** (Owns)",
Responsibility.SUPPORTS: "C (Consult)",
Responsibility.INFORMED: "I"
}[resp]
row += f" {symbol} |"
rows.append(row)
return "\n".join(rows)
32.8.2. The Production Readiness Review (PRR) Checklist
The PRR is the formal contract for the Handover. It should be a Markdown document in the repo, signed off by both leads.
PRR Template
# Production Readiness Review
## Model: {{ model_name }}
## Version: {{ version }}
## Date: {{ date }}
---
## 1. Observability ✅
### Logging
- [ ] Structured JSON logging implemented
- [ ] Log levels appropriately set (INFO for prod)
- [ ] Request/response payloads logged (with PII redaction)
- [ ] Correlation IDs propagated
### Metrics
- [ ] Latency histogram exposed (p50, p95, p99)
- [ ] Request count exposed
- [ ] Error rate exposed
- [ ] Business metrics exposed (predictions by category)
### Dashboards
- [ ] Grafana dashboard created: [Link]
- [ ] PagerDuty alerts configured: [Link]
- [ ] Runbook created: [Link]
---
## 2. Reproducibility ✅
### Code
- [ ] Training code in version control: [Commit SHA]
- [ ] Inference code in version control: [Commit SHA]
- [ ] Docker image tagged: [Image SHA]
### Data
- [ ] Training data versioned (DVC/lakeFS): [Version]
- [ ] Feature definitions in Feature Store: [Link]
- [ ] Test dataset preserved for validation
### Model
- [ ] Model artifact in registry: [URI]
- [ ] Model card completed: [Link]
- [ ] Hyperparameters documented
---
## 3. Scalability & Performance ✅
### Load Testing
- [ ] Target throughput defined: {{ target_qps }} QPS
- [ ] Load test executed: [Results link]
- [ ] P99 latency under load: {{ p99_latency }}ms (SLA: {{ sla_latency }}ms)
### Resource Configuration
- [ ] Memory request: {{ memory_request }}
- [ ] Memory limit: {{ memory_limit }}
- [ ] CPU request: {{ cpu_request }}
- [ ] GPU requirement: {{ gpu_type }}
### Autoscaling
- [ ] HPA configured: min={{ min_replicas }}, max={{ max_replicas }}
- [ ] Scale-up threshold: {{ cpu_threshold }}% CPU
- [ ] Scale-down stabilization: {{ cooldown }}s
---
## 4. Failure Modes ✅
### Dependency Failures
| Dependency | Failure Behavior | Tested? |
|:-----------|:-----------------|:--------|
| Feature Store | Return cached value | ✅ |
| Model Server | Return default prediction | ✅ |
| Database | Fail open with fallback | ✅ |
### Graceful Degradation
- [ ] Circuit breaker implemented
- [ ] Timeout configured: {{ timeout_ms }}ms
- [ ] Retry policy: {{ retry_count }} attempts
### Rollback
- [ ] Previous version deployable in <5 min
- [ ] Rollback tested: [Date]
---
## 5. Cost Estimate ✅
| Resource | Unit Cost | Monthly Usage | Monthly Cost |
|:---------|:----------|:--------------|:-------------|
| Compute | ${{ cpu_cost }}/hr | {{ cpu_hours }} hrs | ${{ compute_total }} |
| GPU | ${{ gpu_cost }}/hr | {{ gpu_hours }} hrs | ${{ gpu_total }} |
| Storage | ${{ storage_cost }}/GB | {{ storage_gb }} GB | ${{ storage_total }} |
| **Total** | | | **${{ total_cost }}** |
- [ ] Cost below budget: ${{ budget }}
- [ ] CostCenter tag applied: {{ cost_center }}
---
## 6. Security ✅
- [ ] No secrets in code
- [ ] IAM role follows least privilege
- [ ] Input validation implemented
- [ ] Rate limiting configured
---
## Approvals
| Role | Name | Signature | Date |
|:-----|:-----|:----------|:-----|
| Data Science Lead | | | |
| ML Engineering Lead | | | |
| Platform Lead | | | |
| Product Manager | | | |
Automated PRR Enforcement
# prr_validator.py - Automate PRR checks in CI/CD
import yaml
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from pathlib import Path
import subprocess
import json
@dataclass
class PRRCheck:
name: str
passed: bool
details: str
blocking: bool = True
@dataclass
class PRRResult:
model_name: str
version: str
checks: List[PRRCheck] = field(default_factory=list)
@property
def passed(self) -> bool:
return all(c.passed for c in self.checks if c.blocking)
@property
def blocking_failures(self) -> List[PRRCheck]:
return [c for c in self.checks if not c.passed and c.blocking]
class PRRValidator:
"""Automated Production Readiness Review validator."""
def __init__(self, config_path: str):
with open(config_path) as f:
self.config = yaml.safe_load(f)
def validate(
self,
model_path: str,
deployment_config: Dict
) -> PRRResult:
"""Run all PRR checks."""
result = PRRResult(
model_name=deployment_config.get("model_name", "unknown"),
version=deployment_config.get("version", "unknown")
)
# Check 1: Observability
result.checks.append(self._check_logging(model_path))
result.checks.append(self._check_metrics(model_path))
result.checks.append(self._check_dashboard(deployment_config))
# Check 2: Reproducibility
result.checks.append(self._check_versioning(model_path))
result.checks.append(self._check_model_card(model_path))
# Check 3: Performance
result.checks.append(self._check_load_test(deployment_config))
result.checks.append(self._check_resource_limits(deployment_config))
# Check 4: Failure Modes
result.checks.append(self._check_circuit_breaker(model_path))
result.checks.append(self._check_rollback_plan(deployment_config))
# Check 5: Cost
result.checks.append(self._check_cost_estimate(deployment_config))
# Check 6: Security
result.checks.append(self._check_secrets(model_path))
result.checks.append(self._check_iam_policy(deployment_config))
return result
def _check_logging(self, model_path: str) -> PRRCheck:
"""Verify structured logging is implemented."""
# Search for logging patterns in code
import_patterns = [
"import logging",
"import structlog",
"from loguru import logger"
]
code_files = list(Path(model_path).rglob("*.py"))
has_logging = False
for file in code_files:
content = file.read_text()
if any(p in content for p in import_patterns):
has_logging = True
break
return PRRCheck(
name="Structured Logging",
passed=has_logging,
details="Found logging implementation" if has_logging else "No logging found",
blocking=True
)
def _check_metrics(self, model_path: str) -> PRRCheck:
"""Verify metrics are exposed."""
metric_patterns = [
"prometheus_client",
"opentelemetry",
"from datadog import"
]
code_files = list(Path(model_path).rglob("*.py"))
has_metrics = False
for file in code_files:
content = file.read_text()
if any(p in content for p in metric_patterns):
has_metrics = True
break
return PRRCheck(
name="Metrics Exposed",
passed=has_metrics,
details="Found metrics implementation" if has_metrics else "No metrics found",
blocking=True
)
def _check_load_test(self, config: Dict) -> PRRCheck:
"""Verify load test was performed."""
load_test_results = config.get("load_test_results")
if not load_test_results:
return PRRCheck(
name="Load Test",
passed=False,
details="No load test results provided",
blocking=True
)
p99_latency = load_test_results.get("p99_latency_ms", float("inf"))
sla_latency = config.get("sla_latency_ms", 500)
passed = p99_latency <= sla_latency
return PRRCheck(
name="Load Test",
passed=passed,
details=f"P99: {p99_latency}ms (SLA: {sla_latency}ms)",
blocking=True
)
def _check_cost_estimate(self, config: Dict) -> PRRCheck:
"""Verify cost is within budget."""
estimated_cost = config.get("estimated_monthly_cost", 0)
budget = config.get("monthly_budget", float("inf"))
passed = estimated_cost <= budget
return PRRCheck(
name="Cost Estimate",
passed=passed,
details=f"Estimated: ${estimated_cost}/mo (Budget: ${budget}/mo)",
blocking=True
)
def _check_secrets(self, model_path: str) -> PRRCheck:
"""Verify no secrets in code."""
# Run secret detection
try:
result = subprocess.run(
["detect-secrets", "scan", model_path],
capture_output=True,
text=True
)
findings = json.loads(result.stdout)
secrets_found = len(findings.get("results", {})) > 0
        except (FileNotFoundError, json.JSONDecodeError):
            # detect-secrets not installed or produced no JSON; require a manual review
            secrets_found = False
return PRRCheck(
name="No Secrets in Code",
passed=not secrets_found,
details="No secrets detected" if not secrets_found else "SECRETS FOUND!",
blocking=True
)
# Additional check methods would follow the same pattern...
def generate_report(self, result: PRRResult) -> str:
"""Generate PRR report in markdown."""
status = "✅ PASSED" if result.passed else "❌ FAILED"
report = f"""
# Production Readiness Review Report
## Model: {result.model_name} v{result.version}
## Status: {status}
---
## Check Results
| Check | Status | Details |
|:------|:-------|:--------|
"""
for check in result.checks:
emoji = "✅" if check.passed else ("⚠️" if not check.blocking else "❌")
report += f"| {check.name} | {emoji} | {check.details} |\n"
if result.blocking_failures:
report += "\n## Blocking Issues\n\n"
for failure in result.blocking_failures:
report += f"- **{failure.name}**: {failure.details}\n"
return report
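Wired into CI, the validator becomes a gate: fail the pipeline on blocking issues and attach the report to the merge request. A minimal sketch (the file paths and deployment-config format are assumptions, and several _check_* methods above are elided and would need to exist):
# ci_prr_gate.py - hypothetical CI entrypoint for PRRValidator
import sys
import yaml
from prr_validator import PRRValidator

if __name__ == "__main__":
    with open("deploy/deployment_config.yaml") as f:   # Assumed location
        deployment_config = yaml.safe_load(f)
    validator = PRRValidator(config_path="prr_config.yaml")  # Assumed location
    result = validator.validate(model_path="src/", deployment_config=deployment_config)
    print(validator.generate_report(result))  # Attach to the PR as a comment
    if not result.passed:
        sys.exit(1)  # Block the deployment until all blocking checks pass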
32.8.3. Incident Response Contracts (SLAs)
When the model breaks at 3 AM, whose pager goes off?
The RACI Matrix for MLOps
| Activity | Data Scientist | ML Engineer | Platform Engineer | Product Manager |
|---|---|---|---|---|
| Model Drift > 10% | A (Fix it) | C (Help deploy) | I | C (Impact) |
| Endpoint Latency > 1s | C (Optimize) | A (Scale) | C (Infra) | I |
| Cluster Down | I | I | A (Fix K8s) | I |
| Data Pipeline Failed | C | A | C | I |
| Feature Store Down | I | I | A | C |
| Model Producing Bias | A | C | I | A |
On-Call Policies
# oncall_policy.yaml
policies:
platform_team:
coverage: 24x7
response_time: 15_minutes
responsibilities:
- kubernetes_control_plane
- networking
- iam_and_security
- monitoring_infrastructure
- feature_store_availability
escalation:
- level_1: on_call_engineer
- level_2: platform_lead
- level_3: engineering_director
ml_team:
coverage: business_hours # 9-6 local time
after_hours: best_effort # Unless revenue-critical
response_time: 1_hour
responsibilities:
- model_accuracy
- inference_logic
- data_drift
- prediction_quality
escalation:
- level_1: model_owner
- level_2: ml_lead
- level_3: data_science_director
revenue_critical_models:
# Override for specific models
models:
- fraud_detection
- real_time_bidding
- dynamic_pricing
coverage: 24x7
response_time: 15_minutes
on_call_team: ml_team_critical
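The policy file only matters if the alert router actually reads it. A sketch of the resolution logic, assuming the schema above (the function and file names are illustrative):
# oncall_router.py - resolve which on-call policy applies to an alert (sketch)
import yaml

def resolve_policy(policies: dict, model_name: str, alert_domain: str) -> dict:
    """alert_domain is 'model' (drift, accuracy) or 'infra' (cluster, network)."""
    critical = policies.get("revenue_critical_models", {})
    if alert_domain == "model" and model_name in critical.get("models", []):
        return critical                     # 24x7, 15-minute response
    if alert_domain == "model":
        return policies["ml_team"]          # Business hours, best effort after
    return policies["platform_team"]        # Infra always pages Platform 24x7

if __name__ == "__main__":
    with open("oncall_policy.yaml") as f:
        policies = yaml.safe_load(f)["policies"]
    print(resolve_policy(policies, "fraud_detection", "model")["response_time"])
    # -> 15_minutes (the revenue-critical override applies)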
Runbook Template
# Incident Runbook: [CRITICAL] P99 Latency High on Fraud Model
## Trigger
- `fraud_model_latency_p99 > 500ms` for 5 minutes
- Alert source: PagerDuty
- Severity: P1
---
## Quick Diagnosis (< 5 minutes)
### Step 1: Check Traffic Volume
**Dashboard**: [Grafana - Fraud Model](link)
Is RPS > 2x normal?
- **YES**: Traffic spike. Check if HPA is scaling. Go to Step 4.
- **NO**: Proceed to Step 2.
### Step 2: Check Dependencies
**Dashboard**: [Dependency Health](link)
| Dependency | Status Check |
|:-----------|:-------------|
| Feature Store | [Tecton Status](link) |
| Database | [RDS CloudWatch](link) |
| Model Artifact S3 | [S3 Status](link) |
- **Any Degraded?**: Escalate to Platform Team. Stop here.
- **All Healthy**: Proceed to Step 3.
### Step 3: Check Model Resources
**Dashboard**: [Pod Resources](link)
| Metric | Healthy | Current |
|:-------|:--------|:--------|
| CPU | <80% | __% |
| Memory | <90% | __% |
| GPU | <95% | __% |
- **Resources Saturated?**: Go to Step 5 (Scale).
- **Resources OK**: Go to Step 6 (Bad Release).
### Step 4: Check Autoscaler
kubectl get hpa fraud-model -n ml-serving
kubectl describe hpa fraud-model -n ml-serving
- **Max Replicas Hit?**: Increase max replicas.
kubectl patch hpa fraud-model -n ml-serving --patch '{"spec":{"maxReplicas":100}}'
### Step 5: Manual Scale
kubectl scale deployment fraud-model -n ml-serving --replicas=50
Monitor for 2 minutes. If latency drops, incident mitigated.
### Step 6: Check Recent Deployments
kubectl rollout history deployment/fraud-model -n ml-serving
Was there a deployment in the last hour?
- **YES**: Rollback immediately.
kubectl rollout undo deployment/fraud-model -n ml-serving
---
## Mitigation Options
### Option A: Enable Degraded Mode
Serve cached predictions from last known good state.
kubectl set env deployment/fraud-model DEGRADED_MODE=true -n ml-serving
### Option B: Shed Load
Enable rate limiting if traffic is the issue.
kubectl annotate ingress fraud-model nginx.ingress.kubernetes.io/limit-rps="100" -n ml-serving
---
## Escalation
| After | Escalate To | Contact |
|---|---|---|
| 15 min | ML Engineering Lead | @ml-lead |
| 30 min | Platform Lead | @platform-lead |
| 60 min | Engineering Director | @eng-director |
---
## Post-Incident
- Timeline documented in incident ticket
- Root cause identified
- Action items created
- Post-mortem scheduled (for P1/P2)
32.8.4. Cost Attribution Contracts (FinOps)
"Who pays for the GPU?" In the cloud, it is easy to burn $100k in a weekend.
Tagging Strategy
# terraform/modules/ml-project/main.tf
locals {
required_tags = {
Environment = var.environment
Project = var.project_name
CostCenter = var.cost_center
Team = var.team_name
ModelName = var.model_name
ManagedBy = "terraform"
CreatedBy = var.created_by
}
}
# Enforce tagging on all resources
resource "aws_sagemaker_endpoint" "model" {
name = var.endpoint_name
endpoint_config_name = aws_sagemaker_endpoint_configuration.config.name
tags = merge(local.required_tags, {
ResourceType = "inference-endpoint"
SLA = var.sla_tier
})
}
# S3 bucket policy to deny untagged writes
data "aws_iam_policy_document" "require_tags" {
statement {
sid = "DenyUntaggedObjects"
effect = "Deny"
principals {
type = "*"
identifiers = ["*"]
}
actions = ["s3:PutObject"]
resources = ["${aws_s3_bucket.models.arn}/*"]
condition {
test = "Null"
variable = "s3:RequestObjectTag/CostCenter"
values = ["true"]
}
}
}
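With tags enforced, attribution becomes a grouped Cost Explorer query rather than a spreadsheet argument. A boto3 sketch grouping last month's spend by the Team tag the Terraform above requires:
# cost_by_team.py - attribute ML spend to the Team tag (sketch)
import boto3

def cost_by_team(start: str, end: str) -> dict:
    """start/end are ISO dates, e.g. '2024-05-01'. Returns {tag_value: usd}."""
    ce = boto3.client("ce")  # Cost Explorer
    resp = ce.get_cost_and_usage(
        TimePeriod={"Start": start, "End": end},
        Granularity="MONTHLY",
        Metrics=["UnblendedCost"],
        GroupBy=[{"Type": "TAG", "Key": "Team"}],  # Same tag the Terraform enforces
    )
    totals = {}
    for period in resp["ResultsByTime"]:
        for group in period["Groups"]:
            team = group["Keys"][0]  # e.g. 'Team$fraud-ds'; empty value means untagged spend
            amount = float(group["Metrics"]["UnblendedCost"]["Amount"])
            totals[team] = totals.get(team, 0.0) + amount
    return totals

if __name__ == "__main__":
    for team, usd in sorted(cost_by_team("2024-05-01", "2024-06-01").items()):
        print(f"{team:30s} ${usd:,.2f}")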
Budget Automation
# finops/budget_enforcer.py
import boto3
from datetime import datetime
from typing import Dict, List
import json
class BudgetEnforcer:
"""Automatically enforce ML cost budgets."""
    def __init__(self, account_id: str, region: str = "us-east-1"):
        self.account_id = account_id
        self.region = region  # Used to build the SNS auto-stop topic ARN
        self.budgets = boto3.client('budgets')
        self.sagemaker = boto3.client('sagemaker', region_name=region)
        self.sns = boto3.client('sns', region_name=region)
def create_project_budget(
self,
project_id: str,
monthly_limit: float,
alert_emails: List[str],
auto_stop_threshold: float = 0.95 # 95% of budget
):
"""Create budget with alerts and auto-stop."""
self.budgets.create_budget(
AccountId=self.account_id,
Budget={
'BudgetName': f'ML-{project_id}',
'BudgetLimit': {
'Amount': str(monthly_limit),
'Unit': 'USD'
},
'CostFilters': {
'TagKeyValue': [f'user:Project${project_id}']
},
'TimeUnit': 'MONTHLY',
'BudgetType': 'COST'
},
NotificationsWithSubscribers=[
# 50% alert
{
'Notification': {
'NotificationType': 'ACTUAL',
'ComparisonOperator': 'GREATER_THAN',
'Threshold': 50,
'ThresholdType': 'PERCENTAGE'
},
'Subscribers': [
{'SubscriptionType': 'EMAIL', 'Address': email}
for email in alert_emails
]
},
# 80% alert
{
'Notification': {
'NotificationType': 'ACTUAL',
'ComparisonOperator': 'GREATER_THAN',
'Threshold': 80,
'ThresholdType': 'PERCENTAGE'
},
'Subscribers': [
{'SubscriptionType': 'EMAIL', 'Address': email}
for email in alert_emails
]
},
# Auto-stop at 95%
{
'Notification': {
'NotificationType': 'ACTUAL',
'ComparisonOperator': 'GREATER_THAN',
'Threshold': auto_stop_threshold * 100,
'ThresholdType': 'PERCENTAGE'
},
'Subscribers': [
{
'SubscriptionType': 'SNS',
'Address': self._get_auto_stop_topic()
}
]
}
]
)
def _get_auto_stop_topic(self) -> str:
"""Get or create SNS topic for auto-stop."""
# This topic triggers Lambda to stop resources
return f"arn:aws:sns:{self.region}:{self.account_id}:ml-budget-auto-stop"
def stop_project_resources(self, project_id: str):
"""Stop all running resources for a project."""
stopped_resources = []
# Stop training jobs
training_jobs = self.sagemaker.list_training_jobs(
StatusEquals='InProgress'
)['TrainingJobSummaries']
for job in training_jobs:
job_details = self.sagemaker.describe_training_job(
TrainingJobName=job['TrainingJobName']
)
if self._matches_project(job_details, project_id):
self.sagemaker.stop_training_job(
TrainingJobName=job['TrainingJobName']
)
stopped_resources.append(('TrainingJob', job['TrainingJobName']))
# Stop endpoints (expensive!)
endpoints = self.sagemaker.list_endpoints()['Endpoints']
for endpoint in endpoints:
endpoint_tags = self.sagemaker.list_tags(
ResourceArn=endpoint['EndpointArn']
)['Tags']
if any(t['Key'] == 'Project' and t['Value'] == project_id for t in endpoint_tags):
# Don't delete, but scale to 0
self._scale_endpoint_to_zero(endpoint['EndpointName'])
stopped_resources.append(('Endpoint', endpoint['EndpointName']))
return stopped_resources
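_matches_project and _scale_endpoint_to_zero are elided helpers (a tag check and an endpoint-config update, respectively). Wiring it together might look like the sketch below: the SNS subscriber at 95% is a Lambda that calls stop_project_resources. The account ID, topic wiring, and message-attribute convention are assumptions:
# Usage sketch for BudgetEnforcer (illustrative values)
enforcer = BudgetEnforcer(account_id="123456789012", region="us-east-1")
enforcer.create_project_budget(
    project_id="fraud-detection",
    monthly_limit=20000.0,
    alert_emails=["ml-leads@example.com", "finops@example.com"],
)

def lambda_handler(event, context):
    """Subscribed to the ml-budget-auto-stop SNS topic (see _get_auto_stop_topic)."""
    # How the project id reaches the Lambda is deployment-specific; here we assume it
    # was attached as an SNS message attribute when the subscription was wired up.
    project_id = event["Records"][0]["Sns"]["MessageAttributes"]["project_id"]["Value"]
    stopped = BudgetEnforcer(account_id="123456789012").stop_project_resources(project_id)
    print(f"Stopped {len(stopped)} resources for {project_id}: {stopped}")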
32.8.5. Versioning Policies and Deprecation
Data Science moves fast. APIs need stability. We need a policy for Deprecation.
The Model API Contract
# api_contract.yaml
model_api:
name: fraud_detection
current_version: v3
supported_versions:
- version: v3
status: current
end_of_life: null
- version: v2
status: deprecated
end_of_life: "2024-06-01"
migration_guide: "docs/v2-to-v3-migration.md"
- version: v1
status: sunset
end_of_life: "2024-01-01"
deprecation_policy:
notice_period_days: 90
support_previous_versions: 2
brownout_testing: true
sla:
availability: 99.9%
latency_p99: 200ms
error_rate: 0.1%
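The contract is only useful if something checks it. A sketch of a CI check that runs whenever api_contract.yaml changes, so a version cannot be marked deprecated with less than the 90-day notice remaining (file and function names are illustrative):
# contract_checker.py - validate the deprecation policy in api_contract.yaml (sketch)
from datetime import date, timedelta
import yaml

def check_contract(contract: dict, today: date) -> list:
    api = contract["model_api"]
    notice = timedelta(days=api["deprecation_policy"]["notice_period_days"])
    problems = []
    for v in api["supported_versions"]:
        eol = v.get("end_of_life")
        if not eol:
            continue  # Current version, no EOL scheduled
        eol_date = date.fromisoformat(eol)
        if v["status"] == "deprecated" and eol_date - today < notice:
            problems.append(f"{v['version']}: EOL {eol} gives less than {notice.days} days notice")
        if v["status"] != "sunset" and eol_date < today:
            problems.append(f"{v['version']}: past end-of-life but not marked sunset")
    return problems

if __name__ == "__main__":
    with open("api_contract.yaml") as f:
        contract = yaml.safe_load(f)
    for p in check_contract(contract, date.today()):
        print("CONTRACT VIOLATION:", p)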
Deprecation Workflow
graph LR
A[T-90: Announce] --> B[T-60: Brownout Test]
B --> C[T-30: Blackout Test]
C --> D[T-0: Delete]
B -.->|Consumer Issues| E[Extend Timeline]
E --> B
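Brownout testing means deliberately failing a small, growing fraction of calls to the deprecated version before the cutoff, so straggler consumers surface while rollback is still cheap. A minimal gateway-side sketch (the rates and response code are illustrative):
# brownout.py - reject a fraction of requests to deprecated API versions (sketch)
import random

BROWNOUT_RATES = {"v1": 1.00, "v2": 0.05}  # Ramp v2 up as T-0 approaches; v1 is sunset

def brownout_check(requested_version: str) -> tuple:
    """Call before routing to the model. Returns (allow, http_status, message)."""
    rate = BROWNOUT_RATES.get(requested_version, 0.0)
    if rate > 0 and random.random() < rate:
        return (False, 410, f"{requested_version} is deprecated; migrate to v3 "
                            "(see docs/v2-to-v3-migration.md)")
    return (True, 200, "ok")

if __name__ == "__main__":
    allowed = sum(brownout_check("v2")[0] for _ in range(1000))
    print(f"v2 allowed: {allowed}/1000 (expect ~950 during a 5% brownout)")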
32.8.6. Summary Checklist for Human Contracts
| Contract Type | Document | Owner | Review Cadence |
|---|---|---|---|
| Ownership | RACI Matrix | Engineering Manager | Quarterly |
| Production Readiness | PRR Template | ML Engineering Lead | Per-deployment |
| Incident Response | Runbook | On-Call Team | Monthly |
| Cost Attribution | Tagging Policy | FinOps Team | Monthly |
| Deprecation | API Contract | Product Manager | Per-release |
Social contracts prevent burnout and blame culture. Invest in them.
[End of Section 32.8]