32.7. Industry-Specific Compliance: The Vertical Constraints

Note

One Size Does Not Fit All: A recommendation system for funny cat videos has different constraints than a diagnostic radiology model. This chapter explores the “Hard Constraints” found in Healthcare, Finance, Government, Automotive, and other regulated industries.

Compliance is often viewed as a monolith, but the actual engineering implementation varies wildly by vertical. Each industry has evolved its own regulatory framework based on historical failures, public safety concerns, and stakeholder protections. This chapter provides actionable engineering guidance for building compliant ML systems across major regulated industries.


32.7.1. Healthcare & Life Sciences (HIPAA / GxP / FDA)

In the US, the Health Insurance Portability and Accountability Act (HIPAA) governs Protected Health Information (PHI).

The HIPAA Technical Safeguards

| Safeguard | Requirement | MLOps Implementation |
|:----------|:------------|:---------------------|
| Access Control | Unique user IDs | IAM + SSO integration |
| Audit Controls | Record access logs | CloudTrail/Stackdriver + SIEM |
| Integrity | Protect from alteration | S3 versioning + checksums |
| Transmission Security | Encryption in transit | TLS 1.2+ everywhere |
| Encryption | Protect at rest | KMS/CMEK for all storage |

1. The Business Associate Agreement (BAA)

Before you spin up p3.2xlarge instances on AWS, you must sign a BAA.

  • Implication: You can ONLY use AWS services that are “HIPAA Eligible.”
  • Trap: New AWS AI services (e.g., Bedrock preview) might not be HIPAA eligible on launch day. Using them to process PHI is a violation.
# terraform/healthcare/main.tf - HIPAA-Compliant Infrastructure

variable "hipaa_eligible_services" {
  description = "List of HIPAA-eligible AWS services"
  type        = list(string)
  default = [
    "ec2", "s3", "rds", "sagemaker", "lambda",
    "ecs", "fargate", "ecr", "cloudwatch", "kms",
    "secretsmanager", "sns", "sqs", "dynamodb"
  ]
}

# Enforce KMS encryption on all S3 buckets
resource "aws_s3_bucket" "phi_data" {
  bucket = "phi-training-data-${var.environment}"
  
  # Force destroy disabled - PHI requires retention
  force_destroy = false
}

resource "aws_s3_bucket_server_side_encryption_configuration" "phi_encryption" {
  bucket = aws_s3_bucket.phi_data.id

  rule {
    apply_server_side_encryption_by_default {
      kms_master_key_id = aws_kms_key.phi.arn
      sse_algorithm     = "aws:kms"
    }
    bucket_key_enabled = true
  }
}

resource "aws_s3_bucket_public_access_block" "phi_block" {
  bucket = aws_s3_bucket.phi_data.id

  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

# PHI-specific KMS key with strict access control
resource "aws_kms_key" "phi" {
  description             = "KMS key for PHI encryption"
  deletion_window_in_days = 30
  enable_key_rotation     = true
  
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "AllowKeyAdministration"
        Effect = "Allow"
        Principal = {
          AWS = "arn:aws:iam::${data.aws_caller_identity.current.account_id}:role/SecurityAdmin"
        }
        Action   = ["kms:*"]
        Resource = "*"
      },
      {
        Sid    = "AllowMLAccess"
        Effect = "Allow"
        Principal = {
          AWS = aws_iam_role.sagemaker_execution.arn
        }
        Action = [
          "kms:Encrypt",
          "kms:Decrypt",
          "kms:GenerateDataKey*"
        ]
        Resource = "*"
      }
    ]
  })
}

# CloudTrail for PHI access auditing
resource "aws_cloudtrail" "phi_audit" {
  name                          = "phi-access-audit"
  s3_bucket_name                = aws_s3_bucket.audit_logs.id
  include_global_service_events = true
  is_multi_region_trail         = true
  enable_log_file_validation    = true
  kms_key_id                    = aws_kms_key.audit.arn

  event_selector {
    read_write_type           = "All"
    include_management_events = true

    data_resource {
      type   = "AWS::S3::Object"
      values = ["${aws_s3_bucket.phi_data.arn}/"]
    }
  }

  insight_selector {
    insight_type = "ApiCallRateInsight"
  }
}
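Declaring `hipaa_eligible_services` is only half the job; something has to enforce it. A minimal CI gate is sketched below — the allowlist mirrors the Terraform variable above, and both must be kept in sync with AWS's official HIPAA Eligible Services Reference (service names and the gate itself are illustrative, not an AWS API):

```python
# ci_hipaa_gate.py - fail CI if a proposed stack uses a service outside
# the HIPAA-eligible allowlist. Mirrors var.hipaa_eligible_services above.

HIPAA_ELIGIBLE = {
    "ec2", "s3", "rds", "sagemaker", "lambda",
    "ecs", "fargate", "ecr", "cloudwatch", "kms",
    "secretsmanager", "sns", "sqs", "dynamodb",
}

def check_services(requested: list[str]) -> list[str]:
    """Return the requested services that are NOT on the allowlist."""
    return sorted({s.lower() for s in requested} - HIPAA_ELIGIBLE)

# A stack that sneaks in a just-launched preview service gets caught here:
violations = check_services(["s3", "sagemaker", "bedrock"])
```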

2. Architecture: The De-Identification Proxy

You rarely train on raw PHI. You train on de-identified data following Safe Harbor or Expert Determination methods.

graph LR
    subgraph "On-Premise (Hospital)"
        A[EMR System] -->|HL7/FHIR| B[De-ID Gateway]
        B -->|Remove PHI| C[Audit Log]
    end
    
    subgraph "Cloud (AWS/GCP)"
        D[Ingestion S3] -->|Glue ETL| E[De-ID Lake]
        E -->|Training| F[SageMaker]
        F -->|Model| G[Registry]
    end
    
    B -->|Encrypted Transfer| D
# phi_deidentification.py - HIPAA Safe Harbor Implementation

import re
from dataclasses import dataclass
from typing import List, Dict, Optional
from datetime import datetime, timedelta
import hashlib
import secrets

@dataclass
class PHIElement:
    """HIPAA Safe Harbor 18 Identifiers"""
    names: bool = True
    geographic_subdivisions: bool = True  # Below state level
    dates: bool = True  # Except year for age > 89
    phone_numbers: bool = True
    fax_numbers: bool = True
    email_addresses: bool = True
    ssn: bool = True
    mrn: bool = True  # Medical Record Numbers
    health_plan_beneficiary: bool = True
    account_numbers: bool = True
    certificate_license_numbers: bool = True
    vehicle_identifiers: bool = True
    device_identifiers: bool = True
    urls: bool = True
    ip_addresses: bool = True
    biometric_identifiers: bool = True
    photos: bool = True
    unique_codes: bool = True


class HIPAADeidentifier:
    """
    De-identify PHI following HIPAA Safe Harbor method.
    
    Safe Harbor requires removal or generalization of 18 identifiers
    with no actual knowledge that remaining info could identify an individual.
    """
    
    def __init__(self, salt: Optional[str] = None):
        self.salt = salt or secrets.token_hex(32)
        self._compile_patterns()
    
    def _compile_patterns(self):
        """Pre-compile regex patterns for efficiency."""
        
        self.patterns = {
            'ssn': re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
            'phone': re.compile(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b'),
            'email': re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'),
            'mrn': re.compile(r'\b(?:MRN|MR#|Patient ID)[:\s]*(\d+)\b', re.I),
            'ip': re.compile(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b'),
            'url': re.compile(r'https?://[^\s]+'),
            # Dates in various formats
            'date': re.compile(
                r'\b(?:\d{1,2}[-/]\d{1,2}[-/]\d{2,4})|'
                r'(?:\d{4}[-/]\d{1,2}[-/]\d{1,2})|'
                r'(?:(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*\s+\d{1,2},?\s+\d{4})\b',
                re.I
            ),
            # Names (simplified - production would use NER)
            'name_prefix': re.compile(r'\b(?:Patient|Name|Dr\.?|Mr\.?|Mrs\.?|Ms\.?)[:\s]+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)', re.I),
        }
    
    def deidentify_text(self, text: str) -> dict:
        """
        De-identify text and return the result with an audit log.
        
        Returns:
            dict with 'text', 'redactions', 'audit_id', 'timestamp'
        """
        
        redactions = []
        result = text
        
        # Replace patterns in order of specificity. Matches are applied
        # right-to-left so earlier match positions stay valid after each
        # substitution.
        for pattern_name, pattern in self.patterns.items():
            for match in reversed(list(pattern.finditer(result))):
                matched_text = match.group(0)
                
                # Generate a consistent pseudonym for the same value
                pseudonym = self._generate_pseudonym(matched_text, pattern_name)
                
                redactions.append({
                    'type': pattern_name,
                    'original_hash': self._hash_value(matched_text),
                    'position': match.span(),
                    'pseudonym': pseudonym
                })
                
                result = result[:match.start()] + pseudonym + result[match.end():]
        
        # Generate audit ID
        audit_id = hashlib.sha256(
            f"{text}{datetime.utcnow().isoformat()}{self.salt}".encode()
        ).hexdigest()[:16]
        
        return {
            'text': result,
            'redactions': redactions,
            'audit_id': audit_id,
            'timestamp': datetime.utcnow().isoformat()
        }
    
    def _generate_pseudonym(self, value: str, phi_type: str) -> str:
        """Generate consistent pseudonym for a value."""
        
        # Use PBKDF2-HMAC for a consistent but irreversible pseudonym
        hash_val = hashlib.pbkdf2_hmac(
            'sha256',
            value.encode(),
            self.salt.encode(),
            100000
        ).hex()[:8]
        
        return f"[{phi_type.upper()}_{hash_val}]"
    
    def _hash_value(self, value: str) -> str:
        """Create one-way hash for audit purposes."""
        return hashlib.sha256(
            f"{value}{self.salt}".encode()
        ).hexdigest()
    
    def generalize_dates(self, date_str: str) -> str:
        """
        Generalize dates per Safe Harbor:
        - Keep only the year
        - For ages over 89, aggregate into a single 90+ category
        """
        # Try the common formats; anything unparseable is fully redacted.
        for fmt in ['%m/%d/%Y', '%Y-%m-%d', '%B %d, %Y']:
            try:
                dt = datetime.strptime(date_str, fmt)
                return str(dt.year)
            except ValueError:
                continue
        return "[DATE_REDACTED]"


# GCP Implementation with Cloud Healthcare API
class GCPHealthcareDeidentifier:
    """De-identify using Google Cloud Healthcare API."""
    
    def __init__(self, project_id: str, location: str):
        from google.cloud import healthcare_v1
        
        self.client = healthcare_v1.DeidentifyClient()
        self.project_id = project_id
        self.location = location
    
    def deidentify_dicom(
        self,
        source_dataset: str,
        destination_dataset: str
    ):
        """De-identify DICOM images (medical imaging)."""
        
        from google.cloud.healthcare_v1.types import deidentify
        
        # Configure de-identification
        config = deidentify.DeidentifyConfig(
            dicom=deidentify.DicomConfig(
                filter_profile=deidentify.DicomConfig.TagFilterProfile.DEIDENTIFY_TAG_CONTENTS,
                remove_list=deidentify.DicomTagList(
                    tags=[
                        "PatientName",
                        "PatientID", 
                        "PatientBirthDate",
                        "ReferringPhysicianName"
                    ]
                )
            ),
            text=deidentify.TextConfig(
                transformations=[
                    deidentify.InfoTypeTransformation(
                        info_types=["PERSON_NAME", "DATE", "PHONE_NUMBER"],
                        redact_config=deidentify.RedactConfig()
                    )
                ]
            )
        )
        
        # Execute de-identification
        request = deidentify.DeidentifyDatasetRequest(
            source_dataset=source_dataset,
            destination_dataset=destination_dataset,
            config=config
        )
        
        operation = self.client.deidentify_dataset(request=request)
        return operation.result()

3. FDA SaMD (Software as Medical Device)

If your ML model diagnoses, treats, or drives clinical decisions about disease, the FDA regulates it as Software as a Medical Device (SaMD).

# fda_samd_compliance.py - FDA Pre-Submission Requirements

from dataclasses import dataclass
from typing import List, Dict
from enum import Enum
import json


class DeviceClass(Enum):
    CLASS_I = 1   # Low risk (tongue depressors)
    CLASS_II = 2  # Moderate risk (X-ray readers, most AI/ML diagnostic aids)
    CLASS_III = 3 # High risk (pacemakers, life-sustaining devices)


@dataclass
class PCCPDocument:
    """
    Predetermined Change Control Plan (FDA)
    
    Required for AI/ML devices that will be updated post-market.
    Must specify WHAT changes, HOW validated, WHO approves.
    """
    
    device_name: str
    intended_changes: List[Dict]
    validation_protocol: Dict
    governance_process: Dict
    
    def generate_submission(self) -> str:
        """Generate PCCP document for FDA submission."""
        
        return f"""
# Predetermined Change Control Plan
## Device: {self.device_name}

## 1. Description of Modifications Covered

{self._format_intended_changes()}

## 2. Modification Protocol

### 2.1 Data Requirements
- Minimum dataset size: {self.validation_protocol.get('min_samples', 1000)}
- Required demographics representation: {self.validation_protocol.get('demographics')}
- Data quality thresholds: {self.validation_protocol.get('data_quality')}

### 2.2 Performance Thresholds
{self._format_performance_thresholds()}

### 2.3 Validation Methodology
- Cross-validation: {self.validation_protocol.get('cv_folds', 5)}-fold
- External validation dataset: Required
- Comparison to predicate: Required

## 3. Risk Analysis

### 3.1 Anticipated Risks
{self._format_risks()}

### 3.2 Risk Mitigation
- Automatic rollback if AUC < {self.validation_protocol.get('min_auc', 0.85)}
- Human-in-the-loop for edge cases
- Continuous monitoring post-deployment

## 4. Governance

### 4.1 Approval Chain
{self._format_governance()}

### 4.2 Documentation Requirements
- Model card with performance metrics
- Bias analysis report
- Validation study report
- Audit trail of all changes
"""
    
    def _format_intended_changes(self) -> str:
        lines = []
        for i, change in enumerate(self.intended_changes, 1):
            lines.append(f"{i}. **{change['type']}**: {change['description']}")
            lines.append(f"   - Trigger: {change.get('trigger', 'Scheduled')}")
            lines.append(f"   - Expected frequency: {change.get('frequency', 'Quarterly')}")
        return "\n".join(lines)
    
    def _format_performance_thresholds(self) -> str:
        thresholds = self.validation_protocol.get('thresholds', {})
        return "\n".join([
            f"- {metric}: {value}" 
            for metric, value in thresholds.items()
        ])
    
    def _format_risks(self) -> str:
        risks = [
            "Data drift affecting accuracy",
            "Bias amplification in underrepresented groups",
            "Adversarial inputs causing misclassification"
        ]
        return "\n".join([f"- {r}" for r in risks])
    
    def _format_governance(self) -> str:
        return f"""
- Clinical Review: {self.governance_process.get('clinical_reviewer')}
- Technical Review: {self.governance_process.get('technical_reviewer')}
- Quality Assurance: {self.governance_process.get('qa_reviewer')}
- Final Approval: {self.governance_process.get('final_approver')}
"""


# Example usage
pccp = PCCPDocument(
    device_name="RadAssist AI - Chest X-Ray Analysis",
    intended_changes=[
        {
            "type": "Retraining",
            "description": "Periodic retraining on new labeled data from partner hospitals",
            "trigger": "Quarterly or when >10,000 new labeled images available",
            "frequency": "Quarterly"
        },
        {
            "type": "Architecture Update",
            "description": "Update to newer backbone (ResNet -> ConvNeXt) for improved accuracy",
            "trigger": "When new architecture shows >2% AUC improvement",
            "frequency": "Annual"
        }
    ],
    validation_protocol={
        "min_samples": 5000,
        "demographics": "Age, sex, ethnicity proportional to US population",
        "data_quality": "Expert radiologist labels, 2-reader consensus",
        "cv_folds": 5,
        "min_auc": 0.90,
        "thresholds": {
            "AUC-ROC": ">= 0.90",
            "Sensitivity": ">= 0.85",
            "Specificity": ">= 0.80",
            "PPV in high-risk population": ">= 0.70"
        }
    },
    governance_process={
        "clinical_reviewer": "Board-certified radiologist",
        "technical_reviewer": "ML Engineering Lead",
        "qa_reviewer": "Quality Assurance Manager",
        "final_approver": "Chief Medical Officer"
    }
)

print(pccp.generate_submission())

32.7.2. Financial Services (SR 11-7 / ECOA / Basel)

Banking model risk is governed by SR 11-7 (Supervisory Guidance on Model Risk Management), issued by the Federal Reserve jointly with the OCC. It treats every model as a source of risk that must be inventoried, independently validated, and governed throughout its lifecycle.

SR 11-7 Model Risk Framework

graph TB
    subgraph "Model Development"
        A[Data & Assumptions] --> B[Model Design]
        B --> C[Implementation]
        C --> D[Testing]
    end
    
    subgraph "Model Validation"
        E[Independent Review] --> F[Conceptual Soundness]
        F --> G[Ongoing Monitoring]
        G --> H[Outcomes Analysis]
    end
    
    subgraph "Model Governance"
        I[Model Inventory] --> J[Approval Process]
        J --> K[Audit Trail]
        K --> L[Board Reporting]
    end
    
    D --> E
    H --> I

1. The Model Inventory System

# model_inventory.py - SR 11-7 Compliant Model Registry

from dataclasses import dataclass, field
from typing import List, Dict, Optional
from datetime import datetime, date
from enum import Enum
import json


class ModelTier(Enum):
    TIER_1 = "High Impact"    # Material to financial statements
    TIER_2 = "Medium Impact"  # Significant but not material
    TIER_3 = "Low Impact"     # Limited exposure


class ModelStatus(Enum):
    DEVELOPMENT = "Development"
    VALIDATION = "Pending Validation"
    PRODUCTION = "Production"
    MONITORING = "Enhanced Monitoring"
    DECOMMISSIONED = "Decommissioned"


@dataclass
class ModelInventoryEntry:
    """SR 11-7 Model Inventory Entry"""
    
    # Identification
    model_id: str
    model_name: str
    model_version: str
    
    # Classification
    tier: ModelTier
    status: ModelStatus
    business_unit: str
    use_case: str
    
    # Ownership
    model_owner: str
    model_developer: str
    validator: str
    
    # Technical Details
    model_type: str  # e.g., "Logistic Regression", "XGBoost", "Neural Network"
    input_features: List[str]
    output_variable: str
    training_data_period: str
    
    # Risk Assessment
    materiality_assessment: Dict
    limitations: List[str]
    assumptions: List[str]
    
    # Lifecycle
    development_date: date
    validation_date: Optional[date]
    production_date: Optional[date]
    next_review_date: date
    
    # Validation Results
    validation_results: Dict = field(default_factory=dict)
    
    # Monitoring
    performance_metrics: Dict = field(default_factory=dict)
    monitoring_frequency: str = "Monthly"
    
    def to_regulatory_report(self) -> Dict:
        """Generate regulatory-compliant report."""
        return {
            "Model Identification": {
                "ID": self.model_id,
                "Name": self.model_name,
                "Version": self.model_version,
                "Type": self.model_type
            },
            "Risk Classification": {
                "Tier": self.tier.value,
                "Status": self.status.value,
                "Business Unit": self.business_unit
            },
            "Governance": {
                "Owner": self.model_owner,
                "Developer": self.model_developer,
                "Independent Validator": self.validator
            },
            "Materiality": self.materiality_assessment,
            "Key Dates": {
                "Developed": str(self.development_date),
                "Validated": str(self.validation_date) if self.validation_date else "Pending",
                "Production": str(self.production_date) if self.production_date else "N/A",
                "Next Review": str(self.next_review_date)
            },
            "Limitations": self.limitations,
            "Performance Metrics": self.performance_metrics
        }


class ModelInventorySystem:
    """Enterprise Model Inventory for SR 11-7 Compliance"""
    
    def __init__(self, db_connection):
        self.db = db_connection
        self.models = {}
    
    def register_model(self, entry: ModelInventoryEntry) -> str:
        """Register a new model in the inventory."""
        
        # Validate required fields for tier
        if entry.tier == ModelTier.TIER_1:
            self._validate_tier1_requirements(entry)
        
        # Generate unique ID if not provided
        if not entry.model_id:
            entry.model_id = self._generate_model_id(entry)
        
        # Store in database
        self.models[entry.model_id] = entry
        
        # Trigger workflow based on tier
        if entry.tier in [ModelTier.TIER_1, ModelTier.TIER_2]:
            self._trigger_validation_workflow(entry)
        
        return entry.model_id
    
    def _validate_tier1_requirements(self, entry: ModelInventoryEntry):
        """Tier 1 models require additional documentation."""
        
        required_fields = [
            'materiality_assessment',
            'limitations',
            'assumptions',
            'validator'
        ]
        
        # Note: named field_name to avoid shadowing dataclasses.field
        for field_name in required_fields:
            value = getattr(entry, field_name)
            if not value or (isinstance(value, (list, dict)) and len(value) == 0):
                raise ValueError(f"Tier 1 models require: {field_name}")
    
    def get_models_for_review(self, as_of_date: date = None) -> List[ModelInventoryEntry]:
        """Get models requiring periodic review."""
        
        as_of_date = as_of_date or date.today()
        
        return [
            model for model in self.models.values()
            if model.next_review_date <= as_of_date
            and model.status == ModelStatus.PRODUCTION
        ]
    
    def generate_board_report(self) -> Dict:
        """Generate quarterly board report on model risk."""
        
        return {
            "Total Models": len(self.models),
            "By Tier": {
                tier.value: len([m for m in self.models.values() if m.tier == tier])
                for tier in ModelTier
            },
            "By Status": {
                status.value: len([m for m in self.models.values() if m.status == status])
                for status in ModelStatus
            },
            "Models Requiring Review": len(self.get_models_for_review()),
            "Validation Backlog": len([
                m for m in self.models.values() 
                if m.status == ModelStatus.VALIDATION
            ])
        }
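SR 11-7 expects periodic revalidation, which is what `next_review_date` drives. One way to derive it — the cadences here are a hypothetical institutional policy, not a frequency mandated by the guidance:

```python
from datetime import date, timedelta

# Hypothetical review cadence: SR 11-7 leaves frequency to the institution,
# scaled to materiality. Annual review for Tier 1 is a common policy choice.
REVIEW_INTERVAL_DAYS = {"TIER_1": 365, "TIER_2": 548, "TIER_3": 730}

def compute_next_review(validation_date: date, tier: str) -> date:
    """Schedule the next periodic review from the last validation date."""
    return validation_date + timedelta(days=REVIEW_INTERVAL_DAYS[tier])
```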

2. Fair Lending Compliance (ECOA)

# fair_lending.py - ECOA Disparate Impact Analysis

import pandas as pd
import numpy as np
from scipy.stats import fisher_exact, chi2_contingency
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class FairnessMetrics:
    """Fair lending metrics for regulatory compliance."""
    
    adverse_impact_ratio: float  # Four-fifths rule
    odds_ratio: float
    p_value: float
    chi_square: float
    chi_square_p: float
    approval_rate_protected: float
    approval_rate_reference: float
    
    @property
    def passes_four_fifths_rule(self) -> bool:
        """AIR >= 0.8 is generally considered acceptable."""
        return self.adverse_impact_ratio >= 0.8
    
    @property
    def statistically_significant(self) -> bool:
        """p < 0.05 indicates significant difference."""
        return self.p_value < 0.05


class DisparateImpactAnalyzer:
    """
    Analyze credit decisions for ECOA compliance.
    
    The Four-Fifths (80%) Rule:
    If the selection rate for a protected group is less than 80%
    of the rate for the reference group, disparate impact may exist.
    """
    
    def __init__(self):
        self.results = {}
    
    def analyze_protected_class(
        self,
        df: pd.DataFrame,
        protected_col: str,
        outcome_col: str,
        protected_value: any = 1,
        reference_value: any = 0
    ) -> FairnessMetrics:
        """
        Analyze disparate impact for a protected class.
        
        Args:
            df: DataFrame with predictions
            protected_col: Column indicating protected class membership
            outcome_col: Column indicating approval (1) or denial (0)
            protected_value: Value indicating protected group
            reference_value: Value indicating reference group
            
        Returns:
            FairnessMetrics with all relevant statistics
        """
        
        # Split groups
        protected_group = df[df[protected_col] == protected_value]
        reference_group = df[df[protected_col] == reference_value]
        
        # Calculate approval rates
        rate_protected = protected_group[outcome_col].mean()
        rate_reference = reference_group[outcome_col].mean()
        
        # Adverse Impact Ratio (Four-Fifths Rule)
        air = rate_protected / rate_reference if rate_reference > 0 else 0
        
        # Build contingency table
        #              Approved   Denied
        # Protected       a          b
        # Reference       c          d
        
        a = protected_group[outcome_col].sum()
        b = len(protected_group) - a
        c = reference_group[outcome_col].sum()
        d = len(reference_group) - c
        
        contingency = [[a, b], [c, d]]
        
        # Fisher's Exact Test
        odds_ratio, p_value = fisher_exact(contingency)
        
        # Chi-Square Test
        chi2, chi_p, dof, expected = chi2_contingency(contingency)
        
        return FairnessMetrics(
            adverse_impact_ratio=air,
            odds_ratio=odds_ratio,
            p_value=p_value,
            chi_square=chi2,
            chi_square_p=chi_p,
            approval_rate_protected=rate_protected,
            approval_rate_reference=rate_reference
        )
    
    def analyze_all_protected_classes(
        self,
        df: pd.DataFrame,
        outcome_col: str,
        protected_columns: Dict[str, Tuple[any, any]]
    ) -> Dict[str, FairnessMetrics]:
        """
        Analyze all protected classes at once.
        
        Args:
            protected_columns: Dict mapping column names to (protected_value, reference_value)
        """
        
        results = {}
        
        for col, (protected_val, reference_val) in protected_columns.items():
            results[col] = self.analyze_protected_class(
                df, col, outcome_col, protected_val, reference_val
            )
        
        return results
    
    def generate_compliance_report(
        self,
        results: Dict[str, FairnessMetrics],
        model_name: str
    ) -> str:
        """Generate ECOA compliance report."""
        
        report = f"""
# Fair Lending Compliance Report
## Model: {model_name}
## Date: {pd.Timestamp.now().strftime('%Y-%m-%d')}

---

## Executive Summary

"""
        
        failures = []
        for protected_class, metrics in results.items():
            if not metrics.passes_four_fifths_rule:
                failures.append(protected_class)
        
        if failures:
            report += f"⚠️ **ATTENTION REQUIRED**: Potential disparate impact detected for: {', '.join(failures)}\n\n"
        else:
            report += "✅ All protected classes pass the Four-Fifths Rule.\n\n"
        
        report += "## Detailed Results\n\n"
        
        for protected_class, metrics in results.items():
            status = "✅ PASS" if metrics.passes_four_fifths_rule else "❌ FAIL"
            
            report += f"""
### {protected_class} {status}

| Metric | Value |
|:-------|:------|
| Adverse Impact Ratio | {metrics.adverse_impact_ratio:.4f} |
| Protected Group Approval Rate | {metrics.approval_rate_protected:.2%} |
| Reference Group Approval Rate | {metrics.approval_rate_reference:.2%} |
| Odds Ratio | {metrics.odds_ratio:.4f} |
| Fisher's Exact p-value | {metrics.p_value:.4f} |
| Chi-Square Statistic | {metrics.chi_square:.2f} |
| Chi-Square p-value | {metrics.chi_square_p:.4f} |

"""
        
        report += """
## Methodology

This analysis follows the EEOC Uniform Guidelines on Employee Selection Procedures,
adapted for credit decisions as recommended by regulatory guidance.

The Four-Fifths Rule: If the selection rate for a protected class is less than
80% (4/5) of the rate for the reference group, disparate impact may be present.

Statistical significance is assessed using Fisher's Exact Test (p < 0.05).
"""
        
        return report


# Example usage
analyzer = DisparateImpactAnalyzer()

# Sample data
df = pd.DataFrame({
    'approved': np.random.binomial(1, 0.7, 10000),
    'gender': np.random.binomial(1, 0.5, 10000),  # 1 = female
    'race_minority': np.random.binomial(1, 0.3, 10000),  # 1 = minority
    'age_over_40': np.random.binomial(1, 0.4, 10000)  # 1 = over 40
})

results = analyzer.analyze_all_protected_classes(
    df,
    outcome_col='approved',
    # Keys are column names; values are (protected_value, reference_value)
    protected_columns={
        'gender': (1, 0),
        'race_minority': (1, 0),
        'age_over_40': (1, 0)
    }
)

print(analyzer.generate_compliance_report(results, "Credit Approval Model v2.1"))
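To make the four-fifths arithmetic concrete, a hand-checkable example with made-up counts:

```python
# Hypothetical counts: protected group 300/500 approved, reference 400/500
rate_protected = 300 / 500        # 0.60
rate_reference = 400 / 500        # 0.80
air = rate_protected / rate_reference   # ~0.75
# 0.75 < 0.80: the four-fifths rule flags potential disparate impact
```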

32.7.3. Government & Defense (FedRAMP / IL / CMMC)

US Government cloud work requires FedRAMP authorization (Low, Moderate, or High baselines); Department of Defense workloads are additionally categorized by DoD Impact Level (IL).

Impact Levels

| Level | Data Type | Cloud Requirement | Example |
|:------|:----------|:------------------|:--------|
| IL2 | Public | Commercial Cloud | Public websites |
| IL4 | CUI | GovCloud | Controlled documents |
| IL5 | Higher CUI | GovCloud + Controls | Defense contracts |
| IL6 | Secret | Air-Gapped | Classified systems |

Air-Gapped MLOps Architecture

graph LR
    subgraph "Low Side (Connected)"
        A[Development Env] --> B[Build Artifacts]
        B --> C[Security Scan]
        C --> D[Approval Queue]
    end
    
    subgraph "Cross-Domain Solution"
        E[One-Way Diode]
    end
    
    subgraph "High Side (Air-Gapped)"
        F[Staging Env] --> G[Validation]
        G --> H[Production]
    end
    
    D --> E
    E --> F
# govcloud_infrastructure.tf - FedRAMP High Compliant

provider "aws" {
  region = "us-gov-west-1"  # GovCloud region
  
  # FIPS 140-2 endpoints
  endpoints {
    s3  = "s3-fips.us-gov-west-1.amazonaws.com"
    sts = "sts.us-gov-west-1.amazonaws.com"
    kms = "kms-fips.us-gov-west-1.amazonaws.com"
  }
}

# Force FIPS-compliant encryption
resource "aws_s3_bucket" "ml_artifacts" {
  bucket = "ml-artifacts-${var.environment}-govcloud"
}

resource "aws_s3_bucket_server_side_encryption_configuration" "fips" {
  bucket = aws_s3_bucket.ml_artifacts.id

  rule {
    apply_server_side_encryption_by_default {
      kms_master_key_id = aws_kms_key.fips_key.arn
      sse_algorithm     = "aws:kms"
    }
  }
}

# FIPS-validated KMS key
resource "aws_kms_key" "fips_key" {
  description              = "FIPS 140-2 validated encryption key"
  customer_master_key_spec = "SYMMETRIC_DEFAULT"
  key_usage                = "ENCRYPT_DECRYPT"
  enable_key_rotation      = true
  
  # Strict policy requiring US persons
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "RequireUSPersons"
        Effect = "Deny"
        Principal = "*"
        Action = "kms:*"
        Resource = "*"
        Condition = {
          Bool = {
            "aws:ViaAWSService": "false"
          }
          StringNotEquals = {
            "aws:PrincipalTag/Citizenship": "US"
          }
        }
      }
    ]
  })
}

# VPC with TIC-compliant egress
resource "aws_vpc" "isolated" {
  cidr_block           = "10.0.0.0/16"
  enable_dns_hostnames = true
  enable_dns_support   = true
  
  tags = {
    Name       = "fedramp-high-vpc"
    Compliance = "FedRAMP-High"
  }
}

# No internet gateway - fully isolated
resource "aws_vpc_endpoint" "s3" {
  vpc_id       = aws_vpc.isolated.id
  service_name = "com.amazonaws.us-gov-west-1.s3"
  
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect    = "Allow"
      Principal = "*"
      Action    = ["s3:GetObject", "s3:PutObject"]
      Resource  = "${aws_s3_bucket.ml_artifacts.arn}/*"
    }]
  })
}
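Endpoint configuration like the above fails silently: a client that falls back to a commercial-partition or non-FIPS hostname still works, it just violates the authorization boundary. A minimal pre-flight check is sketched below; the function name, region list, and hostname heuristic are illustrative assumptions, not an official AWS API.

```python
# fips_check.py - illustrative pre-flight validation of service endpoints.
# Hostname heuristics only; not an official AWS tool.

GOV_REGIONS = {"us-gov-west-1", "us-gov-east-1"}


def is_fips_govcloud_endpoint(hostname: str) -> bool:
    """True if hostname looks like a FIPS service endpoint in a GovCloud
    region, e.g. 's3-fips.us-gov-west-1.amazonaws.com'.

    Caveat: some GovCloud endpoints (e.g. STS) are FIPS-validated without
    a '-fips' label, so treat False as 'needs manual review', not
    'non-compliant'.
    """
    labels = hostname.split(".")
    if len(labels) < 4 or labels[-2:] != ["amazonaws", "com"]:
        return False
    service, region = labels[0], labels[1]
    return service.endswith("-fips") and region in GOV_REGIONS


# The chapter's S3 endpoint passes; a commercial-region endpoint does not.
assert is_fips_govcloud_endpoint("s3-fips.us-gov-west-1.amazonaws.com")
assert not is_fips_govcloud_endpoint("s3.us-east-1.amazonaws.com")
```

Running a check like this at provider/client construction time turns a quiet compliance drift into a hard deployment failure.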

32.7.4. Automotive (ISO 26262 / SOTIF)

Autonomous vehicles are safety-critical systems. ISO 26262 assigns each function an Automotive Safety Integrity Level (ASIL A through D) based on hazard severity, exposure, and controllability; the highest-risk functions, such as steering and braking, demand ASIL D. SOTIF (ISO 21448) complements this by addressing hazards from functional insufficiencies, such as ML perception errors, rather than component failures.
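Each hazard is rated for severity (S0-S3), exposure (E0-E4), and controllability (C0-C3), and ISO 26262-3 maps the three ratings to an ASIL via a lookup table. For the S1-S3 / E1-E4 / C1-C3 portion of that table, the mapping collapses to simple arithmetic on S + E + C; the helper below is an illustrative sketch, not text from the standard.

```python
# asil.py - sketch of ISO 26262-3 ASIL determination.

def determine_asil(s: int, e: int, c: int) -> str:
    """ASIL from severity S (0-3), exposure E (0-4), controllability C (0-3).

    For nonzero ratings the standard's table ranks by S + E + C:
    a total of 7 -> A, 8 -> B, 9 -> C, 10 -> D; anything lower, or a
    0 in any dimension, carries no safety requirement (QM).
    """
    if not (0 <= s <= 3 and 0 <= e <= 4 and 0 <= c <= 3):
        raise ValueError("rating out of range")
    if 0 in (s, e, c) or s + e + c <= 6:
        return "QM"
    return {7: "A", 8: "B", 9: "C", 10: "D"}[s + e + c]


# Unintended full braking on a highway: S3 (fatal), E4 (high exposure),
# C3 (uncontrollable) -> ASIL D, matching the braking example above.
assert determine_asil(3, 4, 3) == "D"
```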

# automotive_validation.py - ISO 26262 Compliance

from dataclasses import dataclass
from typing import List, Dict
from enum import Enum


class ASILLevel(Enum):
    QM = 0   # Quality Management (no safety requirement)
    A = 1    # Lowest safety requirement
    B = 2
    C = 3
    D = 4    # Highest safety requirement (steering, braking)


@dataclass
class SafetyCase:
    """ISO 26262 Safety Case Documentation"""
    
    component_name: str
    asil_level: ASILLevel
    hazard_analysis: List[Dict]
    safety_requirements: List[Dict]
    verification_methods: List[Dict]
    validation_results: Dict
    
    def generate_safety_report(self) -> str:
        """Generate ISO 26262 compliant safety report."""
        
        return f"""
# Safety Case Report
## Component: {self.component_name}
## ASIL Level: {self.asil_level.name}

---

## 1. Hazard Analysis and Risk Assessment (HARA)

{self._format_hazards()}

## 2. Safety Requirements

{self._format_requirements()}

## 3. Verification Evidence

{self._format_verification()}

## 4. Validation Summary

- Test Cases Executed: {self.validation_results.get('test_cases', 0)}
- Passed: {self.validation_results.get('passed', 0)}
- Failed: {self.validation_results.get('failed', 0)}
- Coverage: {self.validation_results.get('coverage', 0):.1%}

### Simulation Results
- Virtual Miles: {self.validation_results.get('virtual_miles', 0):,}
- Scenario Coverage: {self.validation_results.get('scenario_coverage', 0):.1%}
- Critical Failures: {self.validation_results.get('critical_failures', 0)}

## 5. Residual Risk Assessment

{self._format_residual_risk()}
"""
    
    def _format_hazards(self) -> str:
        lines = []
        for h in self.hazard_analysis:
            lines.append(f"### Hazard: {h['name']}")
            lines.append(f"- Severity: {h['severity']}")
            lines.append(f"- Exposure: {h['exposure']}")
            lines.append(f"- Controllability: {h['controllability']}")
            lines.append(f"- ASIL: {h['asil']}")
            lines.append("")
        return "\n".join(lines)
    
    def _format_requirements(self) -> str:
        lines = []
        for r in self.safety_requirements:
            lines.append(f"- **{r['id']}**: {r['description']}")
        return "\n".join(lines)
    
    def _format_verification(self) -> str:
        lines = []
        for v in self.verification_methods:
            lines.append(f"### {v['requirement_id']}")
            lines.append(f"- Method: {v['method']}")
            lines.append(f"- Status: {v['status']}")
            lines.append("")
        return "\n".join(lines)
    
    def _format_residual_risk(self) -> str:
        return """
Based on verification and validation activities, residual risks have been
assessed and documented. All residual risks are within acceptable limits
as defined in the project safety plan.
"""

32.7.5. Summary Checklist

| Industry | Key Regulations | Primary Concern | Critical Requirements |
|----------|-----------------|-----------------|------------------------|
| Healthcare | HIPAA, GxP, FDA | Patient Safety | De-ID, BAA, PCCP |
| Finance | SR 11-7, ECOA | Economic Stability | Model Inventory, Fair Lending |
| Government | FedRAMP, CMMC | National Security | FIPS, Air-Gap, US Persons |
| Automotive | ISO 26262, SOTIF | Life Safety | ASIL, Simulation Miles |

Cross-Industry Compliance Architecture

graph TB
    subgraph "Core Platform"
        A[MLOps Platform]
    end
    
    subgraph "Compliance Overlays"
        B[HIPAA Overlay]
        C[FedRAMP Overlay]
        D[SR 11-7 Overlay]
        E[ISO 26262 Overlay]
    end
    
    A --> B
    A --> C
    A --> D
    A --> E
    
    B --> F[Healthcare Deployment]
    C --> G[Government Deployment]
    D --> H[Financial Deployment]
    E --> I[Automotive Deployment]

Your MLOps platform must support “Overlay Configurations” to adapt to these differing rulesets without rewriting the core infrastructure. This is achieved through:

  1. Parameterized Terraform modules with compliance flags
  2. Policy-as-Code (OPA/Sentinel) for enforcement
  3. Audit trail automation for all regulated activities
  4. Separation of duties in approval workflows
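The overlay idea reduces to data: each overlay declares the controls it requires, and a deployment is validated against every overlay it claims. The overlay names and control flags below are hypothetical, sketched here only to show the shape of such a check.

```python
# overlay_check.py - hypothetical overlay-to-controls validation sketch.
from typing import Dict, List, Set

# Illustrative control requirements per compliance overlay (assumptions).
OVERLAY_CONTROLS: Dict[str, Set[str]] = {
    "hipaa":     {"encryption_at_rest", "audit_logging", "baa_signed"},
    "fedramp":   {"encryption_at_rest", "fips_endpoints", "us_persons_only"},
    "sr-11-7":   {"model_inventory", "audit_logging", "fair_lending_tests"},
    "iso-26262": {"safety_case", "simulation_coverage"},
}


def missing_controls(enabled: Set[str], overlays: List[str]) -> Dict[str, Set[str]]:
    """Per claimed overlay, the required controls not enabled in the deployment."""
    gaps = {}
    for name in overlays:
        gap = OVERLAY_CONTROLS[name] - enabled
        if gap:
            gaps[name] = gap
    return gaps


# A healthcare deployment missing its BAA fails the HIPAA overlay:
gaps = missing_controls({"encryption_at_rest", "audit_logging"}, ["hipaa"])
```

In practice the same lookup would live in a Policy-as-Code engine (OPA/Sentinel) and gate the Terraform plan, rather than run as application code.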

[End of Section 32.7]