33.2. Carbon Efficiency: Green AI
Note
The Hidden Cost: A widely cited 2019 study found that training one large Transformer (with neural architecture search) can emit as much carbon as five cars over their full lifetimes. As AI scales, “Green AI” moves from nice-to-have to a C-suite ESG requirement.
33.2.1. The Carbon Equation
$$ C_{total} = E \times I $$
| Variable | Definition | Unit | Typical Range |
|---|---|---|---|
| E | Energy Consumed | kWh | 10-10,000+ |
| I | Carbon Intensity | gCO2eq/kWh | 3-800 |
| PUE | Power Usage Effectiveness | Ratio | 1.1-1.5 |
| C | Total Emissions | kg CO2eq | Variable |
Expanded Carbon Formula
$$ C_{total} = (E_{compute} \times PUE + E_{network}) \times I_{grid} $$
PUE already folds cooling and other facility overhead into the compute term, and every energy term must be multiplied by the grid's carbon intensity before it becomes carbon.
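A quick sanity check of the units, using assumed values (100 kWh of accelerator energy, PUE 1.2, 10 kWh of network transfer, a 400 gCO2eq/kWh grid):
# Worked example of the expanded formula; every input here is an
# illustrative assumption, not a measured value.
e_compute_kwh = 100.0   # accelerator + host energy
pue = 1.2               # facility overhead (includes cooling)
e_network_kwh = 10.0    # data-transfer energy
i_grid = 400.0          # grid carbon intensity, gCO2eq/kWh

total_g = (e_compute_kwh * pue + e_network_kwh) * i_grid
print(f"{total_g / 1000:.1f} kg CO2eq")  # 52.0 kg CO2eq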
MLOps Levers for Carbon Reduction
| Lever | Action | Potential Impact | Effort |
|---|---|---|---|
| Reduce Compute Time | Early stopping, efficient algorithms | -30-50% | Medium |
| Reduce Power Draw | TPUs > GPUs for matrix math | -20-40% | Low |
| Reduce Carbon Intensity | Train in hydro/wind regions | -90% | Low-Medium |
| Improve PUE | Use efficient data centers | -20-30% | Low (vendor choice) |
| Cache & Reuse | Semantic caching for inference | -50-90% | Medium |
| Model Distillation | Smaller models for inference | -70-90% inference | High |
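The "Reduce Compute Time" lever above is the cheapest to automate. A minimal early-stopping sketch (framework-agnostic; patience and min_delta are illustrative defaults):
# Minimal early-stopping sketch: stop once validation loss plateaus,
# so no energy is spent on epochs that no longer improve the model.
class EarlyStopping:
    def __init__(self, patience: int = 3, min_delta: float = 1e-4):
        self.patience = patience
        self.min_delta = min_delta
        self.best = float("inf")
        self.bad_epochs = 0

    def should_stop(self, val_loss: float) -> bool:
        if val_loss < self.best - self.min_delta:
            self.best = val_loss
            self.bad_epochs = 0
        else:
            self.bad_epochs += 1
        return self.bad_epochs >= self.patience

# In a training loop (train_one_epoch and validate are assumed to exist):
# stopper = EarlyStopping(patience=3)
# for epoch in range(max_epochs):
#     train_one_epoch(model)
#     if stopper.should_stop(validate(model)):
#         break  # every skipped epoch is energy never drawn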
Carbon Budget Framework
from dataclasses import dataclass
from typing import Optional
from enum import Enum
class CarbonTier(Enum):
LOW = "low" # < 10 kg CO2
MEDIUM = "medium" # 10-100 kg
HIGH = "high" # 100-1000 kg
CRITICAL = "critical" # > 1000 kg
@dataclass
class CarbonBudget:
"""Carbon budget for ML operations."""
project_name: str
annual_budget_kg: float
training_allocation: float = 0.7 # 70% for training
inference_allocation: float = 0.3 # 30% for inference
def training_budget(self) -> float:
return self.annual_budget_kg * self.training_allocation
def inference_budget(self) -> float:
return self.annual_budget_kg * self.inference_allocation
def check_training_run(
self,
estimated_kg: float,
current_usage_kg: float
) -> dict:
"""Check if training run fits in budget."""
remaining = self.training_budget() - current_usage_kg
fits = estimated_kg <= remaining
return {
"approved": fits,
"remaining_budget_kg": remaining,
"estimated_kg": estimated_kg,
"utilization_pct": (current_usage_kg / self.training_budget()) * 100
}
def classify_run(estimated_kg: float) -> CarbonTier:
"""Classify training run by carbon impact."""
if estimated_kg < 10:
return CarbonTier.LOW
elif estimated_kg < 100:
return CarbonTier.MEDIUM
elif estimated_kg < 1000:
return CarbonTier.HIGH
else:
return CarbonTier.CRITICAL
# Example usage
budget = CarbonBudget("recommendation-system", annual_budget_kg=500)
check = budget.check_training_run(estimated_kg=50, current_usage_kg=200)
# {'approved': True, 'remaining_budget_kg': 150, ...}
33.2.2. Tooling: CodeCarbon
CodeCarbon is the de facto open-source standard for tracking ML carbon emissions; it samples hardware power draw and converts energy to emissions using regional grid intensity:
from codecarbon import EmissionsTracker, OfflineEmissionsTracker
import mlflow
from typing import Optional
from dataclasses import dataclass
import json
@dataclass
class EmissionsReport:
emissions_kg: float
energy_kwh: float
duration_seconds: float
region: str
cpu_power: float
gpu_power: float
carbon_intensity: float
class GreenTrainer:
"""Training with carbon tracking and reporting."""
def __init__(
self,
project_name: str,
offline_mode: bool = False,
country_iso_code: str = "USA"
):
self.project_name = project_name
if offline_mode:
self.tracker = OfflineEmissionsTracker(
project_name=project_name,
country_iso_code=country_iso_code,
measure_power_secs=15,
save_to_file=True,
log_level="warning"
)
else:
self.tracker = EmissionsTracker(
project_name=project_name,
measure_power_secs=15,
save_to_file=True,
log_level="warning"
)
self.emissions_data: Optional[EmissionsReport] = None
def train(self, train_fn, *args, **kwargs):
"""Wrap training function with carbon tracking."""
self.tracker.start()
try:
result = train_fn(*args, **kwargs)
finally:
emissions = self.tracker.stop()
self._capture_data(emissions)
return result
    def _capture_data(self, emissions: float) -> None:
        """Capture emissions data for reporting."""
        data = self.tracker.final_emissions_data
        # CodeCarbon's emissions_rate is kg/s, not grid intensity; derive
        # gCO2/kWh from total emissions (kg) over energy consumed (kWh).
        intensity = (
            (data.emissions / data.energy_consumed) * 1000
            if data and data.energy_consumed else 0
        )
        self.emissions_data = EmissionsReport(
            emissions_kg=emissions,
            energy_kwh=data.energy_consumed if data else 0,
            duration_seconds=data.duration if data else 0,
            region=data.region if data else "unknown",
            cpu_power=data.cpu_power if data else 0,
            gpu_power=data.gpu_power if data else 0,
            carbon_intensity=intensity
        )
def log_to_mlflow(self) -> None:
"""Log emissions to MLflow."""
if not self.emissions_data:
return
mlflow.log_metric("carbon_emissions_kg", self.emissions_data.emissions_kg)
mlflow.log_metric("energy_consumed_kwh", self.emissions_data.energy_kwh)
mlflow.log_metric("training_duration_s", self.emissions_data.duration_seconds)
mlflow.log_metric("carbon_intensity_g_kwh", self.emissions_data.carbon_intensity)
mlflow.set_tag("training_region", self.emissions_data.region)
mlflow.set_tag("green_ai_tracked", "true")
def get_report(self) -> dict:
"""Get emissions report."""
if not self.emissions_data:
return {}
return {
"emissions_kg_co2": round(self.emissions_data.emissions_kg, 4),
"energy_kwh": round(self.emissions_data.energy_kwh, 2),
"duration_hours": round(self.emissions_data.duration_seconds / 3600, 2),
"region": self.emissions_data.region,
"efficiency_kg_per_hour": round(
self.emissions_data.emissions_kg /
(self.emissions_data.duration_seconds / 3600),
4
) if self.emissions_data.duration_seconds > 0 else 0,
"equivalent_car_km": round(self.emissions_data.emissions_kg / 0.12, 1)
}
# Usage
green = GreenTrainer("my-model")
def train_model(model, data):
for epoch in range(100):
model.train(data)
return model
trained = green.train(train_model, model, data)
print(green.get_report())
# {'emissions_kg_co2': 2.5, 'energy_kwh': 15.3, 'equivalent_car_km': 20.8}
CI/CD Integration
# .github/workflows/training.yaml
name: Model Training
on:
  push:
    paths:
      - 'training/**'
  pull_request:
    paths:
      - 'training/**'
jobs:
train:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: |
pip install codecarbon mlflow torch
- name: Run training with carbon tracking
env:
CODECARBON_LOG_LEVEL: warning
run: |
python train.py --track-carbon
- name: Upload emissions report
uses: actions/upload-artifact@v4
with:
name: emissions-report
path: emissions.csv
- name: Comment carbon usage on PR
if: github.event_name == 'pull_request'
uses: actions/github-script@v6
with:
script: |
const fs = require('fs');
const report = JSON.parse(fs.readFileSync('emissions_report.json'));
const body = `## 🌱 Carbon Emissions Report
| Metric | Value |
|--------|-------|
| CO2 Emissions | ${report.emissions_kg_co2} kg |
| Energy Used | ${report.energy_kwh} kWh |
| Duration | ${report.duration_hours} hours |
| Equivalent | ${report.equivalent_car_km} km driving |
`;
github.rest.issues.createComment({
issue_number: context.issue.number,
owner: context.repo.owner,
repo: context.repo.repo,
body: body
});
33.2.3. Chase the Sun: Region Selection
Carbon intensity varies by more than 100x across regions:
| Region | Cloud | Grid Mix | gCO2/kWh | Recommendation |
|---|---|---|---|---|
| Montreal | AWS ca-central-1 | Hydro | ~3 | ✅ Best choice |
| Quebec | GCP northamerica-northeast1 | Hydro | ~3 | ✅ Best choice |
| Stockholm | AWS eu-north-1 | Hydro/Wind | ~15 | ✅ Excellent |
| Oregon | AWS us-west-2 | Hydro/Wind | ~50 | ✅ Good |
| Iowa | GCP us-central1 | Wind | ~200 | ⚠️ Variable |
| Finland | GCP europe-north1 | Hydro/Nuclear | ~80 | ✅ Good |
| Virginia | AWS us-east-1 | Coal/Gas | ~400 | ❌ Avoid for large training |
| Singapore | All | Gas | ~450 | ❌ Avoid for large training |
Real-Time Carbon-Aware Scheduling
import requests
from typing import List, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
@dataclass
class RegionCarbon:
region: str
carbon_intensity: float # gCO2/kWh
renewable_percentage: float
timestamp: str
forecast_available: bool
class CarbonAwareScheduler:
"""Schedule training in lowest-carbon region."""
# Static carbon intensities (fallback)
STATIC_INTENSITIES = {
"us-east-1": 400,
"us-west-2": 50,
"ca-central-1": 3,
"eu-north-1": 15,
"eu-west-1": 300,
"ap-northeast-1": 500,
"us-central1": 200, # GCP
"europe-north1": 80,
"northamerica-northeast1": 3
}
    # Placeholder endpoint: the GSF Carbon Aware SDK exposes a self-hosted
    # web API; point this at your own deployment.
    CARBON_AWARE_API = "https://api.carbonaware.org"
def __init__(self, candidate_regions: List[str], use_api: bool = True):
self.regions = candidate_regions
self.use_api = use_api
def get_current_intensity(self, region: str) -> RegionCarbon:
"""Get current carbon intensity for region."""
if self.use_api:
try:
return self._fetch_from_api(region)
except Exception:
pass
# Fallback to static
return RegionCarbon(
region=region,
carbon_intensity=self.STATIC_INTENSITIES.get(region, 500),
renewable_percentage=0,
timestamp=datetime.utcnow().isoformat(),
forecast_available=False
)
    def _fetch_from_api(self, region: str) -> RegionCarbon:
        """Fetch real-time data from a Carbon Aware SDK deployment.

        The response fields assumed below ("rating", "renewablePercentage",
        "time") may differ across SDK versions; adjust to your deployment.
        """
resp = requests.get(
f"{self.CARBON_AWARE_API}/emissions/bylocation",
params={"location": region},
timeout=5
)
resp.raise_for_status()
data = resp.json()
return RegionCarbon(
region=region,
carbon_intensity=data.get("rating", 500),
renewable_percentage=data.get("renewablePercentage", 0),
timestamp=data.get("time", ""),
forecast_available=True
)
def get_greenest_region(self) -> str:
"""Select region with lowest carbon intensity."""
intensities = {}
for region in self.regions:
carbon = self.get_current_intensity(region)
intensities[region] = carbon.carbon_intensity
return min(intensities, key=intensities.get)
def get_optimal_window(
self,
region: str,
duration_hours: int = 4,
look_ahead_hours: int = 24
) -> Optional[datetime]:
"""Find optimal time window for lowest carbon."""
try:
resp = requests.get(
f"{self.CARBON_AWARE_API}/emissions/forecasts",
params={
"location": region,
"dataStartAt": datetime.utcnow().isoformat(),
"dataEndAt": (datetime.utcnow() + timedelta(hours=look_ahead_hours)).isoformat(),
"windowSize": duration_hours
},
timeout=10
)
resp.raise_for_status()
forecasts = resp.json()
            # Find window with lowest average intensity. Assumes the forecast
            # response is a flat list of {"timestamp", "rating"} windows;
            # adjust to your SDK version.
            best_window = min(forecasts, key=lambda x: x["rating"])
            return datetime.fromisoformat(best_window["timestamp"])
except Exception:
return None
def schedule_training(
self,
estimated_duration_hours: float,
flexible_window_hours: int = 24
) -> dict:
"""Get optimal region and timing for training."""
# Get current best region
best_region = self.get_greenest_region()
current_intensity = self.get_current_intensity(best_region)
# Check if we can delay for better window
optimal_time = self.get_optimal_window(
best_region,
int(estimated_duration_hours),
flexible_window_hours
)
return {
"recommended_region": best_region,
"current_carbon_intensity": current_intensity.carbon_intensity,
"optimal_start_time": optimal_time.isoformat() if optimal_time else "now",
"all_regions": {
r: self.get_current_intensity(r).carbon_intensity
for r in self.regions
}
}
# Usage
scheduler = CarbonAwareScheduler([
"us-east-1", "us-west-2", "ca-central-1", "eu-north-1"
])
schedule = scheduler.schedule_training(
estimated_duration_hours=4,
flexible_window_hours=12
)
# {'recommended_region': 'ca-central-1', 'current_carbon_intensity': 3, ...}
Terraform: Multi-Region Training
# carbon_aware_training.tf
variable "training_regions" {
type = map(object({
priority = number
carbon_intensity = number # gCO2/kWh
gpu_available = bool
}))
default = {
"ca-central-1" = { priority = 1, carbon_intensity = 3, gpu_available = true }
"eu-north-1" = { priority = 2, carbon_intensity = 15, gpu_available = true }
"us-west-2" = { priority = 3, carbon_intensity = 50, gpu_available = true }
"us-east-1" = { priority = 4, carbon_intensity = 400, gpu_available = true }
}
}
# Create training resources in the greenest region first.
# NOTE: illustrative only -- the Terraform AWS provider has no managed
# training-job resource; in practice Terraform provisions the surrounding
# infra and the job itself is launched via the SageMaker SDK or pipelines.
resource "aws_sagemaker_training_job" "green_training" {
for_each = {
for k, v in var.training_regions : k => v
if v.priority == 1 && v.gpu_available
}
training_job_name = "green-training-${each.key}-${formatdate("YYYYMMDDhhmmss", timestamp())}"
role_arn = aws_iam_role.sagemaker.arn
algorithm_specification {
training_image = var.training_image
training_input_mode = "File"
}
resource_config {
instance_type = "ml.p4d.24xlarge"
instance_count = 1
volume_size_in_gb = 100
}
# Force training in green region
vpc_config {
subnets = [aws_subnet.training[each.key].id]
security_group_ids = [aws_security_group.training.id]
}
tags = {
carbon_intensity = each.value.carbon_intensity
green_ai = "true"
region = each.key
}
}
# CloudWatch alarm for carbon budget
resource "aws_cloudwatch_metric_alarm" "carbon_budget" {
alarm_name = "carbon-budget-exceeded"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = 1
metric_name = "carbon_emissions_kg"
namespace = "GreenAI"
period = 86400 # Daily
statistic = "Sum"
threshold = var.daily_carbon_budget_kg
alarm_actions = [aws_sns_topic.alerts.arn]
tags = {
Environment = var.environment
}
}
33.2.4. Model Distillation for Sustainability
Distillation creates smaller, more efficient models:
| Stage | Carbon Cost (illustrative) | Frequency | Cumulative |
|---|---|---|---|
| Train Teacher (175B) | 500 kg CO2 | Once | 500 kg |
| Distill Student (7B) | 100 kg CO2 | Once | 600 kg |
| Serve Student | 0.0001 kg/inference | Millions/day | Varies |
Carbon ROI Calculation
from dataclasses import dataclass
from typing import Optional
@dataclass
class DistillationROI:
"""Calculate carbon ROI of distillation."""
teacher_inference_carbon: float # kg CO2 per inference
student_inference_carbon: float # kg CO2 per inference
distillation_carbon: float # kg CO2 total for distillation
daily_inferences: int
def savings_per_inference(self) -> float:
return self.teacher_inference_carbon - self.student_inference_carbon
    def breakeven_inferences(self) -> float:
        """Inferences needed to amortize distillation (inf if no savings)."""
        if self.savings_per_inference() <= 0:
            return float('inf')
        return int(self.distillation_carbon / self.savings_per_inference())
def breakeven_days(self) -> float:
return self.breakeven_inferences() / self.daily_inferences
def yearly_savings_kg(self) -> float:
yearly_inferences = self.daily_inferences * 365
gross_savings = self.savings_per_inference() * yearly_inferences
return gross_savings - self.distillation_carbon
def roi_multiple(self) -> float:
if self.distillation_carbon <= 0:
return float('inf')
return self.yearly_savings_kg() / self.distillation_carbon + 1
def report(self) -> dict:
return {
"breakeven_inferences": self.breakeven_inferences(),
"breakeven_days": round(self.breakeven_days(), 1),
"yearly_savings_kg_co2": round(self.yearly_savings_kg(), 2),
"roi_multiple": round(self.roi_multiple(), 2),
"equivalent_trees_year": round(self.yearly_savings_kg() / 21, 1) # Tree absorbs ~21kg/year
}
# Example: GPT-4 to GPT-3.5 equivalent distillation
roi = DistillationROI(
teacher_inference_carbon=0.001, # GPT-4 level: 1g per inference
student_inference_carbon=0.0001, # GPT-3.5 level: 0.1g per inference
distillation_carbon=100, # 100kg to distill
daily_inferences=1_000_000 # 1M inferences/day
)
print(roi.report())
# {
#   'breakeven_inferences': 111111,
#   'breakeven_days': 0.1,
#   'yearly_savings_kg_co2': 328400.0,
#   'roi_multiple': 3285.0,
#   'equivalent_trees_year': 15638.1
# }
Distillation Pipeline with Carbon Tracking
from codecarbon import EmissionsTracker
import torch
import torch.nn.functional as F
class CarbonAwareDistiller:
"""Distillation with carbon tracking."""
def __init__(
self,
teacher_model,
student_model,
temperature: float = 3.0,
alpha: float = 0.7
):
self.teacher = teacher_model
self.student = student_model
self.temperature = temperature
self.alpha = alpha
self.tracker = EmissionsTracker(project_name="distillation")
def distillation_loss(
self,
student_logits: torch.Tensor,
teacher_logits: torch.Tensor,
labels: torch.Tensor
) -> torch.Tensor:
"""Compute distillation loss."""
# Soft targets
soft_teacher = F.softmax(teacher_logits / self.temperature, dim=-1)
soft_student = F.log_softmax(student_logits / self.temperature, dim=-1)
distill_loss = F.kl_div(
soft_student,
soft_teacher,
reduction='batchmean'
) * (self.temperature ** 2)
# Hard targets
hard_loss = F.cross_entropy(student_logits, labels)
return self.alpha * distill_loss + (1 - self.alpha) * hard_loss
def distill(
self,
train_loader,
optimizer,
epochs: int = 10,
device: str = "cuda"
) -> dict:
"""Run distillation with carbon tracking."""
self.teacher.eval()
self.student.train()
self.teacher.to(device)
self.student.to(device)
self.tracker.start()
for epoch in range(epochs):
total_loss = 0
for batch in train_loader:
inputs, labels = batch
inputs, labels = inputs.to(device), labels.to(device)
# Get teacher predictions (no grad)
with torch.no_grad():
teacher_logits = self.teacher(inputs)
# Get student predictions
student_logits = self.student(inputs)
# Compute loss
loss = self.distillation_loss(student_logits, teacher_logits, labels)
# Backward
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
print(f"Epoch {epoch+1}: Loss = {total_loss:.4f}")
emissions = self.tracker.stop()
return {
"student_model": self.student,
"distillation_carbon_kg": emissions,
"epochs": epochs
}
def compare_efficiency(self, test_input: torch.Tensor) -> dict:
"""Compare teacher vs student efficiency."""
import time
device = "cuda" if torch.cuda.is_available() else "cpu"
self.teacher.to(device)
self.student.to(device)
test_input = test_input.to(device)
        # Warm up both models so one-time CUDA init cost is excluded
        with torch.no_grad():
            for _ in range(10):
                _ = self.teacher(test_input)
                _ = self.student(test_input)
# Measure teacher
torch.cuda.synchronize() if device == "cuda" else None
t0 = time.perf_counter()
for _ in range(100):
with torch.no_grad():
_ = self.teacher(test_input)
torch.cuda.synchronize() if device == "cuda" else None
teacher_time = (time.perf_counter() - t0) / 100
# Measure student
torch.cuda.synchronize() if device == "cuda" else None
t0 = time.perf_counter()
for _ in range(100):
with torch.no_grad():
_ = self.student(test_input)
torch.cuda.synchronize() if device == "cuda" else None
student_time = (time.perf_counter() - t0) / 100
return {
"teacher_latency_ms": teacher_time * 1000,
"student_latency_ms": student_time * 1000,
"speedup": teacher_time / student_time,
"estimated_energy_reduction": 1 - (student_time / teacher_time)
}
33.2.5. Training vs Inference Carbon
| Component | One-Time | Ongoing/Year | Focus |
|---|---|---|---|
| Train Llama-2 70B | ~290 tons CO2 | - | ~2% of lifetime |
| Serve 100M users/day | - | ~5,000 tons CO2 | ~98% of lifetime |
Implication: for any widely served model, the bulk of Green AI effort belongs in inference optimization.
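The split is easy to verify from the table's figures (both are approximations):
# Lifetime carbon share, using the approximate figures from the table.
training_tons = 290.0            # one-time pretraining cost
serving_tons_per_year = 5000.0   # ongoing inference cost
years_in_production = 3          # assumed service life

lifetime = training_tons + serving_tons_per_year * years_in_production
print(f"training share: {training_tons / lifetime:.1%}")  # ~1.9%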
Inference Carbon Estimator
from dataclasses import dataclass
from typing import Dict
@dataclass
class InferenceConfig:
model_size_b: float # Parameters in billions
batch_size: int
avg_tokens_per_request: int
gpu_type: str
precision: str # "fp32", "fp16", "int8", "int4"
class InferenceCarbonEstimator:
"""Estimate carbon for inference workloads."""
# Approximate GPU power by type (Watts)
GPU_POWER = {
"A100_80GB": 400,
"A100_40GB": 350,
"H100": 700,
"A10G": 150,
"T4": 70,
"L4": 72,
"V100": 300,
"RTX4090": 450
}
# Throughput multipliers by precision
PRECISION_MULTIPLIERS = {
"fp32": 1.0,
"fp16": 2.0,
"int8": 4.0,
"int4": 8.0
}
def __init__(self, carbon_intensity: float = 400):
"""
Args:
carbon_intensity: gCO2/kWh of electricity
"""
self.carbon_intensity = carbon_intensity
def estimate_per_request(self, config: InferenceConfig) -> dict:
"""Estimate carbon per inference request."""
gpu_power = self.GPU_POWER.get(config.gpu_type, 300)
precision_mult = self.PRECISION_MULTIPLIERS.get(config.precision, 1.0)
# Estimate latency based on model size and precision
# Rough formula: latency ∝ model_size / (memory_bandwidth * batch_efficiency)
base_latency_ms = (config.model_size_b * 2.0) / (1.0 * config.batch_size)
adjusted_latency_ms = base_latency_ms / precision_mult
# Energy per request (Joules)
energy_joules = gpu_power * (adjusted_latency_ms / 1000)
energy_kwh = energy_joules / 3600000
# Carbon per request
carbon_g = energy_kwh * self.carbon_intensity
return {
"latency_ms": round(adjusted_latency_ms, 2),
"energy_joules": round(energy_joules, 4),
"carbon_grams": round(carbon_g, 6),
"carbon_per_1m_requests_kg": round(carbon_g * 1_000_000 / 1000, 2)
}
def compare_configs(self, configs: Dict[str, InferenceConfig]) -> dict:
"""Compare carbon across configurations."""
results = {}
for name, config in configs.items():
results[name] = self.estimate_per_request(config)
# Find most efficient
best = min(results.items(), key=lambda x: x[1]["carbon_grams"])
return {
"configs": results,
"most_efficient": best[0],
"savings_vs_baseline": {
name: round(1 - (r["carbon_grams"] / list(results.values())[0]["carbon_grams"]), 2)
for name, r in results.items()
}
}
# Compare configurations
estimator = InferenceCarbonEstimator(carbon_intensity=400)
configs = {
"baseline_fp16": InferenceConfig(
model_size_b=7, batch_size=1, avg_tokens_per_request=100,
gpu_type="A100_80GB", precision="fp16"
),
"quantized_int8": InferenceConfig(
model_size_b=7, batch_size=1, avg_tokens_per_request=100,
gpu_type="A100_80GB", precision="int8"
),
"quantized_int4": InferenceConfig(
model_size_b=7, batch_size=1, avg_tokens_per_request=100,
gpu_type="A100_80GB", precision="int4"
),
"smaller_gpu_int8": InferenceConfig(
model_size_b=7, batch_size=1, avg_tokens_per_request=100,
gpu_type="T4", precision="int8"
)
}
comparison = estimator.compare_configs(configs)
print(comparison)
Quantization Impact
| Precision | Memory | Latency | Energy | Quality Impact |
|---|---|---|---|---|
| FP32 | 100% | 100% | 100% | Baseline |
| FP16 | 50% | 60% | 60% | Negligible |
| INT8 | 25% | 40% | 40% | <1% degradation |
| INT4 | 12.5% | 30% | 30% | 1-3% degradation |
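The INT8 row is often reachable in a few lines. A minimal sketch using PyTorch's post-training dynamic quantization (the toy model is illustrative; validate accuracy on held-out data before shipping):
import torch
import torch.nn as nn

# Post-training dynamic quantization: weights are stored as INT8 and
# activations are quantized on the fly at inference time (CPU).
model = nn.Sequential(nn.Linear(512, 512), nn.ReLU(), nn.Linear(512, 10)).eval()

quantized = torch.ao.quantization.quantize_dynamic(
    model, {nn.Linear}, dtype=torch.qint8
)

x = torch.randn(1, 512)
print(quantized(x).shape)  # same interface, ~4x smaller Linear weights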
33.2.6. Caching for Green AI
Every cache hit = one GPU inference saved:
import redis
import hashlib
import json
from typing import Optional, Dict, Any
from dataclasses import dataclass
from prometheus_client import Counter, Gauge
# Metrics
CACHE_HITS = Counter("green_cache_hits_total", "Cache hits", ["model"])
CACHE_MISSES = Counter("green_cache_misses_total", "Cache misses", ["model"])
CARBON_SAVED = Counter("green_carbon_saved_grams", "CO2 saved by caching", ["model"])
CACHE_HIT_RATE = Gauge("green_cache_hit_rate", "Cache hit rate", ["model"])
@dataclass
class CacheStats:
hits: int
misses: int
carbon_saved_g: float
@property
def hit_rate(self) -> float:
total = self.hits + self.misses
return self.hits / total if total > 0 else 0
class GreenInferenceCache:
"""Semantic caching with carbon tracking."""
def __init__(
self,
model,
model_name: str,
carbon_per_inference_g: float = 0.1,
ttl_seconds: int = 86400,
redis_url: str = "redis://localhost:6379"
):
self.model = model
self.model_name = model_name
self.carbon_per_inference = carbon_per_inference_g
self.ttl = ttl_seconds
self.cache = redis.from_url(redis_url)
self.stats = CacheStats(hits=0, misses=0, carbon_saved_g=0)
def _hash_input(self, input_text: str) -> str:
"""Create deterministic hash of input."""
return hashlib.sha256(input_text.encode()).hexdigest()
def predict(self, input_text: str, **kwargs) -> dict:
"""Predict with caching."""
cache_key = f"{self.model_name}:{self._hash_input(input_text)}"
# Check cache
cached = self.cache.get(cache_key)
if cached:
self.stats.hits += 1
self.stats.carbon_saved_g += self.carbon_per_inference
CACHE_HITS.labels(model=self.model_name).inc()
CARBON_SAVED.labels(model=self.model_name).inc(self.carbon_per_inference)
return json.loads(cached)
# Cache miss - run inference
self.stats.misses += 1
CACHE_MISSES.labels(model=self.model_name).inc()
result = self.model.predict(input_text, **kwargs)
# Cache result
self.cache.setex(cache_key, self.ttl, json.dumps(result))
# Update hit rate gauge
CACHE_HIT_RATE.labels(model=self.model_name).set(self.stats.hit_rate)
return result
def get_green_metrics(self) -> dict:
"""Get sustainability metrics."""
return {
"cache_hits": self.stats.hits,
"cache_misses": self.stats.misses,
"hit_rate": round(self.stats.hit_rate, 4),
"carbon_saved_g": round(self.stats.carbon_saved_g, 2),
"carbon_saved_kg": round(self.stats.carbon_saved_g / 1000, 4),
"equivalent_car_km": round(self.stats.carbon_saved_g / 120, 2),
"inferences_avoided": self.stats.hits
}
def estimate_monthly_savings(self, daily_requests: int) -> dict:
"""Project monthly carbon savings."""
estimated_hit_rate = self.stats.hit_rate if self.stats.hit_rate > 0 else 0.3
monthly_requests = daily_requests * 30
hits = int(monthly_requests * estimated_hit_rate)
carbon_saved = hits * self.carbon_per_inference / 1000 # kg
return {
"projected_monthly_requests": monthly_requests,
"projected_cache_hits": hits,
"projected_carbon_saved_kg": round(carbon_saved, 2),
"projected_cost_saved_usd": round(hits * 0.001, 2) # Rough GPU cost
}
class SemanticCache(GreenInferenceCache):
"""Cache with semantic similarity matching."""
def __init__(
self,
model,
model_name: str,
embedding_model,
similarity_threshold: float = 0.95,
**kwargs
):
super().__init__(model, model_name, **kwargs)
self.embedder = embedding_model
self.threshold = similarity_threshold
self.embedding_cache: Dict[str, Any] = {}
def _find_similar_cached(self, input_text: str) -> Optional[str]:
"""Find semantically similar cached input."""
input_embedding = self.embedder.encode(input_text)
for cached_input, cached_embedding in self.embedding_cache.items():
similarity = self._cosine_similarity(input_embedding, cached_embedding)
if similarity >= self.threshold:
return cached_input
return None
def _cosine_similarity(self, a, b) -> float:
import numpy as np
return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
def predict(self, input_text: str, **kwargs) -> dict:
"""Predict with semantic similarity caching."""
# Check for semantically similar cached input
similar_input = self._find_similar_cached(input_text)
if similar_input:
cache_key = f"{self.model_name}:{self._hash_input(similar_input)}"
cached = self.cache.get(cache_key)
if cached:
self.stats.hits += 1
self.stats.carbon_saved_g += self.carbon_per_inference
return json.loads(cached)
# Cache miss - run inference
self.stats.misses += 1
result = self.model.predict(input_text, **kwargs)
# Cache with embedding
cache_key = f"{self.model_name}:{self._hash_input(input_text)}"
self.cache.setex(cache_key, self.ttl, json.dumps(result))
self.embedding_cache[input_text] = self.embedder.encode(input_text)
return result
33.2.7. Hardware Efficiency
| Hardware | Use Case | Perf/Watt | Recommendation |
|---|---|---|---|
| NVIDIA A100 | Training + inference | Baseline | General purpose |
| NVIDIA H100 | Large training | 1.2x | Fastest training |
| Google TPU v4 | Matrix ops | 1.5x | TensorFlow/JAX workloads |
| Google TPU v5e | Efficient inference | 2x | Cost-optimized inference |
| AWS Inferentia2 | Inference only | 3x | High-volume inference |
| AWS Trainium | Training | 1.5x | AWS training workloads |
| Apple M-series | Edge inference | 4x | On-device ML |
| Intel Gaudi2 | Training | 1.3x | Alternative to NVIDIA |
Hardware Selection Tool
from dataclasses import dataclass
from typing import List, Optional
from enum import Enum
class WorkloadType(Enum):
TRAINING = "training"
INFERENCE = "inference"
BOTH = "both"
@dataclass
class HardwareOption:
name: str
provider: str
power_watts: int
cost_per_hour: float
workload_type: WorkloadType
perf_per_watt: float # Relative to A100 baseline
availability: str # "on_demand", "reserved", "spot"
class GreenHardwareSelector:
"""Select optimal hardware for carbon efficiency."""
HARDWARE_OPTIONS = [
HardwareOption("A100_80GB", "AWS/GCP", 400, 32.77, WorkloadType.BOTH, 1.0, "on_demand"),
HardwareOption("H100_80GB", "AWS/GCP", 700, 65.0, WorkloadType.TRAINING, 1.2, "on_demand"),
HardwareOption("TPU_v4", "GCP", 275, 12.88, WorkloadType.TRAINING, 1.5, "on_demand"),
HardwareOption("TPU_v5e", "GCP", 200, 8.0, WorkloadType.INFERENCE, 2.0, "on_demand"),
HardwareOption("Inferentia2", "AWS", 120, 1.92, WorkloadType.INFERENCE, 3.0, "on_demand"),
HardwareOption("Trainium", "AWS", 300, 22.0, WorkloadType.TRAINING, 1.5, "on_demand"),
HardwareOption("L4", "GCP", 72, 1.78, WorkloadType.INFERENCE, 1.8, "on_demand"),
HardwareOption("T4", "AWS/GCP", 70, 0.53, WorkloadType.INFERENCE, 1.2, "spot"),
]
def select_for_workload(
self,
workload: WorkloadType,
budget_per_hour: float,
carbon_priority: float = 0.5 # 0=cost only, 1=carbon only
) -> List[HardwareOption]:
"""Select hardware optimizing for carbon and cost."""
# Filter by workload type
candidates = [
h for h in self.HARDWARE_OPTIONS
if h.workload_type in [workload, WorkloadType.BOTH]
]
# Filter by budget
candidates = [h for h in candidates if h.cost_per_hour <= budget_per_hour]
if not candidates:
return []
        # Score by a combined metric. Min-max normalize both axes so the
        # carbon_priority weighting is meaningful (raw perf/W and $/h live
        # on very different scales).
        ppw = [h.perf_per_watt for h in candidates]
        costs = [h.cost_per_hour for h in candidates]

        def score(h: HardwareOption) -> float:
            carbon_score = (
                (h.perf_per_watt - min(ppw)) / (max(ppw) - min(ppw))
                if max(ppw) > min(ppw) else 1.0
            )
            cost_score = (
                (max(costs) - h.cost_per_hour) / (max(costs) - min(costs))
                if max(costs) > min(costs) else 1.0
            )
            return carbon_priority * carbon_score + (1 - carbon_priority) * cost_score

        candidates.sort(key=score, reverse=True)
        return candidates
def recommend(
self,
workload: WorkloadType,
estimated_hours: float,
max_budget: float
) -> dict:
"""Get hardware recommendation with projections."""
hourly_budget = max_budget / estimated_hours
options = self.select_for_workload(workload, hourly_budget)
if not options:
return {"error": "No hardware fits budget"}
best = options[0]
# Calculate projections
total_cost = best.cost_per_hour * estimated_hours
total_energy_kwh = (best.power_watts / 1000) * estimated_hours
return {
"recommended_hardware": best.name,
"provider": best.provider,
"projected_cost": round(total_cost, 2),
"projected_energy_kwh": round(total_energy_kwh, 2),
"perf_per_watt_rating": best.perf_per_watt,
"alternatives": [
{"name": h.name, "cost": round(h.cost_per_hour * estimated_hours, 2)}
for h in options[1:3]
]
}
# Usage
selector = GreenHardwareSelector()
recommendation = selector.recommend(
workload=WorkloadType.INFERENCE,
estimated_hours=720, # 1 month
max_budget=2000
)
# {'recommended_hardware': 'Inferentia2', 'projected_cost': 1382.4, ...}
33.2.8. GPU Utilization Monitoring
If GPU utilization sits at 30%, most of the power the card draws is doing no useful work:
import subprocess
import time
from typing import List, Dict
from dataclasses import dataclass
from statistics import mean, stdev
from prometheus_client import Gauge
GPU_UTILIZATION = Gauge("gpu_utilization_percent", "GPU utilization", ["gpu_id"])
GPU_POWER = Gauge("gpu_power_watts", "GPU power draw", ["gpu_id"])
GPU_MEMORY = Gauge("gpu_memory_used_percent", "GPU memory usage", ["gpu_id"])
@dataclass
class GPUStats:
gpu_id: int
utilization: float
memory_used: float
memory_total: float
power_draw: float
temperature: float
class GPUMonitor:
"""Monitor GPU efficiency for carbon optimization."""
UTILIZATION_TARGET = 80 # Target utilization %
def __init__(self, sample_interval: float = 1.0):
self.sample_interval = sample_interval
self.history: List[Dict[int, GPUStats]] = []
def sample(self) -> Dict[int, GPUStats]:
"""Sample current GPU stats."""
result = subprocess.run(
[
"nvidia-smi",
"--query-gpu=index,utilization.gpu,memory.used,memory.total,power.draw,temperature.gpu",
"--format=csv,noheader,nounits"
],
capture_output=True,
text=True
)
stats = {}
for line in result.stdout.strip().split("\n"):
parts = [p.strip() for p in line.split(",")]
if len(parts) >= 6:
gpu_id = int(parts[0])
stats[gpu_id] = GPUStats(
gpu_id=gpu_id,
utilization=float(parts[1]),
memory_used=float(parts[2]),
memory_total=float(parts[3]),
power_draw=float(parts[4]),
temperature=float(parts[5])
)
# Update Prometheus metrics
for gpu_id, s in stats.items():
GPU_UTILIZATION.labels(gpu_id=str(gpu_id)).set(s.utilization)
GPU_POWER.labels(gpu_id=str(gpu_id)).set(s.power_draw)
GPU_MEMORY.labels(gpu_id=str(gpu_id)).set(
100 * s.memory_used / s.memory_total
)
return stats
def monitor(self, duration_seconds: int = 60) -> dict:
"""Monitor GPUs for specified duration."""
end_time = time.time() + duration_seconds
samples = []
while time.time() < end_time:
samples.append(self.sample())
time.sleep(self.sample_interval)
return self._analyze(samples)
def _analyze(self, samples: List[Dict[int, GPUStats]]) -> dict:
"""Analyze collected samples."""
if not samples:
return {}
gpu_ids = samples[0].keys()
analysis = {}
for gpu_id in gpu_ids:
utilizations = [s[gpu_id].utilization for s in samples if gpu_id in s]
powers = [s[gpu_id].power_draw for s in samples if gpu_id in s]
avg_util = mean(utilizations)
avg_power = mean(powers)
# Calculate wasted energy
waste_ratio = max(0, (self.UTILIZATION_TARGET - avg_util) / self.UTILIZATION_TARGET)
analysis[gpu_id] = {
"avg_utilization": round(avg_util, 1),
"std_utilization": round(stdev(utilizations), 1) if len(utilizations) > 1 else 0,
"avg_power_watts": round(avg_power, 1),
"waste_ratio": round(waste_ratio, 2),
"status": "optimal" if avg_util >= self.UTILIZATION_TARGET else "underutilized"
}
return {
"gpus": analysis,
"recommendations": self._get_recommendations(analysis)
}
def _get_recommendations(self, analysis: Dict) -> List[str]:
"""Generate optimization recommendations."""
recommendations = []
for gpu_id, stats in analysis.items():
if stats["avg_utilization"] < 50:
recommendations.append(
f"GPU {gpu_id}: Very low utilization ({stats['avg_utilization']}%). "
f"Consider increasing batch size or using smaller GPU."
)
elif stats["avg_utilization"] < self.UTILIZATION_TARGET:
recommendations.append(
f"GPU {gpu_id}: Utilization {stats['avg_utilization']}% below target. "
f"Suggestions: increase batch size, add DataLoader workers, use WebDataset."
)
return recommendations
# Usage
monitor = GPUMonitor()
results = monitor.monitor(duration_seconds=60)
print(results)
# {'gpus': {0: {'avg_utilization': 72.3, 'status': 'underutilized', ...}},
# 'recommendations': ['GPU 0: Utilization 72.3% below target...']}
33.2.9. SCI Score (Software Carbon Intensity)
The Green Software Foundation’s standard metric:
$$ SCI = ((E \times I) + M) / R $$
| Variable | Meaning | Unit |
|---|---|---|
| E | Energy consumed | kWh |
| I | Carbon intensity of grid | gCO2/kWh |
| M | Embodied carbon (hardware manufacturing) | gCO2 |
| R | Functional unit | Requests, users, etc. |
class SCICalculator:
"""Calculate Software Carbon Intensity score."""
# Embodied carbon estimates (gCO2)
EMBODIED_CARBON = {
"A100": 150_000, # ~150kg CO2 to manufacture
"H100": 200_000,
"TPU_v4": 100_000,
"T4": 50_000,
"CPU_server": 200_000
}
# Hardware lifetime assumptions (hours)
HARDWARE_LIFETIME = {
"A100": 35_000, # ~4 years
"H100": 35_000,
"TPU_v4": 35_000,
"T4": 35_000,
"CPU_server": 52_500 # ~6 years
}
def calculate(
self,
energy_kwh: float,
carbon_intensity: float,
functional_units: int,
hardware_type: str,
usage_hours: float
) -> dict:
"""Calculate SCI score.
Args:
energy_kwh: Energy consumed in kWh
carbon_intensity: Grid carbon intensity (gCO2/kWh)
functional_units: Number of functional units (requests, users)
hardware_type: Type of hardware used
usage_hours: Hours of hardware usage
Returns:
SCI breakdown and score
"""
# Operational carbon
operational_carbon = energy_kwh * carbon_intensity
# Embodied carbon allocation
total_embodied = self.EMBODIED_CARBON.get(hardware_type, 100_000)
lifetime = self.HARDWARE_LIFETIME.get(hardware_type, 35_000)
# Amortize embodied carbon over lifetime
embodied_allocation = (usage_hours / lifetime) * total_embodied
# Total carbon
total_carbon = operational_carbon + embodied_allocation
# SCI score
sci = total_carbon / functional_units if functional_units > 0 else 0
return {
"sci_score": round(sci, 4),
"sci_unit": "gCO2eq per request",
"breakdown": {
"operational_carbon_g": round(operational_carbon, 2),
"embodied_carbon_g": round(embodied_allocation, 2),
"total_carbon_g": round(total_carbon, 2)
},
"functional_units": functional_units,
"interpretation": self._interpret_score(sci)
}
def _interpret_score(self, sci: float) -> str:
"""Interpret SCI score."""
if sci < 0.1:
return "Excellent - Very efficient"
elif sci < 1.0:
return "Good - Room for improvement"
elif sci < 10.0:
return "Moderate - Consider optimization"
else:
return "Poor - Significant optimization needed"
def compare_scenarios(
self,
scenarios: dict # {name: {energy_kwh, carbon_intensity, requests, hardware, hours}}
) -> dict:
"""Compare SCI across scenarios."""
results = {}
for name, params in scenarios.items():
results[name] = self.calculate(
energy_kwh=params["energy_kwh"],
carbon_intensity=params["carbon_intensity"],
functional_units=params["requests"],
hardware_type=params["hardware"],
usage_hours=params["hours"]
)
# Rank by SCI
ranked = sorted(results.items(), key=lambda x: x[1]["sci_score"])
return {
"scenarios": results,
"best_scenario": ranked[0][0],
"worst_scenario": ranked[-1][0]
}
# Usage
calc = SCICalculator()
# Compare different deployment options
scenarios = {
"us_east_a100": {
"energy_kwh": 100,
"carbon_intensity": 400,
"requests": 1_000_000,
"hardware": "A100",
"hours": 24
},
"canada_a100": {
"energy_kwh": 100,
"carbon_intensity": 3,
"requests": 1_000_000,
"hardware": "A100",
"hours": 24
},
"us_east_t4": {
"energy_kwh": 20,
"carbon_intensity": 400,
"requests": 1_000_000,
"hardware": "T4",
"hours": 24
}
}
comparison = calc.compare_scenarios(scenarios)
print(f"Best option: {comparison['best_scenario']}")
# Best option: canada_a100
33.2.10. Serverless vs Serverful Carbon
| Workload | Best Choice | Reason |
|---|---|---|
| Bursty/Low traffic | Serverless | Scale to zero = 0 idle energy |
| Constant high traffic | Serverful | Better utilization, no cold starts |
| Internal tools | Serverless | Often idle |
| Customer-facing critical | Serverful | Consistent performance |
| Development/testing | Serverless | Intermittent usage |
| Batch processing | Spot/Pre-emptible | Flexible timing |
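A rough way to apply the table: compare the energy a dedicated server spends idling against serverless per-request overhead. A hedged sketch with illustrative numbers:
# Back-of-envelope serverless vs. serverful energy (all inputs illustrative).
def daily_energy_kwh(requests_per_day: int,
                     active_seconds_per_request: float = 0.2,
                     server_power_w: float = 300.0,
                     serverless_overhead: float = 1.3) -> dict:
    busy_hours = requests_per_day * active_seconds_per_request / 3600
    serverful = server_power_w * 24 / 1000                       # always-on draw
    serverless = server_power_w * busy_hours / 1000 * serverless_overhead
    return {"serverful_kwh": round(serverful, 2),
            "serverless_kwh": round(serverless, 2),
            "serverless_wins": serverless < serverful}

print(daily_energy_kwh(10_000))   # bursty: scale-to-zero wins easily
print(daily_energy_kwh(400_000))  # near-saturated: serverful pulls ahead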
33.2.11. Summary Checklist
| Step | Action | Impact | Effort |
|---|---|---|---|
| 1 | Add CodeCarbon to training pipelines | Visibility | Low |
| 2 | Select low-carbon regions for batch jobs | -80-95% | Low |
| 3 | Implement model distillation | -70-90% inference | High |
| 4 | Quantize to INT8 for inference | -60% | Medium |
| 5 | Cache frequent predictions | -50-90% | Medium |
| 6 | Monitor GPU utilization | Visibility | Low |
| 7 | Use efficient hardware (TPUs/Inferentia) | -40-60% | Medium |
| 8 | Calculate and track SCI score | Reporting | Low |
| 9 | Set carbon budgets for teams | Governance | Medium |
| 10 | Report carbon in model cards | Transparency | Low |
Quick Wins Ranking
| Action | Carbon Reduction | Implementation Time |
|---|---|---|
| Train in Quebec/Stockholm | 90%+ | 1 day |
| Add caching layer | 50-90% | 1 week |
| Quantize models | 60% | 2-3 days |
| Increase batch size | 20-40% | 1 hour |
| Use spot instances | Same carbon, less cost | 1 day |
| Switch to TPUs (if TF/JAX) | 40% | 1 week |
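Of those quick wins, raising the batch size is the fastest experiment. A hedged sketch that probes the largest batch a GPU can hold by doubling until out-of-memory (the function and its inputs are hypothetical; training needs extra headroom for activations and optimizer state):
import torch

def find_max_batch_size(model, sample, start: int = 8, limit: int = 4096) -> int:
    """Double the batch size until CUDA runs out of memory.

    `sample` is a single example with a leading batch dim of 1.
    """
    model = model.to("cuda").eval()
    best, bs = start, start
    while bs <= limit:
        try:
            batch = sample.to("cuda").repeat(bs, *([1] * (sample.dim() - 1)))
            with torch.no_grad():
                model(batch)
            best = bs
            bs *= 2
        except torch.cuda.OutOfMemoryError:
            break
        finally:
            torch.cuda.empty_cache()
    return best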
[End of Section 33.2]