32.6. Audit Trails: The Black Box Recorder

Important

The Golden Rule of Audit: If it isn’t logged, it didn’t happen. In regulated environments, the inability to produce a log for a specific prediction is often treated legally as if the system failed.

An ML Audit Trail is different from standard application logging. We don’t just care about “Error: NullPointerException”. We care about the why and the what of every decision.


32.6.1. The Anatomy of a Prediction Log

Free-text application logging is insufficient. Every prediction needs a structured, schema-compliant record.

The Canonical Schema

{
  "event_id": "uuid-v4-1234...",
  "timestamp": "2023-10-27T10:00:00Z",
  "request_id": "req-890...",
  "model_context": {
    "model_name": "loan-approver",
    "model_version": "v1.2.4",
    "git_sha": "a1b2c3d...",
    "container_image": "123.dkr.ecr...:v1.2.4"
  },
  "inputs": {
    "age": 34,
    "income": 50000,
    "credit_score": 720
  },
  "outputs": {
    "probability": 0.82,
    "decision": "APPROVE"
  },
  "metadata": {
    "latency_ms": 45,
    "customer_id": "cust-555"
  }
}

Log Field Categories

| Category | Fields                  | Purpose                 | Retention     |
|----------|-------------------------|-------------------------|---------------|
| Identity | event_id, request_id    | Correlation             | Forever       |
| Temporal | timestamp               | Timeline reconstruction | 7 years       |
| Context  | model_version, git_sha  | Reproducibility         | 7 years       |
| Inputs   | All features used       | Replay capability       | By regulation |
| Outputs  | prediction, confidence  | Decision record         | By regulation |
| Metadata | latency, customer_id    | Operations, debugging   | 90 days       |
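
A minimal way to produce records in this shape is to build the dict and write it as a single JSON line to stdout, where the log shipper described in the next section can pick it up. The sketch below is illustrative, not a production logger; the constants mirror the example schema above and would normally be injected at build or deploy time.

import json
import sys
import uuid
from datetime import datetime, timezone

# Assumed constants for illustration; in practice these come from the
# deployment environment (image tag and git SHA baked in at build time).
MODEL_CONTEXT = {
    "model_name": "loan-approver",
    "model_version": "v1.2.4",
    "git_sha": "a1b2c3d",
    "container_image": "123.dkr.ecr...:v1.2.4",
}

def log_prediction(request_id, inputs, outputs, customer_id, latency_ms):
    """Emit one schema-compliant audit record as a JSON line on stdout."""
    record = {
        "event_id": str(uuid.uuid4()),
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "request_id": request_id,
        "model_context": MODEL_CONTEXT,
        "inputs": inputs,
        "outputs": outputs,
        "metadata": {"latency_ms": latency_ms, "customer_id": customer_id},
    }
    sys.stdout.write(json.dumps(record) + "\n")
    sys.stdout.flush()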

32.6.2. Architecture: The Firehose Pattern

Do NOT write logs to a database synchronously in the critical path of inference. Emit structured records to stdout and ship them asynchronously, so a slow or unavailable log store can never add latency to, or fail, a prediction.

graph LR
    subgraph "Inference Path"
        A[Model Container] -->|STDOUT JSON| B(FluentBit Sidecar)
    end
    
    subgraph "Async Pipeline"
        B -->|Async Batch| C{Kinesis / Kafka}
        C -->|Stream| D[S3 Data Lake]
    end
    
    subgraph "Analysis"
        D -->|Ingest| E[Athena / BigQuery]
        E --> F[Compliance Dashboard]
    end

Implementation: FluentBit Configuration

# fluent-bit.conf (classic configuration format)
[SERVICE]
    Flush        5
    Daemon       Off
    Log_Level    info

[INPUT]
    Name         tail
    Path         /var/log/containers/*model*.log
    Parser       json
    Tag          ml.audit
    Mem_Buf_Limit 50MB

[FILTER]
    Name         parser
    Match        ml.audit
    Key_Name     log
    Parser       json_payload

[OUTPUT]
    Name         kinesis_firehose
    Match        ml.audit
    region       us-east-1
    delivery_stream ml-audit-stream
    time_key     timestamp
    time_key_format %Y-%m-%dT%H:%M:%S.%LZ
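
The FILTER above references a json_payload parser, which Fluent Bit expects to find in a separate parsers file loaded via Parsers_File in the [SERVICE] section. A minimal definition might look like the following; the file name and Time_Format are assumptions chosen to match the timestamp field in the canonical schema.

# parsers.conf (load with "Parsers_File parsers.conf" under [SERVICE])
[PARSER]
    Name         json_payload
    Format       json
    Time_Key     timestamp
    Time_Format  %Y-%m-%dT%H:%M:%SZ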

Terraform: Kinesis Firehose to S3

resource "aws_kinesis_firehose_delivery_stream" "audit_logs" {
  name        = "ml-audit-logs"
  destination = "extended_s3"

  extended_s3_configuration {
    role_arn   = aws_iam_role.firehose.arn
    bucket_arn = aws_s3_bucket.audit_logs.arn
    
    prefix              = "audit/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/"
    error_output_prefix = "errors/!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/!{firehose:error-output-type}/"
    
    buffering_size     = 64   # MB
    buffering_interval = 60   # seconds
    compression_format = "GZIP"
    
    data_format_conversion_configuration {
      enabled = true
      
      input_format_configuration {
        deserializer {
          open_x_json_ser_de {}
        }
      }
      
      output_format_configuration {
        serializer {
          parquet_ser_de {
            compression = "SNAPPY"
          }
        }
      }
      
      schema_configuration {
        database_name = aws_glue_catalog_database.audit.name
        table_name    = aws_glue_catalog_table.predictions.name
        role_arn      = aws_iam_role.firehose.arn
      }
    }
  }
}

# S3 with Object Lock for WORM compliance
resource "aws_s3_bucket" "audit_logs" {
  bucket = "ml-audit-logs-${var.environment}"
  
  object_lock_enabled = true
}

resource "aws_s3_bucket_object_lock_configuration" "audit" {
  bucket = aws_s3_bucket.audit_logs.id

  rule {
    default_retention {
      mode = "COMPLIANCE"
      years = 7
    }
  }
}

32.6.3. Reproducibility as Audit

The ultimate audit trail is the ability to reproduce the prediction.

Obstacles to Reproducibility

| Obstacle                       | Cause                                   | Mitigation                                        |
|--------------------------------|-----------------------------------------|---------------------------------------------------|
| Floating Point Non-determinism | GPU operations                          | Set seeds, use deterministic mode (see sketch below) |
| Dependency Drift               | Unpinned installs (pip install pandas)  | Pin versions, use lock files                      |
| Feature Store Drift            | Values change over time                 | Time-travel queries                               |
| Config Drift                   | Different parameters                    | Version config files                              |
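
For the first mitigation, a typical seed-and-determinism setup looks something like the sketch below (shown for PyTorch and NumPy as an example; adapt to your framework). Note that enforcing deterministic kernels can slow inference or raise errors for operations without a deterministic implementation.

import os
import random

import numpy as np
import torch

def make_deterministic(seed: int = 42) -> None:
    """Pin the common sources of randomness so predictions can be replayed."""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    # Force deterministic kernels where available (may be slower or unsupported).
    torch.use_deterministic_algorithms(True)
    torch.backends.cudnn.benchmark = False
    # Required by some CUDA ops when deterministic algorithms are enforced;
    # ideally set before the CUDA context is initialized.
    os.environ["CUBLAS_WORKSPACE_CONFIG"] = ":4096:8"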

Time-Travel Query Implementation

from datetime import datetime
from typing import Any, Dict, List, Optional

class AuditableFeatureStore:
    """Feature store with time-travel for reproducibility."""
    
    def get_features(
        self,
        entity_id: str,
        feature_names: List[str],
        timestamp: Optional[datetime] = None
    ) -> Dict[str, Any]:
        """
        Retrieve features as they existed at a specific time.
        
        Args:
            entity_id: Customer/entity identifier
            feature_names: List of features to retrieve
            timestamp: Point-in-time for reconstruction
        """
        if timestamp is None:
            timestamp = datetime.utcnow()
        
        # Query the feature store with a temporal filter.
        # NOTE: use parameterized queries in production; f-string interpolation
        # is kept here only to keep the sketch short.
        query = f"""
        SELECT {', '.join(feature_names)}
        FROM feature_table
        WHERE entity_id = '{entity_id}'
        AND event_timestamp <= '{timestamp.isoformat()}'
        ORDER BY event_timestamp DESC
        LIMIT 1
        """
        
        return self._execute_query(query)
    
    def replay_prediction(
        self,
        prediction_log: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Replay a historical prediction for verification.
        
        Returns the original and replayed outputs for comparison.
        """
        # Get model at that version
        model = self._load_model_version(
            prediction_log['model_context']['model_version']
        )
        
        # Get features at that timestamp
        features = self.get_features(
            entity_id=prediction_log['metadata']['customer_id'],
            feature_names=list(prediction_log['inputs'].keys()),
            timestamp=datetime.fromisoformat(prediction_log['timestamp'])
        )
        
        # Replay
        replayed = model.predict(features)
        
        return {
            'original': prediction_log['outputs'],
            'replayed': replayed,
            'match': abs(replayed['probability'] - 
                        prediction_log['outputs']['probability']) < 0.001
        }
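
A hypothetical usage, assuming a helper that reads one logged prediction back out of the audit lake:

# load_audit_record is an assumed helper (e.g. an Athena lookup by request_id);
# it is not defined in this chapter.
store = AuditableFeatureStore()
log_record = load_audit_record("req-890...")
result = store.replay_prediction(log_record)
print(result['match'], result['original'], result['replayed'])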

32.6.4. Chain of Custody (Model Provenance)

Auditors track the chain of custody: Data → Training Job → Artifact → Endpoint.

graph TB
    A[Raw Data S3] -->|SHA256: abc...| B[Feature Pipeline]
    B -->|SHA256: def...| C[Training Dataset]
    C --> D[Training Job j-12345]
    D -->|SHA256: ghi...| E[Model Artifact]
    E --> F[Model Registry v1.2.4]
    F --> G[Endpoint prod-loan-v4]
    
    H[CloudTrail] -->|API Logs| I[Who approved?]
    I --> F

Provenance Tracking Implementation

from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Dict
import hashlib

@dataclass
class ProvenanceRecord:
    """Immutable record of an artifact's provenance."""
    artifact_id: str
    artifact_type: str  # 'dataset', 'model', 'endpoint'
    created_at: datetime
    created_by: str
    
    # Integrity
    content_hash: str
    
    # Lineage
    parent_artifacts: List[str] = field(default_factory=list)
    
    # Metadata
    metadata: Dict = field(default_factory=dict)

class ProvenanceTracker:
    """Track and verify artifact provenance chain."""
    
    def __init__(self, storage_backend):
        self.storage = storage_backend
    
    def register_artifact(
        self,
        artifact_path: str,
        artifact_type: str,
        created_by: str,
        parent_artifacts: List[str] = None
    ) -> ProvenanceRecord:
        """Register a new artifact with provenance."""
        
        # Compute content hash
        content_hash = self._compute_hash(artifact_path)
        
        record = ProvenanceRecord(
            artifact_id=f"{artifact_type}/{content_hash[:12]}",
            artifact_type=artifact_type,
            created_at=datetime.utcnow(),
            created_by=created_by,
            content_hash=content_hash,
            parent_artifacts=parent_artifacts or [],
            # Keep the source location so the hash can be re-verified later
            metadata={'source_path': artifact_path}
        )
        
        # Store immutably (QLDB, blockchain, etc.)
        self.storage.store(record)
        
        return record
    
    def verify_chain(self, artifact_id: str) -> Dict:
        """Verify the complete provenance chain."""
        
        record = self.storage.get(artifact_id)
        chain = [record]
        
        # Walk the chain
        for parent_id in record.parent_artifacts:
            parent_chain = self.verify_chain(parent_id)
            chain.extend(parent_chain['chain'])
        
        # Verify each link
        valid = all(
            self._verify_hash(r.artifact_id, r.content_hash)
            for r in chain
        )
        
        return {
            'artifact_id': artifact_id,
            'chain': chain,
            'valid': valid,
            'chain_length': len(chain)
        }
    
    def _verify_hash(self, artifact_id: str, expected_hash: str) -> bool:
        """Re-hash the artifact from its recorded source path and compare."""
        record = self.storage.get(artifact_id)
        source_path = record.metadata.get('source_path')
        if source_path is None:
            return False  # cannot re-verify without the original content
        return self._compute_hash(source_path) == expected_hash

    def _compute_hash(self, path: str) -> str:
        """Compute SHA256 hash of artifact."""
        sha = hashlib.sha256()
        with open(path, 'rb') as f:
            for chunk in iter(lambda: f.read(8192), b''):
                sha.update(chunk)
        return sha.hexdigest()
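
A sketch of how the chain from the diagram might be registered and then verified; the storage backend and artifact paths are placeholders.

tracker = ProvenanceTracker(storage_backend=my_ledger)  # my_ledger is an assumed backend

dataset = tracker.register_artifact(
    artifact_path="/data/training_dataset.parquet",  # placeholder path
    artifact_type="dataset",
    created_by="feature-pipeline",
)
model = tracker.register_artifact(
    artifact_path="/models/model.tar.gz",            # placeholder path
    artifact_type="model",
    created_by="training-job-j-12345",
    parent_artifacts=[dataset.artifact_id],
)

report = tracker.verify_chain(model.artifact_id)
assert report['valid'], "provenance chain is broken"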

32.6.5. Securing the Logs

Audit logs concentrate some of the most sensitive data in your company: raw input features, customer identifiers, and the decisions made about them. Protect them accordingly.

Security Controls

| Control               | Implementation       | Purpose                |
|-----------------------|----------------------|------------------------|
| Encryption at Rest    | S3 SSE-KMS           | Protect stored data    |
| Encryption in Transit | TLS 1.3              | Protect data in flight |
| Access Control        | Separate AWS Account | Isolation              |
| Immutability          | S3 Object Lock       | Prevent tampering      |
| Integrity             | SHA256 checksums     | Detect tampering       |

Terraform: Secure Log Storage

# Separate account for security isolation
resource "aws_s3_bucket" "audit_logs" {
  bucket = "ml-audit-logs-secure"
  
  object_lock_enabled = true
}

# KMS encryption
resource "aws_kms_key" "audit" {
  description             = "Audit log encryption key"
  deletion_window_in_days = 30
  enable_key_rotation     = true
  
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid       = "AuditLogAccess"
        Effect    = "Allow"
        Principal = {
          AWS = [
            "arn:aws:iam::${var.security_account_id}:role/AuditReader",
            "arn:aws:iam::${var.security_account_id}:role/ComplianceOfficer"
          ]
        }
        Action = [
          "kms:Decrypt",
          "kms:DescribeKey"
        ]
        Resource = "*"
      }
    ]
  })
}

resource "aws_s3_bucket_server_side_encryption_configuration" "audit" {
  bucket = aws_s3_bucket.audit_logs.id

  rule {
    apply_server_side_encryption_by_default {
      kms_master_key_id = aws_kms_key.audit.arn
      sse_algorithm     = "aws:kms"
    }
    bucket_key_enabled = true
  }
}

# IAM: read-only access. The explicit Deny on writes overrides any Allow granted to the same principal.
resource "aws_iam_policy" "audit_read_only" {
  name = "AuditLogReadOnly"
  
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "s3:GetObject",
          "s3:ListBucket"
        ]
        Resource = [
          aws_s3_bucket.audit_logs.arn,
          "${aws_s3_bucket.audit_logs.arn}/*"
        ]
      },
      {
        Effect = "Deny"
        Action = [
          "s3:DeleteObject",
          "s3:PutObject"
        ]
        Resource = "${aws_s3_bucket.audit_logs.arn}/*"
      }
    ]
  })
}

32.6.6. The Merkle Tree Ledger

S3 Object Lock protects against deletion, but how do you protect against silent modification?

graph TB
    A[Block 1: Hash Events 1-100] -->|0xABC| B[Block 2]
    B[Block 2: Hash Events 101-200 + 0xABC] -->|0xDEF| C[Block 3]
    C[Block 3: Hash Events 201-300 + 0xDEF] -->|0xGHI| D[...]
    
    E[Modified Event 50] -.->|Invalidates| A
    A -.->|Breaks| B
    B -.->|Breaks| C
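
Before reaching for a managed ledger, note that the underlying idea is simple: each block commits to the previous block's digest, so editing any earlier event changes every digest after it. A minimal hash-chain sketch (the block size of 100 events mirrors the diagram):

import hashlib
import json
from typing import Dict, List

def chain_blocks(events: List[Dict], block_size: int = 100) -> List[str]:
    """Return one digest per block, each covering its events plus the previous digest."""
    digests = []
    prev = ""
    for start in range(0, len(events), block_size):
        block = events[start:start + block_size]
        payload = prev + json.dumps(block, sort_keys=True)
        prev = hashlib.sha256(payload.encode()).hexdigest()
        digests.append(prev)
    return digests

# Tampering with any event changes its block digest and every digest after it,
# so comparing stored digests against recomputed ones localizes the modification.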

AWS QLDB Integration

from datetime import datetime

from pyqldb.driver.qldb_driver import QldbDriver

class AuditLedger:
    """Immutable ledger for audit log verification."""
    
    def __init__(self, ledger_name: str):
        self.driver = QldbDriver(ledger_name)
    
    def record_log_batch(
        self,
        s3_uri: str,
        etag: str,
        sha256: str,
        record_count: int
    ):
        """Record a log file in the immutable ledger."""
        
        def insert(executor):
            executor.execute_statement(
                """
                INSERT INTO AuditLogRecords
                << {
                    's3Uri': ?,
                    'etag': ?,
                    'sha256': ?,
                    'recordCount': ?,
                    'recordedAt': ?
                } >>
                """,
                s3_uri, etag, sha256, record_count,
                datetime.utcnow().isoformat()
            )
        
        self.driver.execute_lambda(insert)
    
    def verify_log_file(self, s3_uri: str, current_sha256: str) -> bool:
        """Verify a log file hasn't been tampered with."""
        
        def query(executor):
            result = executor.execute_statement(
                "SELECT sha256 FROM AuditLogRecords WHERE s3Uri = ?",
                s3_uri
            )
            return list(result)
        
        records = self.driver.execute_lambda(query)
        
        if not records:
            return False  # Not registered
        
        original_sha256 = records[0]['sha256']
        return original_sha256 == current_sha256
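
Verification then amounts to re-hashing the object in S3 and comparing it against the ledger. A sketch using boto3; bucket and key are placeholders:

import hashlib

import boto3

def verify_s3_log(ledger: AuditLedger, bucket: str, key: str) -> bool:
    """Recompute an audit file's SHA256 from S3 and check it against the ledger."""
    s3 = boto3.client("s3")
    body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
    current_sha256 = hashlib.sha256(body).hexdigest()
    return ledger.verify_log_file(f"s3://{bucket}/{key}", current_sha256)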

32.6.7. OpenLineage Standard

Proprietary logging schemas create vendor lock-in. OpenLineage is an open standard for describing runs, jobs, and the datasets they consume and produce, so emitting lineage events in its format keeps the audit trail portable across tools. A completed training run looks like this:

{
  "eventType": "RUN_COMPLETED",
  "eventTime": "2023-10-27T10:00:00.000Z",
  "run": {
    "runId": "d46e465b-d358-4d32-83d4-df660ff614dd"
  },
  "job": {
    "namespace": "my-namespace",
    "name": "train_model_v4"
  },
  "inputs": [
    {
      "namespace": "s3://my-bucket",
      "name": "training_data.parquet"
    }
  ],
  "outputs": [
    {
      "namespace": "sagemaker-registry",
      "name": "model_artifact_v4.tar.gz"
    }
  ]
}
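
Emitting such an event can be as simple as POSTing the JSON to an OpenLineage-compatible backend (a spec-complete event also carries producer and schemaURL fields, omitted above). The endpoint below assumes a Marquez-style /api/v1/lineage API and is illustrative only:

import requests

def emit_lineage_event(event: dict, backend_url: str = "http://marquez:5000") -> None:
    """POST an OpenLineage run event to a lineage backend (endpoint is an assumption)."""
    response = requests.post(f"{backend_url}/api/v1/lineage", json=event, timeout=5)
    response.raise_for_status()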

32.6.8. Retention Policies

| Regulation | Retention | Log Type   | Tier        |
|------------|-----------|------------|-------------|
| GDPR       | Minimal   | PII        | Delete ASAP |
| SOX        | 7 years   | Financial  | Glacier     |
| HIPAA      | 6 years   | Healthcare | Glacier     |
| Tax        | 7 years   | Revenue    | Glacier     |

S3 Lifecycle Policy

resource "aws_s3_bucket_lifecycle_configuration" "audit" {
  bucket = aws_s3_bucket.audit_logs.id

  rule {
    id     = "audit-tiering"
    status = "Enabled"

    transition {
      days          = 30
      storage_class = "STANDARD_IA"
    }

    transition {
      days          = 365
      storage_class = "GLACIER"
    }

    expiration {
      days = 2555  # 7 years
    }
  }
}

32.6.9. SOX 404 Compliance Checklist

| Control           | Evidence Required      | Implementation            |
|-------------------|------------------------|---------------------------|
| Access Control    | Segregation of duties  | IAM roles, approval gates |
| Change Management | Audit trail of changes | Git commits, JIRA tickets |
| Validation        | Test evidence          | CI/CD test reports        |
| Monitoring        | Alerting proof         | PagerDuty incidents       |

[End of Section 32.6]