33.2. Carbon Efficiency: Green AI

Note

The Hidden Cost: Training a large Transformer model can emit as much carbon as five cars do in their entire lifetimes. As AI scales, “Green AI” moves from nice-to-have to a C-suite ESG requirement.


33.2.1. The Carbon Equation

$$ C_{total} = E \times I $$

| Variable | Definition | Unit | Typical Range |
|----------|------------|------|---------------|
| E | Energy Consumed | kWh | 10-10,000+ |
| I | Carbon Intensity | gCO2eq/kWh | 3-800 |
| PUE | Power Usage Effectiveness | Ratio | 1.1-1.5 |
| C | Total Emissions | kg CO2eq | Variable |

Expanded Carbon Formula

$$ C_{total} = (E_{compute} \times PUE + E_{cooling} + E_{network}) \times I_{grid} $$
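
A quick worked example makes the units concrete. All of the numbers below are illustrative assumptions, not measurements:

# Worked example of the expanded carbon formula (assumed values)
compute_kwh = 1_000    # E_compute: energy drawn by the accelerators
pue = 1.2              # data-center Power Usage Effectiveness
cooling_kwh = 50       # E_cooling not already captured by PUE
network_kwh = 10       # E_network: data movement
grid_intensity = 400   # I_grid in gCO2eq/kWh (a gas-heavy grid)

# Facility-level energy, then grams -> kilograms of CO2eq
total_kwh = compute_kwh * pue + cooling_kwh + network_kwh
total_kg_co2 = total_kwh * grid_intensity / 1000
print(f"{total_kwh:.0f} kWh -> {total_kg_co2:.0f} kg CO2eq")  # 1260 kWh -> 504 kg CO2eq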

MLOps Levers for Carbon Reduction

| Lever | Action | Potential Impact | Effort |
|-------|--------|------------------|--------|
| Reduce Compute Time | Early stopping, efficient algorithms | -30-50% | Medium |
| Reduce Power Draw | TPUs > GPUs for matrix math | -20-40% | Low |
| Reduce Carbon Intensity | Train in hydro/wind regions | -90% | Low-Medium |
| Improve PUE | Use efficient data centers | -20-30% | Low (vendor choice) |
| Cache & Reuse | Semantic caching for inference | -50-90% | Medium |
| Model Distillation | Smaller models for inference | -70-90% inference | High |

Carbon Budget Framework

from dataclasses import dataclass
from enum import Enum

class CarbonTier(Enum):
    LOW = "low"      # < 10 kg CO2
    MEDIUM = "medium"  # 10-100 kg
    HIGH = "high"     # 100-1000 kg
    CRITICAL = "critical"  # > 1000 kg

@dataclass
class CarbonBudget:
    """Carbon budget for ML operations."""
    
    project_name: str
    annual_budget_kg: float
    training_allocation: float = 0.7  # 70% for training
    inference_allocation: float = 0.3  # 30% for inference
    
    def training_budget(self) -> float:
        return self.annual_budget_kg * self.training_allocation
    
    def inference_budget(self) -> float:
        return self.annual_budget_kg * self.inference_allocation
    
    def check_training_run(
        self, 
        estimated_kg: float,
        current_usage_kg: float
    ) -> dict:
        """Check if training run fits in budget."""
        remaining = self.training_budget() - current_usage_kg
        fits = estimated_kg <= remaining
        
        return {
            "approved": fits,
            "remaining_budget_kg": remaining,
            "estimated_kg": estimated_kg,
            "utilization_pct": (current_usage_kg / self.training_budget()) * 100
        }


def classify_run(estimated_kg: float) -> CarbonTier:
    """Classify training run by carbon impact."""
    if estimated_kg < 10:
        return CarbonTier.LOW
    elif estimated_kg < 100:
        return CarbonTier.MEDIUM
    elif estimated_kg < 1000:
        return CarbonTier.HIGH
    else:
        return CarbonTier.CRITICAL


# Example usage
budget = CarbonBudget("recommendation-system", annual_budget_kg=500)
check = budget.check_training_run(estimated_kg=50, current_usage_kg=200)
# {'approved': True, 'remaining_budget_kg': 150, ...}

33.2.2. Tooling: CodeCarbon

CodeCarbon is the de facto standard for tracking ML carbon emissions:

from codecarbon import EmissionsTracker, OfflineEmissionsTracker
import mlflow
from typing import Optional
from dataclasses import dataclass

@dataclass
class EmissionsReport:
    emissions_kg: float
    energy_kwh: float
    duration_seconds: float
    region: str
    cpu_power: float
    gpu_power: float
    carbon_intensity: float

class GreenTrainer:
    """Training with carbon tracking and reporting."""
    
    def __init__(
        self, 
        project_name: str,
        offline_mode: bool = False,
        country_iso_code: str = "USA"
    ):
        self.project_name = project_name
        
        if offline_mode:
            self.tracker = OfflineEmissionsTracker(
                project_name=project_name,
                country_iso_code=country_iso_code,
                measure_power_secs=15,
                save_to_file=True,
                log_level="warning"
            )
        else:
            self.tracker = EmissionsTracker(
                project_name=project_name,
                measure_power_secs=15,
                save_to_file=True,
                log_level="warning"
            )
        
        self.emissions_data: Optional[EmissionsReport] = None
    
    def train(self, train_fn, *args, **kwargs):
        """Wrap training function with carbon tracking."""
        self.tracker.start()
        
        try:
            result = train_fn(*args, **kwargs)
        finally:
            emissions = self.tracker.stop()
            self._capture_data(emissions)
        
        return result
    
    def _capture_data(self, emissions: float) -> None:
        """Capture emissions data for reporting."""
        data = self.tracker.final_emissions_data
        energy = data.energy_consumed if data else 0
        
        self.emissions_data = EmissionsReport(
            emissions_kg=emissions,
            energy_kwh=energy,
            duration_seconds=data.duration if data else 0,
            region=data.region if data else "unknown",
            cpu_power=data.cpu_power if data else 0,
            gpu_power=data.gpu_power if data else 0,
            # Derive the effective grid intensity (gCO2/kWh) from the totals;
            # CodeCarbon's emissions_rate field is kg CO2 per second, not an intensity
            carbon_intensity=(emissions * 1000 / energy) if energy else 0
        )
    
    def log_to_mlflow(self) -> None:
        """Log emissions to MLflow."""
        if not self.emissions_data:
            return
        
        mlflow.log_metric("carbon_emissions_kg", self.emissions_data.emissions_kg)
        mlflow.log_metric("energy_consumed_kwh", self.emissions_data.energy_kwh)
        mlflow.log_metric("training_duration_s", self.emissions_data.duration_seconds)
        mlflow.log_metric("carbon_intensity_g_kwh", self.emissions_data.carbon_intensity)
        
        mlflow.set_tag("training_region", self.emissions_data.region)
        mlflow.set_tag("green_ai_tracked", "true")
    
    def get_report(self) -> dict:
        """Get emissions report."""
        if not self.emissions_data:
            return {}
        
        return {
            "emissions_kg_co2": round(self.emissions_data.emissions_kg, 4),
            "energy_kwh": round(self.emissions_data.energy_kwh, 2),
            "duration_hours": round(self.emissions_data.duration_seconds / 3600, 2),
            "region": self.emissions_data.region,
            "efficiency_kg_per_hour": round(
                self.emissions_data.emissions_kg / 
                (self.emissions_data.duration_seconds / 3600), 
                4
            ) if self.emissions_data.duration_seconds > 0 else 0,
            "equivalent_car_km": round(self.emissions_data.emissions_kg / 0.12, 1)
        }


# Usage (assumes `model` and `data` are defined elsewhere)
green = GreenTrainer("my-model")

def train_model(model, data):
    for epoch in range(100):
        model.train(data)
    return model

trained = green.train(train_model, model, data)

print(green.get_report())
# {'emissions_kg_co2': 2.5, 'energy_kwh': 15.3, 'equivalent_car_km': 20.8}

CI/CD Integration

# .github/workflows/training.yaml
name: Model Training

on:
  push:
    paths:
      - 'training/**'

jobs:
  train:
    runs-on: ubuntu-latest
    
    steps:
      - uses: actions/checkout@v4
      
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      
      - name: Install dependencies
        run: |
          pip install codecarbon mlflow torch
      
      - name: Run training with carbon tracking
        env:
          CODECARBON_LOG_LEVEL: warning
        run: |
          python train.py --track-carbon
      
      - name: Upload emissions report
        uses: actions/upload-artifact@v4
        with:
          name: emissions-report
          path: emissions.csv
      
      - name: Comment carbon usage on PR
        if: github.event_name == 'pull_request'
        uses: actions/github-script@v6
        with:
          # Assumes train.py writes emissions_report.json alongside
          # CodeCarbon's default emissions.csv
          script: |
            const fs = require('fs');
            const report = JSON.parse(fs.readFileSync('emissions_report.json'));
            
            // Build the comment from an array so the Markdown table is not
            // indented (indented lines would render as a code block)
            const body = [
              '## 🌱 Carbon Emissions Report',
              '',
              '| Metric | Value |',
              '|--------|-------|',
              `| CO2 Emissions | ${report.emissions_kg_co2} kg |`,
              `| Energy Used | ${report.energy_kwh} kWh |`,
              `| Duration | ${report.duration_hours} hours |`,
              `| Equivalent | ${report.equivalent_car_km} km driving |`,
            ].join('\n');
            
            github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body,
            });

33.2.3. Chase the Sun: Region Selection

Carbon intensity varies by more than 100x between regions:

| Region | Cloud | Grid Mix | gCO2/kWh | Recommendation |
|--------|-------|----------|----------|----------------|
| Montreal | AWS ca-central-1 | Hydro | ~3 | ✅ Best choice |
| Quebec | GCP northamerica-northeast1 | Hydro | ~3 | ✅ Best choice |
| Stockholm | AWS eu-north-1 | Hydro/Wind | ~15 | ✅ Excellent |
| Oregon | AWS us-west-2 | Hydro/Wind | ~50 | ✅ Good |
| Iowa | GCP us-central1 | Wind | ~200 | ⚠️ Variable |
| Finland | GCP europe-north1 | Hydro/Nuclear | ~80 | ✅ Good |
| Virginia | AWS us-east-1 | Coal/Gas | ~400 | ❌ Avoid for large training |
| Singapore | All | Gas | ~450 | ❌ Avoid for large training |

Real-Time Carbon-Aware Scheduling

import requests
from typing import List, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class RegionCarbon:
    region: str
    carbon_intensity: float  # gCO2/kWh
    renewable_percentage: float
    timestamp: str
    forecast_available: bool

class CarbonAwareScheduler:
    """Schedule training in lowest-carbon region."""
    
    # Static carbon intensities (fallback)
    STATIC_INTENSITIES = {
        "us-east-1": 400,
        "us-west-2": 50,
        "ca-central-1": 3,
        "eu-north-1": 15,
        "eu-west-1": 300,
        "ap-northeast-1": 500,
        "us-central1": 200,  # GCP
        "europe-north1": 80,
        "northamerica-northeast1": 3
    }
    
    # Base URL for a Carbon Aware SDK deployment (illustrative placeholder;
    # the SDK is typically self-hosted, so point this at your own instance)
    CARBON_AWARE_API = "https://api.carbonaware.org"
    
    def __init__(self, candidate_regions: List[str], use_api: bool = True):
        self.regions = candidate_regions
        self.use_api = use_api
    
    def get_current_intensity(self, region: str) -> RegionCarbon:
        """Get current carbon intensity for region."""
        
        if self.use_api:
            try:
                return self._fetch_from_api(region)
            except Exception:
                pass
        
        # Fallback to static
        return RegionCarbon(
            region=region,
            carbon_intensity=self.STATIC_INTENSITIES.get(region, 500),
            renewable_percentage=0,
            timestamp=datetime.utcnow().isoformat(),
            forecast_available=False
        )
    
    def _fetch_from_api(self, region: str) -> RegionCarbon:
        """Fetch real-time data from Carbon Aware SDK API."""
        resp = requests.get(
            f"{self.CARBON_AWARE_API}/emissions/bylocation",
            params={"location": region},
            timeout=5
        )
        resp.raise_for_status()
        data = resp.json()
        
        return RegionCarbon(
            region=region,
            carbon_intensity=data.get("rating", 500),
            renewable_percentage=data.get("renewablePercentage", 0),
            timestamp=data.get("time", ""),
            forecast_available=True
        )
    
    def get_greenest_region(self) -> str:
        """Select region with lowest carbon intensity."""
        intensities = {}
        
        for region in self.regions:
            carbon = self.get_current_intensity(region)
            intensities[region] = carbon.carbon_intensity
        
        return min(intensities, key=intensities.get)
    
    def get_optimal_window(
        self, 
        region: str, 
        duration_hours: int = 4,
        look_ahead_hours: int = 24
    ) -> Optional[datetime]:
        """Find optimal time window for lowest carbon."""
        
        try:
            resp = requests.get(
                f"{self.CARBON_AWARE_API}/emissions/forecasts",
                params={
                    "location": region,
                    "dataStartAt": datetime.utcnow().isoformat(),
                    "dataEndAt": (datetime.utcnow() + timedelta(hours=look_ahead_hours)).isoformat(),
                    "windowSize": duration_hours
                },
                timeout=10
            )
            resp.raise_for_status()
            
            forecasts = resp.json()
            
            # Find window with lowest average intensity. Simplified: assumes
            # the response is a flat list of {timestamp, rating} windows;
            # adapt to the schema of your Carbon Aware SDK version.
            best_window = min(forecasts, key=lambda x: x["rating"])
            
            return datetime.fromisoformat(best_window["timestamp"])
        
        except Exception:
            return None
    
    def schedule_training(
        self,
        estimated_duration_hours: float,
        flexible_window_hours: int = 24
    ) -> dict:
        """Get optimal region and timing for training."""
        
        # Get current best region
        best_region = self.get_greenest_region()
        current_intensity = self.get_current_intensity(best_region)
        
        # Check if we can delay for better window
        optimal_time = self.get_optimal_window(
            best_region, 
            int(estimated_duration_hours),
            flexible_window_hours
        )
        
        return {
            "recommended_region": best_region,
            "current_carbon_intensity": current_intensity.carbon_intensity,
            "optimal_start_time": optimal_time.isoformat() if optimal_time else "now",
            "all_regions": {
                r: self.get_current_intensity(r).carbon_intensity 
                for r in self.regions
            }
        }


# Usage
scheduler = CarbonAwareScheduler([
    "us-east-1", "us-west-2", "ca-central-1", "eu-north-1"
])

schedule = scheduler.schedule_training(
    estimated_duration_hours=4,
    flexible_window_hours=12
)
# {'recommended_region': 'ca-central-1', 'current_carbon_intensity': 3, ...}

Terraform: Multi-Region Training

# carbon_aware_training.tf

variable "training_regions" {
  type = map(object({
    priority         = number
    carbon_intensity = number  # gCO2/kWh
    gpu_available    = bool
  }))
  
  default = {
    "ca-central-1" = { priority = 1, carbon_intensity = 3, gpu_available = true }
    "eu-north-1"   = { priority = 2, carbon_intensity = 15, gpu_available = true }
    "us-west-2"    = { priority = 3, carbon_intensity = 50, gpu_available = true }
    "us-east-1"    = { priority = 4, carbon_intensity = 400, gpu_available = true }
  }
}

# Create training resources in green region first
#
# NOTE: sketch only — the hashicorp/aws provider does not expose a SageMaker
# training-job resource; in practice you provision the surrounding
# infrastructure with Terraform and launch jobs via the SageMaker API/SDK.
resource "aws_sagemaker_training_job" "green_training" {
  for_each = {
    for k, v in var.training_regions : k => v
    if v.priority == 1 && v.gpu_available
  }
  
  training_job_name = "green-training-${each.key}-${formatdate("YYYYMMDDhhmmss", timestamp())}"
  role_arn          = aws_iam_role.sagemaker.arn
  
  algorithm_specification {
    training_image = var.training_image
    training_input_mode = "File"
  }
  
  resource_config {
    instance_type   = "ml.p4d.24xlarge"
    instance_count  = 1
    volume_size_in_gb = 100
  }
  
  # Force training in green region
  vpc_config {
    subnets          = [aws_subnet.training[each.key].id]
    security_group_ids = [aws_security_group.training.id]
  }
  
  tags = {
    carbon_intensity = each.value.carbon_intensity
    green_ai         = "true"
    region           = each.key
  }
}

# CloudWatch alarm for carbon budget
resource "aws_cloudwatch_metric_alarm" "carbon_budget" {
  alarm_name          = "carbon-budget-exceeded"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 1
  metric_name         = "carbon_emissions_kg"
  namespace           = "GreenAI"
  period              = 86400  # Daily
  statistic           = "Sum"
  threshold           = var.daily_carbon_budget_kg
  
  alarm_actions = [aws_sns_topic.alerts.arn]
  
  tags = {
    Environment = var.environment
  }
}

33.2.4. Model Distillation for Sustainability

Distillation creates smaller, more efficient models:

| Stage | Carbon Cost | Frequency | Cumulative |
|-------|-------------|-----------|------------|
| Train Teacher (175B) | 500 kg CO2 | Once | 500 kg |
| Distill Student (7B) | 100 kg CO2 | Once | 600 kg |
| Serve Student | 0.0001 kg/inference | Millions/day | Varies |

Carbon ROI Calculation

from dataclasses import dataclass
from typing import Optional

@dataclass
class DistillationROI:
    """Calculate carbon ROI of distillation."""
    
    teacher_inference_carbon: float  # kg CO2 per inference
    student_inference_carbon: float  # kg CO2 per inference
    distillation_carbon: float       # kg CO2 total for distillation
    daily_inferences: int
    
    def savings_per_inference(self) -> float:
        return self.teacher_inference_carbon - self.student_inference_carbon
    
    def breakeven_inferences(self) -> float:
        if self.savings_per_inference() <= 0:
            return float('inf')
        return int(self.distillation_carbon / self.savings_per_inference())
    
    def breakeven_days(self) -> float:
        return self.breakeven_inferences() / self.daily_inferences
    
    def yearly_savings_kg(self) -> float:
        yearly_inferences = self.daily_inferences * 365
        gross_savings = self.savings_per_inference() * yearly_inferences
        return gross_savings - self.distillation_carbon
    
    def roi_multiple(self) -> float:
        if self.distillation_carbon <= 0:
            return float('inf')
        return self.yearly_savings_kg() / self.distillation_carbon + 1
    
    def report(self) -> dict:
        return {
            "breakeven_inferences": self.breakeven_inferences(),
            "breakeven_days": round(self.breakeven_days(), 1),
            "yearly_savings_kg_co2": round(self.yearly_savings_kg(), 2),
            "roi_multiple": round(self.roi_multiple(), 2),
            "equivalent_trees_year": round(self.yearly_savings_kg() / 21, 1)  # Tree absorbs ~21kg/year
        }


# Example: GPT-4 to GPT-3.5 equivalent distillation
roi = DistillationROI(
    teacher_inference_carbon=0.001,   # GPT-4 level: 1g per inference
    student_inference_carbon=0.0001,  # GPT-3.5 level: 0.1g per inference
    distillation_carbon=100,          # 100kg to distill
    daily_inferences=1_000_000        # 1M inferences/day
)

print(roi.report())
# {
#     'breakeven_inferences': 111111,
#     'breakeven_days': 0.1,
#     'yearly_savings_kg_co2': 328400.0,
#     'roi_multiple': 3285.0,
#     'equivalent_trees_year': 15638.1
# }

Distillation Pipeline with Carbon Tracking

from codecarbon import EmissionsTracker
import torch
import torch.nn.functional as F

class CarbonAwareDistiller:
    """Distillation with carbon tracking."""
    
    def __init__(
        self,
        teacher_model,
        student_model,
        temperature: float = 3.0,
        alpha: float = 0.7
    ):
        self.teacher = teacher_model
        self.student = student_model
        self.temperature = temperature
        self.alpha = alpha
        self.tracker = EmissionsTracker(project_name="distillation")
    
    def distillation_loss(
        self,
        student_logits: torch.Tensor,
        teacher_logits: torch.Tensor,
        labels: torch.Tensor
    ) -> torch.Tensor:
        """Compute distillation loss."""
        # Soft targets
        soft_teacher = F.softmax(teacher_logits / self.temperature, dim=-1)
        soft_student = F.log_softmax(student_logits / self.temperature, dim=-1)
        
        distill_loss = F.kl_div(
            soft_student, 
            soft_teacher, 
            reduction='batchmean'
        ) * (self.temperature ** 2)
        
        # Hard targets
        hard_loss = F.cross_entropy(student_logits, labels)
        
        return self.alpha * distill_loss + (1 - self.alpha) * hard_loss
    
    def distill(
        self,
        train_loader,
        optimizer,
        epochs: int = 10,
        device: str = "cuda"
    ) -> dict:
        """Run distillation with carbon tracking."""
        
        self.teacher.eval()
        self.student.train()
        self.teacher.to(device)
        self.student.to(device)
        
        self.tracker.start()
        
        for epoch in range(epochs):
            total_loss = 0
            
            for batch in train_loader:
                inputs, labels = batch
                inputs, labels = inputs.to(device), labels.to(device)
                
                # Get teacher predictions (no grad)
                with torch.no_grad():
                    teacher_logits = self.teacher(inputs)
                
                # Get student predictions
                student_logits = self.student(inputs)
                
                # Compute loss
                loss = self.distillation_loss(student_logits, teacher_logits, labels)
                
                # Backward
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                
                total_loss += loss.item()
            
            print(f"Epoch {epoch+1}: Loss = {total_loss:.4f}")
        
        emissions = self.tracker.stop()
        
        return {
            "student_model": self.student,
            "distillation_carbon_kg": emissions,
            "epochs": epochs
        }
    
    def compare_efficiency(self, test_input: torch.Tensor) -> dict:
        """Compare teacher vs student efficiency."""
        import time
        
        device = "cuda" if torch.cuda.is_available() else "cpu"
        self.teacher.to(device)
        self.student.to(device)
        test_input = test_input.to(device)
        
        def sync():
            if device == "cuda":
                torch.cuda.synchronize()
        
        # Warmup both models so neither pays one-time initialization
        # costs inside the timed loops
        for _ in range(10):
            with torch.no_grad():
                _ = self.teacher(test_input)
                _ = self.student(test_input)
        
        # Measure teacher
        sync()
        t0 = time.perf_counter()
        for _ in range(100):
            with torch.no_grad():
                _ = self.teacher(test_input)
        sync()
        teacher_time = (time.perf_counter() - t0) / 100
        
        # Measure student
        sync()
        t0 = time.perf_counter()
        for _ in range(100):
            with torch.no_grad():
                _ = self.student(test_input)
        sync()
        student_time = (time.perf_counter() - t0) / 100
        
        return {
            "teacher_latency_ms": teacher_time * 1000,
            "student_latency_ms": student_time * 1000,
            "speedup": teacher_time / student_time,
            "estimated_energy_reduction": 1 - (student_time / teacher_time)
        }

33.2.5. Training vs Inference Carbon

| Component | One-Time | Ongoing/Year | Focus |
|-----------|----------|--------------|-------|
| Train Llama-2 70B | 500 tons CO2 | - | 1% of lifetime |
| Serve 100M users/day | - | 5000 tons CO2 | 99% of lifetime |

Implication: for deployed, high-traffic systems, most green AI effort should go to inference optimization.

Inference Carbon Estimator

from dataclasses import dataclass
from typing import Dict

@dataclass
class InferenceConfig:
    model_size_b: float  # Parameters in billions
    batch_size: int
    avg_tokens_per_request: int
    gpu_type: str
    precision: str  # "fp32", "fp16", "int8", "int4"

class InferenceCarbonEstimator:
    """Estimate carbon for inference workloads."""
    
    # Approximate GPU power by type (Watts)
    GPU_POWER = {
        "A100_80GB": 400,
        "A100_40GB": 350,
        "H100": 700,
        "A10G": 150,
        "T4": 70,
        "L4": 72,
        "V100": 300,
        "RTX4090": 450
    }
    
    # Throughput multipliers by precision
    PRECISION_MULTIPLIERS = {
        "fp32": 1.0,
        "fp16": 2.0,
        "int8": 4.0,
        "int4": 8.0
    }
    
    def __init__(self, carbon_intensity: float = 400):
        """
        Args:
            carbon_intensity: gCO2/kWh of electricity
        """
        self.carbon_intensity = carbon_intensity
    
    def estimate_per_request(self, config: InferenceConfig) -> dict:
        """Estimate carbon per inference request."""
        
        gpu_power = self.GPU_POWER.get(config.gpu_type, 300)
        precision_mult = self.PRECISION_MULTIPLIERS.get(config.precision, 1.0)
        
        # Estimate latency based on model size and precision
        # Rough formula: latency ∝ model_size / (memory_bandwidth * batch_efficiency)
        base_latency_ms = (config.model_size_b * 2.0) / (1.0 * config.batch_size)
        adjusted_latency_ms = base_latency_ms / precision_mult
        
        # Energy per request (Joules)
        energy_joules = gpu_power * (adjusted_latency_ms / 1000)
        energy_kwh = energy_joules / 3600000
        
        # Carbon per request
        carbon_g = energy_kwh * self.carbon_intensity
        
        return {
            "latency_ms": round(adjusted_latency_ms, 2),
            "energy_joules": round(energy_joules, 4),
            "carbon_grams": round(carbon_g, 6),
            "carbon_per_1m_requests_kg": round(carbon_g * 1_000_000 / 1000, 2)
        }
    
    def compare_configs(self, configs: Dict[str, InferenceConfig]) -> dict:
        """Compare carbon across configurations."""
        results = {}
        
        for name, config in configs.items():
            results[name] = self.estimate_per_request(config)
        
        # Find most efficient
        best = min(results.items(), key=lambda x: x[1]["carbon_grams"])
        
        return {
            "configs": results,
            "most_efficient": best[0],
            "savings_vs_baseline": {
                name: round(1 - (r["carbon_grams"] / list(results.values())[0]["carbon_grams"]), 2)
                for name, r in results.items()
            }
        }


# Compare configurations
estimator = InferenceCarbonEstimator(carbon_intensity=400)

configs = {
    "baseline_fp16": InferenceConfig(
        model_size_b=7, batch_size=1, avg_tokens_per_request=100,
        gpu_type="A100_80GB", precision="fp16"
    ),
    "quantized_int8": InferenceConfig(
        model_size_b=7, batch_size=1, avg_tokens_per_request=100,
        gpu_type="A100_80GB", precision="int8"
    ),
    "quantized_int4": InferenceConfig(
        model_size_b=7, batch_size=1, avg_tokens_per_request=100,
        gpu_type="A100_80GB", precision="int4"
    ),
    "smaller_gpu_int8": InferenceConfig(
        model_size_b=7, batch_size=1, avg_tokens_per_request=100,
        gpu_type="T4", precision="int8"
    )
}

comparison = estimator.compare_configs(configs)
print(comparison)

Quantization Impact

| Precision | Memory | Latency | Energy | Quality Impact |
|-----------|--------|---------|--------|----------------|
| FP32 | 100% | 100% | 100% | Baseline |
| FP16 | 50% | 60% | 60% | Negligible |
| INT8 | 25% | 40% | 40% | <1% degradation |
| INT4 | 12.5% | 30% | 30% | 1-3% degradation |
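
As a concrete illustration of the INT8 row, here is a minimal sketch using PyTorch's dynamic quantization (the toy model and sizes are illustrative; a real deployment would also validate accuracy on held-out data):

import io
import torch
import torch.nn as nn

# Toy FP32 model standing in for a real network
model = nn.Sequential(nn.Linear(512, 512), nn.ReLU(), nn.Linear(512, 10)).eval()

# Dynamic quantization: Linear weights stored as INT8,
# activations quantized on the fly at inference time
quantized = torch.ao.quantization.quantize_dynamic(
    model, {nn.Linear}, dtype=torch.qint8
)

def size_mb(m: nn.Module) -> float:
    """Serialized size as a rough proxy for the memory column above."""
    buf = io.BytesIO()
    torch.save(m.state_dict(), buf)
    return buf.getbuffer().nbytes / 1e6

print(f"fp32: {size_mb(model):.2f} MB, int8: {size_mb(quantized):.2f} MB")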

33.2.6. Caching for Green AI

Every cache hit = one GPU inference saved:

import redis
import hashlib
import json
from typing import Optional, Dict, Any
from dataclasses import dataclass
from prometheus_client import Counter, Gauge

# Metrics
CACHE_HITS = Counter("green_cache_hits_total", "Cache hits", ["model"])
CACHE_MISSES = Counter("green_cache_misses_total", "Cache misses", ["model"])
CARBON_SAVED = Counter("green_carbon_saved_grams", "CO2 saved by caching", ["model"])
CACHE_HIT_RATE = Gauge("green_cache_hit_rate", "Cache hit rate", ["model"])

@dataclass
class CacheStats:
    hits: int
    misses: int
    carbon_saved_g: float
    
    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total > 0 else 0

class GreenInferenceCache:
    """Semantic caching with carbon tracking."""
    
    def __init__(
        self,
        model,
        model_name: str,
        carbon_per_inference_g: float = 0.1,
        ttl_seconds: int = 86400,
        redis_url: str = "redis://localhost:6379"
    ):
        self.model = model
        self.model_name = model_name
        self.carbon_per_inference = carbon_per_inference_g
        self.ttl = ttl_seconds
        
        self.cache = redis.from_url(redis_url)
        self.stats = CacheStats(hits=0, misses=0, carbon_saved_g=0)
    
    def _hash_input(self, input_text: str) -> str:
        """Create deterministic hash of input."""
        return hashlib.sha256(input_text.encode()).hexdigest()
    
    def predict(self, input_text: str, **kwargs) -> dict:
        """Predict with caching."""
        cache_key = f"{self.model_name}:{self._hash_input(input_text)}"
        
        # Check cache
        cached = self.cache.get(cache_key)
        if cached:
            self.stats.hits += 1
            self.stats.carbon_saved_g += self.carbon_per_inference
            
            CACHE_HITS.labels(model=self.model_name).inc()
            CARBON_SAVED.labels(model=self.model_name).inc(self.carbon_per_inference)
            
            return json.loads(cached)
        
        # Cache miss - run inference
        self.stats.misses += 1
        CACHE_MISSES.labels(model=self.model_name).inc()
        
        result = self.model.predict(input_text, **kwargs)
        
        # Cache result
        self.cache.setex(cache_key, self.ttl, json.dumps(result))
        
        # Update hit rate gauge
        CACHE_HIT_RATE.labels(model=self.model_name).set(self.stats.hit_rate)
        
        return result
    
    def get_green_metrics(self) -> dict:
        """Get sustainability metrics."""
        return {
            "cache_hits": self.stats.hits,
            "cache_misses": self.stats.misses,
            "hit_rate": round(self.stats.hit_rate, 4),
            "carbon_saved_g": round(self.stats.carbon_saved_g, 2),
            "carbon_saved_kg": round(self.stats.carbon_saved_g / 1000, 4),
            "equivalent_car_km": round(self.stats.carbon_saved_g / 120, 2),
            "inferences_avoided": self.stats.hits
        }
    
    def estimate_monthly_savings(self, daily_requests: int) -> dict:
        """Project monthly carbon savings."""
        estimated_hit_rate = self.stats.hit_rate if self.stats.hit_rate > 0 else 0.3
        monthly_requests = daily_requests * 30
        
        hits = int(monthly_requests * estimated_hit_rate)
        carbon_saved = hits * self.carbon_per_inference / 1000  # kg
        
        return {
            "projected_monthly_requests": monthly_requests,
            "projected_cache_hits": hits,
            "projected_carbon_saved_kg": round(carbon_saved, 2),
            "projected_cost_saved_usd": round(hits * 0.001, 2)  # Rough GPU cost
        }


class SemanticCache(GreenInferenceCache):
    """Cache with semantic similarity matching."""
    
    def __init__(
        self,
        model,
        model_name: str,
        embedding_model,
        similarity_threshold: float = 0.95,
        **kwargs
    ):
        super().__init__(model, model_name, **kwargs)
        self.embedder = embedding_model
        self.threshold = similarity_threshold
        self.embedding_cache: Dict[str, Any] = {}
    
    def _find_similar_cached(self, input_text: str) -> Optional[str]:
        """Find semantically similar cached input."""
        input_embedding = self.embedder.encode(input_text)
        
        for cached_input, cached_embedding in self.embedding_cache.items():
            similarity = self._cosine_similarity(input_embedding, cached_embedding)
            if similarity >= self.threshold:
                return cached_input
        
        return None
    
    def _cosine_similarity(self, a, b) -> float:
        import numpy as np
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
    
    def predict(self, input_text: str, **kwargs) -> dict:
        """Predict with semantic similarity caching."""
        
        # Check for semantically similar cached input
        similar_input = self._find_similar_cached(input_text)
        
        if similar_input:
            cache_key = f"{self.model_name}:{self._hash_input(similar_input)}"
            cached = self.cache.get(cache_key)
            if cached:
                self.stats.hits += 1
                self.stats.carbon_saved_g += self.carbon_per_inference
                return json.loads(cached)
        
        # Cache miss - run inference
        self.stats.misses += 1
        result = self.model.predict(input_text, **kwargs)
        
        # Cache with embedding
        cache_key = f"{self.model_name}:{self._hash_input(input_text)}"
        self.cache.setex(cache_key, self.ttl, json.dumps(result))
        self.embedding_cache[input_text] = self.embedder.encode(input_text)
        
        return result

33.2.7. Hardware Efficiency

| Hardware | Use Case | Perf/Watt | Recommendation |
|----------|----------|-----------|----------------|
| NVIDIA A100 | Training + inference | Baseline | General purpose |
| NVIDIA H100 | Large training | 1.2x | Fastest training |
| Google TPU v4 | Matrix ops | 1.5x | TensorFlow/JAX workloads |
| Google TPU v5e | Efficient inference | 2x | Cost-optimized inference |
| AWS Inferentia2 | Inference only | 3x | High-volume inference |
| AWS Trainium | Training | 1.5x | AWS training workloads |
| Apple M-series | Edge inference | 4x | On-device ML |
| Intel Gaudi2 | Training | 1.3x | Alternative to NVIDIA |

Hardware Selection Tool

from dataclasses import dataclass
from typing import List, Optional
from enum import Enum

class WorkloadType(Enum):
    TRAINING = "training"
    INFERENCE = "inference"
    BOTH = "both"

@dataclass
class HardwareOption:
    name: str
    provider: str
    power_watts: int
    cost_per_hour: float
    workload_type: WorkloadType
    perf_per_watt: float  # Relative to A100 baseline
    availability: str  # "on_demand", "reserved", "spot"

class GreenHardwareSelector:
    """Select optimal hardware for carbon efficiency."""
    
    HARDWARE_OPTIONS = [
        HardwareOption("A100_80GB", "AWS/GCP", 400, 32.77, WorkloadType.BOTH, 1.0, "on_demand"),
        HardwareOption("H100_80GB", "AWS/GCP", 700, 65.0, WorkloadType.TRAINING, 1.2, "on_demand"),
        HardwareOption("TPU_v4", "GCP", 275, 12.88, WorkloadType.TRAINING, 1.5, "on_demand"),
        HardwareOption("TPU_v5e", "GCP", 200, 8.0, WorkloadType.INFERENCE, 2.0, "on_demand"),
        HardwareOption("Inferentia2", "AWS", 120, 1.92, WorkloadType.INFERENCE, 3.0, "on_demand"),
        HardwareOption("Trainium", "AWS", 300, 22.0, WorkloadType.TRAINING, 1.5, "on_demand"),
        HardwareOption("L4", "GCP", 72, 1.78, WorkloadType.INFERENCE, 1.8, "on_demand"),
        HardwareOption("T4", "AWS/GCP", 70, 0.53, WorkloadType.INFERENCE, 1.2, "spot"),
    ]
    
    def select_for_workload(
        self,
        workload: WorkloadType,
        budget_per_hour: float,
        carbon_priority: float = 0.5  # 0=cost only, 1=carbon only
    ) -> List[HardwareOption]:
        """Select hardware optimizing for carbon and cost."""
        
        # Filter by workload type
        candidates = [
            h for h in self.HARDWARE_OPTIONS
            if h.workload_type in [workload, WorkloadType.BOTH]
        ]
        
        # Filter by budget
        candidates = [h for h in candidates if h.cost_per_hour <= budget_per_hour]
        
        if not candidates:
            return []
        
        # Score by combined metric (an uncalibrated heuristic: the carbon and
        # cost terms are on different scales, so treat rankings as indicative)
        def score(h: HardwareOption) -> float:
            carbon_score = h.perf_per_watt / h.power_watts  # favors efficient, low-power parts
            cost_score = 1 / h.cost_per_hour  # lower cost is better
            
            return carbon_priority * carbon_score + (1 - carbon_priority) * cost_score
        
        candidates.sort(key=score, reverse=True)
        return candidates
    
    def recommend(
        self,
        workload: WorkloadType,
        estimated_hours: float,
        max_budget: float
    ) -> dict:
        """Get hardware recommendation with projections."""
        
        hourly_budget = max_budget / estimated_hours
        options = self.select_for_workload(workload, hourly_budget)
        
        if not options:
            return {"error": "No hardware fits budget"}
        
        best = options[0]
        
        # Calculate projections
        total_cost = best.cost_per_hour * estimated_hours
        total_energy_kwh = (best.power_watts / 1000) * estimated_hours
        
        return {
            "recommended_hardware": best.name,
            "provider": best.provider,
            "projected_cost": round(total_cost, 2),
            "projected_energy_kwh": round(total_energy_kwh, 2),
            "perf_per_watt_rating": best.perf_per_watt,
            "alternatives": [
                {"name": h.name, "cost": round(h.cost_per_hour * estimated_hours, 2)}
                for h in options[1:3]
            ]
        }


# Usage
selector = GreenHardwareSelector()

recommendation = selector.recommend(
    workload=WorkloadType.INFERENCE,
    estimated_hours=720,  # 1 month
    max_budget=2000
)
# {'recommended_hardware': 'Inferentia2', 'projected_cost': 1382.4, ...}

33.2.8. GPU Utilization Monitoring

If GPU utilization sits at 30%, roughly 70% of the energy you pay for is doing no useful work:

import subprocess
import time
from typing import List, Dict
from dataclasses import dataclass
from statistics import mean, stdev
from prometheus_client import Gauge

GPU_UTILIZATION = Gauge("gpu_utilization_percent", "GPU utilization", ["gpu_id"])
GPU_POWER = Gauge("gpu_power_watts", "GPU power draw", ["gpu_id"])
GPU_MEMORY = Gauge("gpu_memory_used_percent", "GPU memory usage", ["gpu_id"])

@dataclass
class GPUStats:
    gpu_id: int
    utilization: float
    memory_used: float
    memory_total: float
    power_draw: float
    temperature: float

class GPUMonitor:
    """Monitor GPU efficiency for carbon optimization."""
    
    UTILIZATION_TARGET = 80  # Target utilization %
    
    def __init__(self, sample_interval: float = 1.0):
        self.sample_interval = sample_interval
        self.history: List[Dict[int, GPUStats]] = []
    
    def sample(self) -> Dict[int, GPUStats]:
        """Sample current GPU stats."""
        result = subprocess.run(
            [
                "nvidia-smi",
                "--query-gpu=index,utilization.gpu,memory.used,memory.total,power.draw,temperature.gpu",
                "--format=csv,noheader,nounits"
            ],
            capture_output=True,
            text=True
        )
        
        stats = {}
        for line in result.stdout.strip().split("\n"):
            parts = [p.strip() for p in line.split(",")]
            if len(parts) >= 6:
                gpu_id = int(parts[0])
                stats[gpu_id] = GPUStats(
                    gpu_id=gpu_id,
                    utilization=float(parts[1]),
                    memory_used=float(parts[2]),
                    memory_total=float(parts[3]),
                    power_draw=float(parts[4]),
                    temperature=float(parts[5])
                )
        
        # Update Prometheus metrics
        for gpu_id, s in stats.items():
            GPU_UTILIZATION.labels(gpu_id=str(gpu_id)).set(s.utilization)
            GPU_POWER.labels(gpu_id=str(gpu_id)).set(s.power_draw)
            GPU_MEMORY.labels(gpu_id=str(gpu_id)).set(
                100 * s.memory_used / s.memory_total
            )
        
        return stats
    
    def monitor(self, duration_seconds: int = 60) -> dict:
        """Monitor GPUs for specified duration."""
        end_time = time.time() + duration_seconds
        samples = []
        
        while time.time() < end_time:
            samples.append(self.sample())
            time.sleep(self.sample_interval)
        
        return self._analyze(samples)
    
    def _analyze(self, samples: List[Dict[int, GPUStats]]) -> dict:
        """Analyze collected samples."""
        if not samples:
            return {}
        
        gpu_ids = samples[0].keys()
        analysis = {}
        
        for gpu_id in gpu_ids:
            utilizations = [s[gpu_id].utilization for s in samples if gpu_id in s]
            powers = [s[gpu_id].power_draw for s in samples if gpu_id in s]
            
            avg_util = mean(utilizations)
            avg_power = mean(powers)
            
            # Calculate wasted energy
            waste_ratio = max(0, (self.UTILIZATION_TARGET - avg_util) / self.UTILIZATION_TARGET)
            
            analysis[gpu_id] = {
                "avg_utilization": round(avg_util, 1),
                "std_utilization": round(stdev(utilizations), 1) if len(utilizations) > 1 else 0,
                "avg_power_watts": round(avg_power, 1),
                "waste_ratio": round(waste_ratio, 2),
                "status": "optimal" if avg_util >= self.UTILIZATION_TARGET else "underutilized"
            }
        
        return {
            "gpus": analysis,
            "recommendations": self._get_recommendations(analysis)
        }
    
    def _get_recommendations(self, analysis: Dict) -> List[str]:
        """Generate optimization recommendations."""
        recommendations = []
        
        for gpu_id, stats in analysis.items():
            if stats["avg_utilization"] < 50:
                recommendations.append(
                    f"GPU {gpu_id}: Very low utilization ({stats['avg_utilization']}%). "
                    f"Consider increasing batch size or using smaller GPU."
                )
            elif stats["avg_utilization"] < self.UTILIZATION_TARGET:
                recommendations.append(
                    f"GPU {gpu_id}: Utilization {stats['avg_utilization']}% below target. "
                    f"Suggestions: increase batch size, add DataLoader workers, use WebDataset."
                )
        
        return recommendations


# Usage
monitor = GPUMonitor()
results = monitor.monitor(duration_seconds=60)
print(results)
# {'gpus': {0: {'avg_utilization': 72.3, 'status': 'underutilized', ...}}, 
#  'recommendations': ['GPU 0: Utilization 72.3% below target...']}
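
The recommendations above usually come down to the input pipeline. Here is a minimal sketch of the DataLoader settings they refer to (the stand-in dataset and all values are illustrative starting points to tune, not universal answers):

import torch
from torch.utils.data import DataLoader, TensorDataset

# Stand-in dataset; replace with your real dataset
dataset = TensorDataset(
    torch.randn(10_000, 3, 224, 224),
    torch.randint(0, 10, (10_000,))
)

loader = DataLoader(
    dataset,
    batch_size=256,           # larger batches keep the GPU busy (watch memory)
    num_workers=8,            # parallel CPU workers so the GPU never waits on data
    pin_memory=True,          # faster host-to-device copies
    persistent_workers=True,  # avoid re-forking workers every epoch
    prefetch_factor=4,        # batches pre-fetched per worker
)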

33.2.9. SCI Score (Software Carbon Intensity)

The Green Software Foundation’s standard metric:

$$ SCI = ((E \times I) + M) / R $$

| Variable | Meaning | Unit |
|----------|---------|------|
| E | Energy consumed | kWh |
| I | Carbon intensity of grid | gCO2/kWh |
| M | Embodied carbon (hardware manufacturing) | gCO2 |
| R | Functional unit | Requests, users, etc. |

from dataclasses import dataclass

@dataclass
class SCICalculator:
    """Calculate Software Carbon Intensity score."""
    
    # Embodied carbon estimates (gCO2)
    EMBODIED_CARBON = {
        "A100": 150_000,  # ~150kg CO2 to manufacture
        "H100": 200_000,
        "TPU_v4": 100_000,
        "T4": 50_000,
        "CPU_server": 200_000
    }
    
    # Hardware lifetime assumptions (hours)
    HARDWARE_LIFETIME = {
        "A100": 35_000,  # ~4 years
        "H100": 35_000,
        "TPU_v4": 35_000,
        "T4": 35_000,
        "CPU_server": 52_500  # ~6 years
    }
    
    def calculate(
        self,
        energy_kwh: float,
        carbon_intensity: float,
        functional_units: int,
        hardware_type: str,
        usage_hours: float
    ) -> dict:
        """Calculate SCI score.
        
        Args:
            energy_kwh: Energy consumed in kWh
            carbon_intensity: Grid carbon intensity (gCO2/kWh)
            functional_units: Number of functional units (requests, users)
            hardware_type: Type of hardware used
            usage_hours: Hours of hardware usage
            
        Returns:
            SCI breakdown and score
        """
        # Operational carbon
        operational_carbon = energy_kwh * carbon_intensity
        
        # Embodied carbon allocation
        total_embodied = self.EMBODIED_CARBON.get(hardware_type, 100_000)
        lifetime = self.HARDWARE_LIFETIME.get(hardware_type, 35_000)
        
        # Amortize embodied carbon over lifetime
        embodied_allocation = (usage_hours / lifetime) * total_embodied
        
        # Total carbon
        total_carbon = operational_carbon + embodied_allocation
        
        # SCI score
        sci = total_carbon / functional_units if functional_units > 0 else 0
        
        return {
            "sci_score": round(sci, 4),
            "sci_unit": "gCO2eq per request",
            "breakdown": {
                "operational_carbon_g": round(operational_carbon, 2),
                "embodied_carbon_g": round(embodied_allocation, 2),
                "total_carbon_g": round(total_carbon, 2)
            },
            "functional_units": functional_units,
            "interpretation": self._interpret_score(sci)
        }
    
    def _interpret_score(self, sci: float) -> str:
        """Interpret SCI score."""
        if sci < 0.1:
            return "Excellent - Very efficient"
        elif sci < 1.0:
            return "Good - Room for improvement"
        elif sci < 10.0:
            return "Moderate - Consider optimization"
        else:
            return "Poor - Significant optimization needed"
    
    def compare_scenarios(
        self,
        scenarios: dict  # {name: {energy_kwh, carbon_intensity, requests, hardware, hours}}
    ) -> dict:
        """Compare SCI across scenarios."""
        results = {}
        
        for name, params in scenarios.items():
            results[name] = self.calculate(
                energy_kwh=params["energy_kwh"],
                carbon_intensity=params["carbon_intensity"],
                functional_units=params["requests"],
                hardware_type=params["hardware"],
                usage_hours=params["hours"]
            )
        
        # Rank by SCI
        ranked = sorted(results.items(), key=lambda x: x[1]["sci_score"])
        
        return {
            "scenarios": results,
            "best_scenario": ranked[0][0],
            "worst_scenario": ranked[-1][0]
        }


# Usage
calc = SCICalculator()

# Compare different deployment options
scenarios = {
    "us_east_a100": {
        "energy_kwh": 100,
        "carbon_intensity": 400,
        "requests": 1_000_000,
        "hardware": "A100",
        "hours": 24
    },
    "canada_a100": {
        "energy_kwh": 100,
        "carbon_intensity": 3,
        "requests": 1_000_000,
        "hardware": "A100",
        "hours": 24
    },
    "us_east_t4": {
        "energy_kwh": 20,
        "carbon_intensity": 400,
        "requests": 1_000_000,
        "hardware": "T4",
        "hours": 24
    }
}

comparison = calc.compare_scenarios(scenarios)
print(f"Best option: {comparison['best_scenario']}")
# Best option: canada_a100

33.2.10. Serverless vs Serverful Carbon

| Workload | Best Choice | Reason |
|----------|-------------|--------|
| Bursty/Low traffic | Serverless | Scale to zero = 0 idle energy |
| Constant high traffic | Serverful | Better utilization, no cold starts |
| Internal tools | Serverless | Often idle |
| Customer-facing critical | Serverful | Consistent performance |
| Development/testing | Serverless | Intermittent usage |
| Batch processing | Spot/Pre-emptible | Flexible timing |
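
The "scale to zero" argument can be made quantitative: below some request rate, the idle draw of an always-on instance dominates its energy bill. A back-of-the-envelope sketch, where every constant is an illustrative assumption:

# Rough daily energy: always-on instance vs scale-to-zero serverless.
# All constants are illustrative assumptions, not measurements.
IDLE_WATTS = 60            # always-on instance at idle
ACTIVE_WATTS = 150         # power while actually serving a request
SECONDS_PER_REQUEST = 0.2  # compute time per request

def daily_kwh_serverful(requests_per_day: int) -> float:
    busy_s = requests_per_day * SECONDS_PER_REQUEST
    idle_s = max(0, 86_400 - busy_s)
    return (busy_s * ACTIVE_WATTS + idle_s * IDLE_WATTS) / 3_600_000

def daily_kwh_serverless(requests_per_day: int) -> float:
    # Scale to zero: pay only for active compute (ignores cold-start overhead)
    return requests_per_day * SECONDS_PER_REQUEST * ACTIVE_WATTS / 3_600_000

for rpd in (1_000, 50_000, 400_000):
    print(rpd, round(daily_kwh_serverful(rpd), 2), round(daily_kwh_serverless(rpd), 2))
# At low traffic the serverful idle term dominates; at sustained high
# traffic the two converge and serverful wins on cold starts and utilization.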

33.2.11. Summary Checklist

| Step | Action | Impact | Effort |
|------|--------|--------|--------|
| 1 | Add CodeCarbon to training pipelines | Visibility | Low |
| 2 | Select low-carbon regions for batch jobs | -80-95% | Low |
| 3 | Implement model distillation | -70-90% inference | High |
| 4 | Quantize to INT8 for inference | -60% | Medium |
| 5 | Cache frequent predictions | -50-90% | Medium |
| 6 | Monitor GPU utilization | Visibility | Low |
| 7 | Use efficient hardware (TPUs/Inferentia) | -40-60% | Medium |
| 8 | Calculate and track SCI score | Reporting | Low |
| 9 | Set carbon budgets for teams | Governance | Medium |
| 10 | Report carbon in model cards | Transparency | Low |

Quick Wins Ranking

| Action | Carbon Reduction | Implementation Time |
|--------|------------------|---------------------|
| Train in Quebec/Stockholm | 90%+ | 1 day |
| Add caching layer | 50-90% | 1 week |
| Quantize models | 60% | 2-3 days |
| Increase batch size | 20-40% | 1 hour |
| Use spot instances | Same carbon, less cost | 1 day |
| Switch to TPUs (if TF/JAX) | 40% | 1 week |

[End of Section 33.2]