32.6. Audit Trails: The Black Box Recorder
Important
The Golden Rule of Audit: If it isn’t logged, it didn’t happen. In regulated environments, the inability to produce a log for a specific prediction is often treated legally as if the system failed.
An ML Audit Trail is different from standard application logging. We don’t just care about “Error: NullPointerException”. We care about the why and the what of every decision.
32.6.1. The Anatomy of a Prediction Log
Unstructured, free-text logging is insufficient. You need structured, schema-compliant records — every prediction emitted as a machine-parseable JSON object.
The Canonical Schema
{
  "event_id": "uuid-v4-1234...",
  "timestamp": "2023-10-27T10:00:00Z",
  "request_id": "req-890...",
  "model_context": {
    "model_name": "loan-approver",
    "model_version": "v1.2.4",
    "git_sha": "a1b2c3d...",
    "container_image": "123.dkr.ecr...:v1.2.4"
  },
  "inputs": {
    "age": 34,
    "income": 50000,
    "credit_score": 720
  },
  "outputs": {
    "probability": 0.82,
    "decision": "APPROVE"
  },
  "metadata": {
    "latency_ms": 45,
    "customer_id": "cust-555"
  }
}
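Producing such a record is deliberately simple: serialize one JSON object per line to stdout, where the log shipper (Section 32.6.2) picks it up. A minimal stdlib-only sketch; the helper name and wiring are illustrative:

import json
import sys
import uuid
from datetime import datetime, timezone

def emit_audit_record(request_id: str, model_ctx: dict,
                      inputs: dict, outputs: dict, metadata: dict) -> None:
    """Write one schema-compliant audit record as a single JSON line to stdout."""
    record = {
        "event_id": str(uuid.uuid4()),
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "request_id": request_id,
        "model_context": model_ctx,
        "inputs": inputs,
        "outputs": outputs,
        "metadata": metadata,
    }
    # One record per line (JSON Lines) so a tailing sidecar can parse incrementally
    sys.stdout.write(json.dumps(record, separators=(",", ":")) + "\n")
    sys.stdout.flush()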
Log Field Categories
| Category | Fields | Purpose | Retention |
|---|---|---|---|
| Identity | event_id, request_id | Correlation | Forever |
| Temporal | timestamp | Timeline reconstruction | 7 years |
| Context | model_version, git_sha | Reproducibility | 7 years |
| Inputs | All features used | Replay capability | By regulation |
| Outputs | prediction, confidence | Decision record | By regulation |
| Metadata | latency, customer_id | Operations, debugging | 90 days |
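Enforce the schema at emission time rather than discovering gaps at audit time. A sketch using the `jsonschema` package (an assumption; any validator works), checking the identity and context categories from the table above:

import jsonschema  # pip install jsonschema

AUDIT_SCHEMA = {
    "type": "object",
    "required": ["event_id", "timestamp", "request_id",
                 "model_context", "inputs", "outputs"],
    "properties": {
        "model_context": {
            "type": "object",
            "required": ["model_name", "model_version", "git_sha"],
        },
    },
}

def validate_record(record: dict) -> None:
    """Raise jsonschema.ValidationError if a record violates the canonical schema."""
    jsonschema.validate(instance=record, schema=AUDIT_SCHEMA)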
32.6.2. Architecture: The Firehose Pattern
Do NOT write logs to a database in the critical path of inference: a synchronous write adds latency to every request and couples inference availability to the logging store. Emit to stdout and ship asynchronously instead:
graph LR
    subgraph "Inference Path"
        A[Model Container] -->|STDOUT JSON| B(FluentBit Sidecar)
    end
    subgraph "Async Pipeline"
        B -->|Async Batch| C{Kinesis / Kafka}
        C -->|Stream| D[S3 Data Lake]
    end
    subgraph "Analysis"
        D -->|Ingest| E[Athena / BigQuery]
        E --> F[Compliance Dashboard]
    end
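Where a sidecar is unavailable (e.g., a single-process batch scorer), the same keep-I/O-off-the-hot-path principle applies in-process: hand records to a background thread instead of blocking inference. A sketch using the standard library's queue-based handlers:

import logging
import logging.handlers
import queue

log_queue: "queue.Queue[logging.LogRecord]" = queue.Queue(maxsize=10_000)

# The inference thread only does a non-blocking enqueue...
audit_logger = logging.getLogger("ml.audit")
audit_logger.addHandler(logging.handlers.QueueHandler(log_queue))
audit_logger.setLevel(logging.INFO)

# ...while a background listener performs the actual (possibly slow) I/O.
listener = logging.handlers.QueueListener(
    log_queue, logging.StreamHandler()  # stdout, where the log shipper tails it
)
listener.start()
# Call listener.stop() at shutdown to flush any queued records.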
Implementation: FluentBit Configuration
# fluent-bit.conf (classic mode)
[SERVICE]
    Flush           5
    Daemon          Off
    Log_Level       info
    Parsers_File    parsers.conf

[INPUT]
    Name             tail
    Path             /var/log/containers/*model*.log
    Parser           json
    Tag              ml.audit
    Mem_Buf_Limit    50MB

[FILTER]
    Name        parser
    Match       ml.audit
    Key_Name    log
    # json_payload must be defined in the parsers file referenced above
    Parser      json_payload

[OUTPUT]
    Name               kinesis_firehose
    Match              ml.audit
    region             us-east-1
    delivery_stream    ml-audit-stream
    time_key           timestamp
    time_key_format    %Y-%m-%dT%H:%M:%S.%LZ
Terraform: Kinesis Firehose to S3
resource "aws_kinesis_firehose_delivery_stream" "audit_logs" {
name = "ml-audit-logs"
destination = "extended_s3"
extended_s3_configuration {
role_arn = aws_iam_role.firehose.arn
bucket_arn = aws_s3_bucket.audit_logs.arn
prefix = "audit/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/"
error_output_prefix = "errors/!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/!{firehose:error-output-type}/"
buffering_size = 64 # MB
buffering_interval = 60 # seconds
compression_format = "GZIP"
data_format_conversion_configuration {
enabled = true
input_format_configuration {
deserializer {
open_x_json_ser_de {}
}
}
output_format_configuration {
serializer {
parquet_ser_de {
compression = "SNAPPY"
}
}
}
schema_configuration {
database_name = aws_glue_catalog_database.audit.name
table_name = aws_glue_catalog_table.predictions.name
role_arn = aws_iam_role.firehose.arn
}
}
}
}
# S3 with Object Lock for WORM compliance
resource "aws_s3_bucket" "audit_logs" {
  bucket              = "ml-audit-logs-${var.environment}"
  object_lock_enabled = true
}

resource "aws_s3_bucket_object_lock_configuration" "audit" {
  bucket = aws_s3_bucket.audit_logs.id

  rule {
    default_retention {
      mode  = "COMPLIANCE"
      years = 7
    }
  }
}
32.6.3. Reproducibility as Audit
The ultimate audit trail is the ability to reproduce the prediction.
Obstacles to Reproducibility
| Obstacle | Cause | Mitigation |
|---|---|---|
| Floating-point non-determinism | Non-deterministic GPU kernels | Set seeds, enable deterministic mode (see sketch below) |
| Dependency drift | Unpinned installs (`pip install pandas`) | Pin versions, use lock files |
| Feature store drift | Values change over time | Time-travel queries |
| Config drift | Parameters differ between training and replay | Version config files |
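For the first row, a seeding sketch. It assumes PyTorch; TensorFlow has equivalent switches. Even then, bit-exact GPU reproducibility also depends on library and driver versions, which is exactly why the audit log records `container_image`:

import os
import random
import numpy as np
import torch

def make_deterministic(seed: int = 42) -> None:
    """Pin every common source of randomness for replayable inference."""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    # Force deterministic kernels; raises if an op has no deterministic variant
    torch.use_deterministic_algorithms(True)
    # Required by cuBLAS for deterministic matmuls; set before any CUDA work
    os.environ["CUBLAS_WORKSPACE_CONFIG"] = ":4096:8"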
Time-Travel Query Implementation
from datetime import datetime
from typing import Any, Dict, List, Optional

class AuditableFeatureStore:
    """Feature store with time-travel for reproducibility."""

    def get_features(
        self,
        entity_id: str,
        feature_names: List[str],
        timestamp: Optional[datetime] = None
    ) -> Dict[str, Any]:
        """
        Retrieve features as they existed at a specific time.

        Args:
            entity_id: Customer/entity identifier
            feature_names: List of features to retrieve
            timestamp: Point-in-time for reconstruction
        """
        if timestamp is None:
            timestamp = datetime.utcnow()

        # Query feature store with a temporal filter.
        # NOTE: f-string interpolation is for readability only; production
        # code must use parameterized queries to prevent SQL injection.
        query = f"""
            SELECT {', '.join(feature_names)}
            FROM feature_table
            WHERE entity_id = '{entity_id}'
              AND event_timestamp <= '{timestamp.isoformat()}'
            ORDER BY event_timestamp DESC
            LIMIT 1
        """
        return self._execute_query(query)

    def replay_prediction(
        self,
        prediction_log: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Replay a historical prediction for verification.
        Returns the original and replayed outputs for comparison.
        """
        # Load the exact model version recorded in the log
        model = self._load_model_version(
            prediction_log['model_context']['model_version']
        )

        # Reconstruct the features as of the original prediction time
        # ('Z' suffix replaced for fromisoformat compatibility on Python < 3.11)
        features = self.get_features(
            entity_id=prediction_log['metadata']['customer_id'],
            feature_names=list(prediction_log['inputs'].keys()),
            timestamp=datetime.fromisoformat(
                prediction_log['timestamp'].replace('Z', '+00:00')
            )
        )

        # Replay and compare within a small tolerance
        replayed = model.predict(features)
        return {
            'original': prediction_log['outputs'],
            'replayed': replayed,
            'match': abs(replayed['probability'] -
                         prediction_log['outputs']['probability']) < 0.001
        }
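In an audit, the replay might be driven like this (the log-loading helper is hypothetical):

store = AuditableFeatureStore()
log_record = load_log_from_s3("audit/year=2023/.../event-1234.json")  # hypothetical helper
result = store.replay_prediction(log_record)
assert result["match"], f"Replay diverged from the recorded decision: {result}"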
32.6.4. Chain of Custody (Model Provenance)
Auditors track the chain of custody: Data → Training Job → Artifact → Endpoint.
graph TB
    A[Raw Data S3] -->|SHA256: abc...| B[Feature Pipeline]
    B -->|SHA256: def...| C[Training Dataset]
    C --> D[Training Job j-12345]
    D -->|SHA256: ghi...| E[Model Artifact]
    E --> F[Model Registry v1.2.4]
    F --> G[Endpoint prod-loan-v4]
    H[CloudTrail] -->|API Logs| I[Who approved?]
    I --> F
Provenance Tracking Implementation
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional
import hashlib

@dataclass(frozen=True)
class ProvenanceRecord:
    """Immutable record of an artifact's provenance."""
    artifact_id: str
    artifact_type: str  # 'dataset', 'model', 'endpoint'
    created_at: datetime
    created_by: str

    # Integrity
    content_hash: str

    # Lineage
    parent_artifacts: List[str] = field(default_factory=list)

    # Metadata
    metadata: Dict = field(default_factory=dict)

class ProvenanceTracker:
    """Track and verify an artifact provenance chain."""

    def __init__(self, storage_backend):
        self.storage = storage_backend

    def register_artifact(
        self,
        artifact_path: str,
        artifact_type: str,
        created_by: str,
        parent_artifacts: Optional[List[str]] = None
    ) -> ProvenanceRecord:
        """Register a new artifact with provenance."""
        # Content-address the artifact so the ID itself commits to the bytes
        content_hash = self._compute_hash(artifact_path)

        record = ProvenanceRecord(
            artifact_id=f"{artifact_type}/{content_hash[:12]}",
            artifact_type=artifact_type,
            created_at=datetime.utcnow(),
            created_by=created_by,
            content_hash=content_hash,
            parent_artifacts=parent_artifacts or []
        )

        # Store immutably (QLDB, blockchain, etc.)
        self.storage.store(record)
        return record

    def verify_chain(self, artifact_id: str) -> Dict:
        """Verify the complete provenance chain."""
        record = self.storage.get(artifact_id)
        chain = [record]

        # Walk the chain back to the root artifacts
        for parent_id in record.parent_artifacts:
            parent_chain = self.verify_chain(parent_id)
            chain.extend(parent_chain['chain'])

        # Verify each link against its recorded hash
        valid = all(
            self._verify_hash(r.artifact_id, r.content_hash)
            for r in chain
        )

        return {
            'artifact_id': artifact_id,
            'chain': chain,
            'valid': valid,
            'chain_length': len(chain)
        }

    def _verify_hash(self, artifact_id: str, expected_hash: str) -> bool:
        """Recompute an artifact's hash and compare it to the recorded value.
        Assumes the backend can resolve an artifact_id to its current location."""
        path = self.storage.resolve_path(artifact_id)
        return self._compute_hash(path) == expected_hash

    def _compute_hash(self, path: str) -> str:
        """Compute the SHA256 hash of an artifact file."""
        sha = hashlib.sha256()
        with open(path, 'rb') as f:
            for chunk in iter(lambda: f.read(8192), b''):
                sha.update(chunk)
        return sha.hexdigest()
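Wiring the tracker into a training pipeline might look like this (the storage backend and file paths are placeholders):

tracker = ProvenanceTracker(storage_backend=my_backend)  # hypothetical backend

dataset = tracker.register_artifact(
    "data/train.parquet", "dataset", created_by="pipeline@ci"
)
model = tracker.register_artifact(
    "artifacts/model.tar.gz", "model",
    created_by="pipeline@ci",
    parent_artifacts=[dataset.artifact_id],
)

report = tracker.verify_chain(model.artifact_id)
print(report["valid"], report["chain_length"])  # e.g. True 2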
32.6.5. Securing the Logs
Audit logs contain the most sensitive data in your company.
Security Controls
| Control | Implementation | Purpose |
|---|---|---|
| Encryption at Rest | S3 SSE-KMS | Protect stored data |
| Encryption in Transit | TLS 1.3 | Protect data in flight |
| Access Control | Separate AWS Account | Isolation |
| Immutability | S3 Object Lock | Prevent tampering |
| Integrity | SHA256 checksums | Detect tampering |
Terraform: Secure Log Storage
# Separate account for security isolation
resource "aws_s3_bucket" "audit_logs" {
  bucket              = "ml-audit-logs-secure"
  object_lock_enabled = true
}

data "aws_caller_identity" "current" {}

# KMS encryption
resource "aws_kms_key" "audit" {
  description             = "Audit log encryption key"
  deletion_window_in_days = 30
  enable_key_rotation     = true

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        # Without an administrative statement the key becomes unmanageable:
        # every KMS key policy must grant someone admin access.
        Sid       = "EnableKeyAdministration"
        Effect    = "Allow"
        Principal = { AWS = "arn:aws:iam::${data.aws_caller_identity.current.account_id}:root" }
        Action    = "kms:*"
        Resource  = "*"
      },
      {
        Sid    = "AuditLogAccess"
        Effect = "Allow"
        Principal = {
          AWS = [
            "arn:aws:iam::${var.security_account_id}:role/AuditReader",
            "arn:aws:iam::${var.security_account_id}:role/ComplianceOfficer"
          ]
        }
        Action = [
          "kms:Decrypt",
          "kms:DescribeKey"
        ]
        Resource = "*"
      }
    ]
  })
}

resource "aws_s3_bucket_server_side_encryption_configuration" "audit" {
  bucket = aws_s3_bucket.audit_logs.id

  rule {
    apply_server_side_encryption_by_default {
      kms_master_key_id = aws_kms_key.audit.arn
      sse_algorithm     = "aws:kms"
    }
    bucket_key_enabled = true
  }
}
# IAM: Read-only access even for admins
resource "aws_iam_policy" "audit_read_only" {
  name = "AuditLogReadOnly"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "s3:GetObject",
          "s3:ListBucket"
        ]
        Resource = [
          aws_s3_bucket.audit_logs.arn,
          "${aws_s3_bucket.audit_logs.arn}/*"
        ]
      },
      {
        Effect = "Deny"
        Action = [
          "s3:DeleteObject",
          "s3:PutObject"
        ]
        Resource = "${aws_s3_bucket.audit_logs.arn}/*"
      }
    ]
  })
}
32.6.6. The Merkle Tree Ledger
S3 Object Lock prevents deletion and overwriting during the retention window, but how do you prove to an auditor that nothing was silently modified before it was locked, or altered elsewhere in the pipeline? Chain the hashes, so that editing any event breaks every subsequent block:
graph TB
    A[Block 1: Hash Events 1-100] -->|0xABC| B[Block 2: Hash Events 101-200 + 0xABC]
    B -->|0xDEF| C[Block 3: Hash Events 201-300 + 0xDEF]
    C -->|0xGHI| D[...]
    E[Modified Event 50] -.->|Invalidates| A
    A -.->|Breaks| B
    B -.->|Breaks| C
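The diagram's property is easy to state in code: each block's hash commits to the previous block's hash, so editing any historical event changes every hash after it. A minimal hash-chain sketch (a full Merkle tree additionally gives per-event inclusion proofs, but the chaining idea is the same):

import hashlib
import json
from typing import Dict, List

def block_hash(events: List[Dict], prev_hash: str) -> str:
    """Hash a batch of events together with the previous block's hash."""
    payload = json.dumps({"events": events, "prev": prev_hash}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()

def build_chain(batches: List[List[Dict]]) -> List[str]:
    """Return the running hash after each block; store these alongside the logs."""
    hashes, prev = [], "GENESIS"
    for batch in batches:
        prev = block_hash(batch, prev)
        hashes.append(prev)
    return hashes

# Verification: recompute the chain and compare the final hash with the stored
# head hash. Any modified event in any earlier batch yields a different result.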
AWS QLDB Integration
from datetime import datetime
from pyqldb.driver.qldb_driver import QldbDriver

class AuditLedger:
    """Immutable ledger for audit log verification."""

    def __init__(self, ledger_name: str):
        self.driver = QldbDriver(ledger_name)

    def record_log_batch(
        self,
        s3_uri: str,
        etag: str,
        sha256: str,
        record_count: int
    ):
        """Record a log file in the immutable ledger."""
        def insert(executor):
            executor.execute_statement(
                """
                INSERT INTO AuditLogRecords
                << {
                    's3Uri': ?,
                    'etag': ?,
                    'sha256': ?,
                    'recordCount': ?,
                    'recordedAt': ?
                } >>
                """,
                s3_uri, etag, sha256, record_count,
                datetime.utcnow().isoformat()
            )

        self.driver.execute_lambda(insert)

    def verify_log_file(self, s3_uri: str, current_sha256: str) -> bool:
        """Verify a log file hasn't been tampered with."""
        def query(executor):
            result = executor.execute_statement(
                "SELECT sha256 FROM AuditLogRecords WHERE s3Uri = ?",
                s3_uri
            )
            return list(result)

        records = self.driver.execute_lambda(query)
        if not records:
            return False  # Never registered: treat as a failure

        original_sha256 = records[0]['sha256']
        return original_sha256 == current_sha256
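A scheduled job closes the loop by recomputing each file's hash from S3 and checking it against the ledger. A sketch with boto3; the bucket layout and ledger wiring are assumptions:

import hashlib
import boto3

def sha256_of_object(bucket: str, key: str) -> str:
    """Stream an S3 object and compute its SHA256."""
    body = boto3.client("s3").get_object(Bucket=bucket, Key=key)["Body"]
    sha = hashlib.sha256()
    for chunk in iter(lambda: body.read(8192), b""):
        sha.update(chunk)
    return sha.hexdigest()

def audit_bucket(ledger: "AuditLedger", bucket: str) -> list:
    """Return the keys whose current content no longer matches the ledger."""
    s3 = boto3.client("s3")
    tampered = []
    for page in s3.get_paginator("list_objects_v2").paginate(Bucket=bucket):
        for obj in page.get("Contents", []):
            uri = f"s3://{bucket}/{obj['Key']}"
            if not ledger.verify_log_file(uri, sha256_of_object(bucket, obj["Key"])):
                tampered.append(obj["Key"])
    return tampered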
32.6.7. OpenLineage Standard
Proprietary logging schemas create vendor lock-in. OpenLineage is an open specification for lineage events, consumed natively by tools such as Marquez, so lineage captured today stays portable. A run-completion event looks like this (the spec's `eventType` values are START, RUNNING, COMPLETE, ABORT, FAIL, and OTHER):
{
  "eventType": "COMPLETE",
  "eventTime": "2023-10-27T10:00:00.000Z",
  "run": {
    "runId": "d46e465b-d358-4d32-83d4-df660ff614dd"
  },
  "job": {
    "namespace": "my-namespace",
    "name": "train_model_v4"
  },
  "inputs": [
    {
      "namespace": "s3://my-bucket",
      "name": "training_data.parquet"
    }
  ],
  "outputs": [
    {
      "namespace": "sagemaker-registry",
      "name": "model_artifact_v4.tar.gz"
    }
  ]
}
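Emitting the event is an HTTP POST to any OpenLineage-compatible collector; Marquez, the reference implementation, listens on `/api/v1/lineage`. The official `openlineage-python` client wraps this, but the wire format is just the JSON above. A minimal sketch (the collector URL is an assumption):

import requests

def emit_lineage_event(event: dict, collector_url: str = "http://marquez:5000") -> None:
    """POST an OpenLineage run event to a collector endpoint."""
    resp = requests.post(f"{collector_url}/api/v1/lineage", json=event, timeout=5)
    resp.raise_for_status()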
32.6.8. Retention Policies
| Regulation | Retention | Log Type | Storage Tier |
|---|---|---|---|
| GDPR | Data minimization: as short as possible | PII | Delete ASAP |
| SOX | 7 years | Financial | Glacier |
| HIPAA | 6 years | Healthcare | Glacier |
| Tax | 7 years | Revenue | Glacier |
S3 Lifecycle Policy
resource "aws_s3_bucket_lifecycle_configuration" "audit" {
bucket = aws_s3_bucket.audit_logs.id
rule {
id = "audit-tiering"
status = "Enabled"
transition {
days = 30
storage_class = "STANDARD_IA"
}
transition {
days = 365
storage_class = "GLACIER"
}
expiration {
days = 2555 # 7 years
}
}
}
32.6.9. SOX 404 Compliance Checklist
| Control | Evidence Required | Implementation |
|---|---|---|
| Access Control | Segregation of duties | IAM roles, approval gates |
| Change Management | Audit trail of changes | Git commits, JIRA tickets |
| Validation | Test evidence | CI/CD test reports |
| Monitoring | Alerting proof | PagerDuty incidents |
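When an auditor asks for the evidence behind a specific prediction, it comes out of the Parquet lake built in 32.6.2. A sketch using boto3's Athena client; the database, table, and results bucket are assumptions:

import boto3

def fetch_prediction_evidence(event_id: str) -> str:
    """Start an Athena query for a single prediction's full audit record."""
    athena = boto3.client("athena")
    # event_id is a UUID generated by the pipeline; sanitize any
    # user-supplied values before interpolating into the query string.
    resp = athena.start_query_execution(
        QueryString=f"SELECT * FROM predictions WHERE event_id = '{event_id}'",
        QueryExecutionContext={"Database": "ml_audit"},
        ResultConfiguration={"OutputLocation": "s3://ml-audit-query-results/"},
    )
    return resp["QueryExecutionId"]  # poll get_query_execution until SUCCEEDED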
[End of Section 32.6]