32.4. Dataset Licensing & Attribution: The IP Supply Chain

Caution

The Poisoned Well: If you train on GPL-licensed code, your entire model could be subject to “copyleft” requirements. One contaminated dataset can create years of legal exposure.


32.4.1. License Types for AI

Understanding license compatibility is critical for commercial AI:

| License | Type | Commercial Use | Training Safe? | Redistribution |
|---|---|---|---|---|
| CC0 | Public Domain | ✓ | ✓ | No restrictions |
| MIT | Permissive | ✓ | ✓ | Keep license file |
| Apache 2.0 | Permissive | ✓ | ✓ | Keep license + NOTICE |
| BSD-3 | Permissive | ✓ | ✓ | Keep license |
| CC-BY | Attribution | ✓ | ✓ with attribution | Credit author |
| CC-BY-SA | ShareAlike | ✓ | ⚠️ Output may need same license | Share alike |
| GPL-2.0 | Strong Copyleft | ✓ | ⚠️ High risk | Source disclosure |
| GPL-3.0 | Strong Copyleft | ✓ | ⚠️ High risk | Source + patents |
| LGPL | Weak Copyleft | ✓ | ⚠️ Medium risk | Library linking OK |
| CC-NC | Non-Commercial | ✗ | ✗ | Commercial prohibited |
| CC-ND | No Derivatives | ? | ⚠️ Gray area | Is training a “derivative”? |
| Proprietary | Varies | Check ToS | Check ToS | Usually prohibited |

The Training-as-Derivative Debate

graph TD
    A[Training Data] --> B{Is model a<br>'derivative work'?}
    B -->|Legal Position 1| C[Yes: Model inherits license]
    B -->|Legal Position 2| D[No: Model is transformation]
    C --> E[GPL model must be open]
    D --> F[Commercial use OK]
    
    G[Current Status] --> H[Unsettled law]
    H --> I[Conservative approach:<br>Assume derivative]

License Risk Matrix

| Data Type | Low Risk | Medium Risk | High Risk |
|---|---|---|---|
| Text | CC0, Wikipedia | Books3, arXiv | Web scraping |
| Images | LAION-5B-CC0 | LAION-2B | Getty, stock photos |
| Code | Apache repos | MIT repos | GPL repos |
| Audio | LibriSpeech | YouTube | Commercial music |
| Video | Kinetics | YouTube-8M | Movies, streaming |

32.4.2. The License Lake Architecture

Segregate data by license zone to prevent contamination:

graph TB
    A[Raw Data Ingestion] --> B{License Scanner}
    B -->|CC0/MIT/Apache| C[Zone Green<br>Commercial OK]
    B -->|CC-BY/CC-BY-SA| D[Zone Yellow<br>Attribution Required]
    B -->|GPL/LGPL/Unknown| E[Zone Red<br>Quarantine]
    B -->|CC-NC/ND/Proprietary| F[Zone Black<br>DO NOT USE]
    
    C --> G[Production Training]
    D --> H[Attribution Pipeline]
    E --> I[Legal Review]
    F --> J[Delete or Request License]
    
    subgraph "Access Control"
        G
        H
        I
        J
    end
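
The routing decision in the diagram can be sketched as a small lookup. This is a minimal illustration rather than a production scanner: the license identifiers are lowercase SPDX-style strings chosen for this example, and the `data-lake-<zone>-<environment>` bucket pattern is assumed here.

```python
from enum import Enum
from typing import Optional


class Zone(Enum):
    GREEN = "green"    # commercial use OK
    YELLOW = "yellow"  # attribution required
    RED = "red"        # quarantine pending legal review
    BLACK = "black"    # do not use


# Illustrative license-to-zone table (identifiers are assumptions for this sketch)
ZONE_BY_LICENSE = {
    "cc0-1.0": Zone.GREEN, "mit": Zone.GREEN, "apache-2.0": Zone.GREEN,
    "cc-by-4.0": Zone.YELLOW, "cc-by-sa-4.0": Zone.YELLOW,
    "gpl-2.0": Zone.RED, "gpl-3.0": Zone.RED, "lgpl-2.1": Zone.RED,
    "cc-by-nc-4.0": Zone.BLACK, "cc-by-nd-4.0": Zone.BLACK,
    "proprietary": Zone.BLACK,
}


def route(license_id: Optional[str]) -> Zone:
    """Return the zone for a license; missing or unrecognized ones are quarantined."""
    if not license_id:
        return Zone.RED
    return ZONE_BY_LICENSE.get(license_id.strip().lower(), Zone.RED)


def destination_bucket(license_id: Optional[str], environment: str) -> str:
    """Build the (assumed) bucket name for the zone a license routes to."""
    return f"data-lake-{route(license_id).value}-{environment}"


print(destination_bucket("CC-BY-4.0", "prod"))
print(destination_bucket(None, "prod"))
```

Defaulting unknown input to the red zone keeps unlabeled data out of production training until someone has reviewed it.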

Terraform: Zone-Based Access Control

# data_lake_zones.tf

variable "environment" {
  type = string
}

# Zone definitions
locals {
  zones = {
    green = {
      description = "Commercial use permitted"
      allowed_licenses = ["cc0-1.0", "mit", "apache-2.0", "bsd-3-clause"]
    }
    yellow = {
      description = "Attribution required"
      allowed_licenses = ["cc-by-4.0", "cc-by-3.0", "cc-by-sa-4.0", "cc-by-sa-3.0"]
    }
    red = {
      description = "Legal review required"
      allowed_licenses = ["gpl-2.0", "gpl-3.0", "lgpl-2.1", "unknown"]
    }
    black = {
      description = "DO NOT USE"
      allowed_licenses = ["cc-nc", "cc-nd", "proprietary"]
    }
  }
}

# S3 buckets per zone
resource "aws_s3_bucket" "data_zone" {
  for_each = local.zones
  
  bucket = "data-lake-${each.key}-${var.environment}"
  
  tags = {
    Zone        = each.key
    Description = each.value.description
    Environment = var.environment
    ManagedBy   = "terraform"
  }
}

# Block public access for all zones
resource "aws_s3_bucket_public_access_block" "data_zone" {
  for_each = aws_s3_bucket.data_zone
  
  bucket = each.value.id
  
  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

# Commercial training can only access green zone
resource "aws_iam_policy" "commercial_training" {
  name        = "CommercialTrainingAccess-${var.environment}"
  description = "Access to commercially safe training data"
  
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid      = "AllowGreenZone"
        Effect   = "Allow"
        Action   = ["s3:GetObject", "s3:ListBucket"]
        Resource = [
          aws_s3_bucket.data_zone["green"].arn,
          "${aws_s3_bucket.data_zone["green"].arn}/*"
        ]
      },
      {
        Sid      = "DenyOtherZones"
        Effect   = "Deny"
        Action   = ["s3:*"]
        Resource = flatten([
          for zone in ["yellow", "red", "black"] : [
            aws_s3_bucket.data_zone[zone].arn,
            "${aws_s3_bucket.data_zone[zone].arn}/*"
          ]
        ])
      }
    ]
  })
}

# Research can access green + yellow with attribution tracking
resource "aws_iam_policy" "research_training" {
  name        = "ResearchTrainingAccess-${var.environment}"
  description = "Access to research data with attribution requirements"
  
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid      = "AllowGreenYellowZones"
        Effect   = "Allow"
        Action   = ["s3:GetObject", "s3:ListBucket"]
        Resource = flatten([
          for zone in ["green", "yellow"] : [
            aws_s3_bucket.data_zone[zone].arn,
            "${aws_s3_bucket.data_zone[zone].arn}/*"
          ]
        ])
      },
      {
        Sid      = "DenyRestrictedZones"
        Effect   = "Deny"
        Action   = ["s3:*"]
        Resource = flatten([
          for zone in ["red", "black"] : [
            aws_s3_bucket.data_zone[zone].arn,
            "${aws_s3_bucket.data_zone[zone].arn}/*"
          ]
        ])
      }
    ]
  })
}

# Legal team can review red zone
resource "aws_iam_policy" "legal_review" {
  name        = "LegalReviewAccess-${var.environment}"
  description = "Read access to quarantined data for legal review"
  
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid      = "AllowRedZoneRead"
        Effect   = "Allow"
        Action   = ["s3:GetObject", "s3:ListBucket"]
        Resource = [
          aws_s3_bucket.data_zone["red"].arn,
          "${aws_s3_bucket.data_zone["red"].arn}/*"
        ]
      }
    ]
  })
}
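
The policies above rely on the IAM rule that an explicit Deny overrides any Allow, and that anything not allowed is implicitly denied. The following toy model of that evaluation order can be handy for unit-testing zone rules before applying them. It is not the AWS policy engine, and the simplified statement shape (`effect`, `zones`) is invented for this sketch.

```python
from typing import Dict, List

# Simplified stand-ins for the three policies; the real ones match on ARNs and actions.
POLICIES: Dict[str, List[dict]] = {
    "commercial_training": [
        {"effect": "Allow", "zones": {"green"}},
        {"effect": "Deny", "zones": {"yellow", "red", "black"}},
    ],
    "research_training": [
        {"effect": "Allow", "zones": {"green", "yellow"}},
        {"effect": "Deny", "zones": {"red", "black"}},
    ],
    "legal_review": [
        {"effect": "Allow", "zones": {"red"}},
    ],
}


def can_read(policy_name: str, zone: str) -> bool:
    """Evaluate read access: explicit Deny wins, then Allow, otherwise implicit deny."""
    statements = POLICIES[policy_name]
    if any(s["effect"] == "Deny" and zone in s["zones"] for s in statements):
        return False
    return any(s["effect"] == "Allow" and zone in s["zones"] for s in statements)


for policy in POLICIES:
    readable = [z for z in ("green", "yellow", "red", "black") if can_read(policy, z)]
    print(policy, "->", readable)
```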

32.4.3. Data Bill of Materials (DataBOM)

Like an SBOM for software, a DataBOM tracks the provenance of training data:

{
  "spdxVersion": "SPDX-2.3",
  "dataFormatVersion": "1.0",
  "creationInfo": {
    "created": "2024-01-15T10:30:00Z",
    "creators": ["Tool: DataBOM-Generator-1.0", "Organization: Acme Corp"],
    "licenseListVersion": "3.21"
  },
  "documentName": "TrainingData-Manifest-v4",
  "documentNamespace": "https://acme.com/databom/training-v4",
  "packages": [
    {
      "name": "wikipedia-en-2024",
      "SPDXID": "SPDXRef-Package-wikipedia-en-2024",
      "downloadLocation": "https://dumps.wikimedia.org/enwiki/20240101/",
      "filesAnalyzed": true,
      "licenseConcluded": "CC-BY-SA-4.0",
      "licenseDeclared": "CC-BY-SA-4.0",
      "copyrightText": "Wikipedia contributors, Wikimedia Foundation",
      "supplier": "Organization: Wikimedia Foundation",
      "checksums": [
        {
          "algorithm": "SHA256",
          "checksumValue": "a1b2c3d4e5f6..."
        }
      ],
      "attributionTexts": [
        "Content from Wikipedia, the free encyclopedia, under CC BY-SA 4.0"
      ],
      "annotations": [
        {
          "annotationType": "OTHER",
          "annotator": "Tool: LicenseScanner",
          "annotationDate": "2024-01-10T08:00:00Z",
          "comment": "All articles verified as CC-BY-SA"
        }
      ]
    },
    {
      "name": "internal-support-tickets",
      "downloadLocation": "NOASSERTION",
      "filesAnalyzed": true,
      "licenseConcluded": "Proprietary",
      "licenseDeclared": "Proprietary",
      "copyrightText": "Acme Corp 2020-2024",
      "supplier": "Organization: Acme Corp",
      "annotations": [
        {
          "annotationType": "OTHER",
          "annotator": "Person: Legal Counsel",
          "annotationDate": "2024-01-12T14:00:00Z",
          "comment": "Verified: Customer consent obtained for AI training"
        }
      ]
    },
    {
      "name": "github-code-samples",
      "downloadLocation": "https://github.com/...",
      "filesAnalyzed": true,
      "licenseConcluded": "(MIT OR Apache-2.0)",
      "licenseInfoFromFiles": ["MIT", "Apache-2.0"],
      "copyrightText": "Various contributors",
      "supplier": "Organization: GitHub",
      "externalRefs": [
        {
          "referenceCategory": "SECURITY",
          "referenceType": "cpe23Type",
          "referenceLocator": "cpe:2.3:*:*:*:*:*:*:*:*"
        }
      ]
    }
  ],
  "files": [
    {
      "fileName": "corpus/wikipedia.parquet",
      "SPDXID": "SPDXRef-File-Wikipedia",
      "licenseConcluded": "CC-BY-SA-4.0",
      "copyrightText": "Wikimedia Foundation",
      "checksums": [
        {"algorithm": "SHA256", "checksumValue": "a1b2c3..."}
      ]
    }
  ],
  "relationships": [
    {
      "spdxElementId": "SPDXRef-DOCUMENT",
      "relationshipType": "DESCRIBES",
      "relatedSpdxElement": "SPDXRef-Package-wikipedia-en-2024"
    }
  ]
}

DataBOM Generator

import json
import hashlib
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Dict
from dataclasses import dataclass, field, asdict

@dataclass
class DataSource:
    name: str
    location: str
    license_concluded: str
    license_declared: str
    copyright_text: str
    supplier: str
    checksum: Optional[str] = None
    attribution_texts: List[str] = field(default_factory=list)
    annotations: List[Dict] = field(default_factory=list)

@dataclass 
class DataBOM:
    document_name: str
    namespace: str
    creator: str
    sources: List[DataSource] = field(default_factory=list)
    
    def add_source(self, source: DataSource) -> None:
        self.sources.append(source)
    
    def to_spdx(self) -> dict:
        """Export to SPDX format."""
        return {
            "spdxVersion": "SPDX-2.3",
            "dataFormatVersion": "1.0",
            "creationInfo": {
                "created": datetime.utcnow().isoformat() + "Z",
                "creators": [self.creator],
            },
            "documentName": self.document_name,
            "documentNamespace": self.namespace,
            "packages": [
                {
                    "name": src.name,
                    "downloadLocation": src.location,
                    "licenseConcluded": src.license_concluded,
                    "licenseDeclared": src.license_declared,
                    "copyrightText": src.copyright_text,
                    "supplier": src.supplier,
                    # SPDX names this key "checksumValue", matching the JSON example
                    "checksums": [{"algorithm": "SHA256", "checksumValue": src.checksum}] if src.checksum else [],
                    "attributionTexts": src.attribution_texts,
                    "annotations": src.annotations
                }
                for src in self.sources
            ]
        }
    
    def save(self, path: str) -> None:
        """Save DataBOM to file."""
        with open(path, 'w') as f:
            json.dump(self.to_spdx(), f, indent=2)
    
    @classmethod
    def load(cls, path: str) -> 'DataBOM':
        """Load DataBOM from file."""
        with open(path) as f:
            data = json.load(f)
        
        bom = cls(
            document_name=data["documentName"],
            namespace=data["documentNamespace"],
            creator=data["creationInfo"]["creators"][0]
        )
        
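        for pkg in data.get("packages", []):
            # Restore the first recorded checksum, if any (accept the legacy "value" key too)
            checksums = pkg.get("checksums", [])
            checksum = None
            if checksums:
                checksum = checksums[0].get("checksumValue", checksums[0].get("value"))

            source = DataSource(
                name=pkg["name"],
                location=pkg["downloadLocation"],
                license_concluded=pkg["licenseConcluded"],
                license_declared=pkg.get("licenseDeclared", pkg["licenseConcluded"]),
                copyright_text=pkg["copyrightText"],
                supplier=pkg["supplier"],
                checksum=checksum,
                attribution_texts=pkg.get("attributionTexts", []),
                annotations=pkg.get("annotations", [])
            )
            bom.add_source(source)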
        
        return bom


def calculate_file_checksum(file_path: str) -> str:
    """Calculate SHA256 checksum of a file."""
    sha256_hash = hashlib.sha256()
    
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            sha256_hash.update(chunk)
    
    return sha256_hash.hexdigest()


# Usage
bom = DataBOM(
    document_name="ProductionTrainingData-v2",
    namespace="https://company.com/databom/prod-v2",
    creator="Tool: DataBOM-Generator"
)

bom.add_source(DataSource(
    name="wikipedia-corpus",
    location="s3://data-lake/wikipedia/2024-01/",
    license_concluded="CC-BY-SA-4.0",
    license_declared="CC-BY-SA-4.0",
    copyright_text="Wikimedia Foundation",
    supplier="Organization: Wikimedia",
    checksum=calculate_file_checksum("wikipedia.parquet"),
    attribution_texts=["Wikipedia contributors"]
))

bom.save("databom.spdx.json")
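
Once a DataBOM exists, it can serve as a pre-training gate. The sketch below reads the `packages` list of a document shaped like the one above and reports every source whose concluded license falls outside an allowlist. The function name `audit_databom` and the allowlist contents are assumptions made for illustration.

```python
from typing import Iterable, List, Tuple


def audit_databom(doc: dict, allowed: Iterable[str]) -> List[Tuple[str, str]]:
    """Return (package name, license) pairs that are not covered by the allowlist."""
    allowed_lower = {lic.lower() for lic in allowed}
    violations = []
    for pkg in doc.get("packages", []):
        # A package with no concluded license is treated as unasserted, hence a violation
        license_id = pkg.get("licenseConcluded", "NOASSERTION")
        if license_id.lower() not in allowed_lower:
            violations.append((pkg.get("name", "<unnamed>"), license_id))
    return violations


doc = {
    "packages": [
        {"name": "wikipedia-en-2024", "licenseConcluded": "CC-BY-SA-4.0"},
        {"name": "internal-support-tickets", "licenseConcluded": "Proprietary"},
        {"name": "unlabeled-crawl"},
    ]
}

violations = audit_databom(doc, allowed={"CC0-1.0", "MIT", "CC-BY-SA-4.0"})
for name, license_id in violations:
    print(f"{name}: {license_id} is not on the allowlist")
```

The comparison is a plain string match, so compound expressions such as `(MIT OR Apache-2.0)` would need a real SPDX expression parser before they could be evaluated.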

32.4.4. License Scanning Pipeline

Automated scanning prevents contamination:

import json
import subprocess
from pathlib import Path
from typing import Dict, List, Set, Optional
from dataclasses import dataclass
from enum import Enum

class LicenseZone(Enum):
    GREEN = "green"
    YELLOW = "yellow"
    RED = "red"
    BLACK = "black"

@dataclass
class LicenseResult:
    file_path: str
    licenses: List[str]
    confidence: float
    zone: LicenseZone

class LicenseScanner:
    """Scan datasets for license information."""
    
    # License categorization
    GREEN_LICENSES: Set[str] = {
        "mit", "apache-2.0", "bsd-2-clause", "bsd-3-clause",
        "cc0-1.0", "unlicense", "wtfpl", "isc", "zlib"
    }
    
    YELLOW_LICENSES: Set[str] = {
        "cc-by-4.0", "cc-by-3.0", "cc-by-2.5", "cc-by-2.0",
        "cc-by-sa-4.0", "cc-by-sa-3.0", "ofl-1.1"
    }
    
    RED_LICENSES: Set[str] = {
        "gpl-2.0", "gpl-3.0", "lgpl-2.1", "lgpl-3.0",
        "agpl-3.0", "mpl-2.0", "eupl-1.2"
    }
    
    BLACK_LICENSES: Set[str] = {
        "cc-by-nc-4.0", "cc-by-nc-3.0", "cc-by-nd-4.0",
        "cc-by-nc-nd-4.0", "proprietary", "all-rights-reserved"
    }
    
    def __init__(self, scancode_path: str = "scancode"):
        self.scancode_path = scancode_path
    
    def scan_directory(self, data_path: str, output_path: str = "scan.json") -> dict:
        """Scan directory for licenses using ScanCode."""
        cmd = [
            self.scancode_path,
            "--license",
            "--license-text",
            "--copyright",
            "--info",
            "--classify",
            "--json-pp", output_path,
            "--processes", "4",
            data_path
        ]
        
        result = subprocess.run(cmd, capture_output=True, text=True)
        
        if result.returncode != 0:
            raise RuntimeError(f"ScanCode failed: {result.stderr}")
        
        with open(output_path) as f:
            return json.load(f)
    
    def categorize_license(self, license_key: str) -> LicenseZone:
        """Categorize a license into a zone."""
        license_lower = license_key.lower()
        
        if license_lower in self.GREEN_LICENSES:
            return LicenseZone.GREEN
        elif license_lower in self.YELLOW_LICENSES:
            return LicenseZone.YELLOW
        elif license_lower in self.RED_LICENSES:
            return LicenseZone.RED
        elif license_lower in self.BLACK_LICENSES:
            return LicenseZone.BLACK
        else:
            return LicenseZone.RED  # Unknown = quarantine
    
    def categorize_files(self, scan_results: dict) -> Dict[LicenseZone, List[LicenseResult]]:
        """Categorize scanned files by license zone."""
        
        zones = {zone: [] for zone in LicenseZone}
        
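        for file_entry in scan_results.get("files", []):
            # With --info, ScanCode also lists directories; only files carry licenses
            if file_entry.get("type") == "directory":
                continue

            path = file_entry.get("path", "")
            # Assumes a per-file "licenses" list; the ScanCode output schema varies
            # between releases, so check the version you run
            licenses = file_entry.get("licenses", [])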
            
            if not licenses:
                # No license detected = quarantine
                result = LicenseResult(
                    file_path=path,
                    licenses=["unknown"],
                    confidence=0.0,
                    zone=LicenseZone.RED
                )
                zones[LicenseZone.RED].append(result)
                continue
            
            # Get most restrictive license (worst case)
            file_zone = LicenseZone.GREEN
            license_keys = []
            max_confidence = 0.0
            
            for lic in licenses:
                license_key = lic.get("key", "unknown")
                confidence = lic.get("score", 0) / 100.0
                license_keys.append(license_key)
                max_confidence = max(max_confidence, confidence)
                
                license_zone = self.categorize_license(license_key)
                
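                # Take most restrictive. Compare by declaration order
                # (GREEN < YELLOW < RED < BLACK), not by the string values,
                # which would sort alphabetically.
                zone_order = list(LicenseZone)
                if zone_order.index(license_zone) > zone_order.index(file_zone):
                    file_zone = license_zone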
            
            result = LicenseResult(
                file_path=path,
                licenses=license_keys,
                confidence=max_confidence,
                zone=file_zone
            )
            zones[file_zone].append(result)
        
        return zones
    
    def generate_report(self, zones: Dict[LicenseZone, List[LicenseResult]]) -> str:
        """Generate human-readable report."""
        
        lines = ["# License Scan Report\n"]
        
        for zone in LicenseZone:
            files = zones[zone]
            lines.append(f"\n## {zone.name} Zone ({len(files)} files)\n")
            
            if zone == LicenseZone.GREEN:
                lines.append("✅ Safe for commercial training\n")
            elif zone == LicenseZone.YELLOW:
                lines.append("⚠️ Attribution required\n")
            elif zone == LicenseZone.RED:
                lines.append("🔴 Requires legal review\n")
            elif zone == LicenseZone.BLACK:
                lines.append("⛔ DO NOT USE for training\n")
            
            for result in files[:10]:  # Show first 10
                lines.append(f"- `{result.file_path}`: {', '.join(result.licenses)}")
            
            if len(files) > 10:
                lines.append(f"- ... and {len(files) - 10} more")
        
        return "\n".join(lines)


# CI/CD Integration
def scan_and_gate(data_path: str, allow_yellow: bool = False) -> bool:
    """Gate function for CI/CD pipeline."""
    
    scanner = LicenseScanner()
    
    print(f"Scanning {data_path}...")
    results = scanner.scan_directory(data_path)
    zones = scanner.categorize_files(results)
    
    print(scanner.generate_report(zones))
    
    # Fail if any red or black
    if zones[LicenseZone.RED] or zones[LicenseZone.BLACK]:
        print("❌ FAILED: Found restricted licenses")
        return False
    
    # Optionally fail on yellow
    if not allow_yellow and zones[LicenseZone.YELLOW]:
        print("❌ FAILED: Found attribution-required licenses")
        return False
    
    print("✅ PASSED: All licenses acceptable")
    return True
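
The "most restrictive license wins" rule depends on the zones having a well-defined order. One way to make that order explicit is a numeric rank per zone, sketched below with a stand-alone enum. The rank values are an assumption of this example, chosen so that green < yellow < red < black.

```python
from enum import Enum
from typing import Iterable


class Zone(Enum):
    GREEN = "green"
    YELLOW = "yellow"
    RED = "red"
    BLACK = "black"


# Explicit severity ranks; higher means more restrictive
SEVERITY = {Zone.GREEN: 0, Zone.YELLOW: 1, Zone.RED: 2, Zone.BLACK: 3}


def most_restrictive(zones: Iterable[Zone]) -> Zone:
    """Return the highest-severity zone; an empty input is treated as unknown (RED)."""
    zones = list(zones)
    if not zones:
        return Zone.RED
    return max(zones, key=SEVERITY.__getitem__)


print(most_restrictive([Zone.GREEN, Zone.BLACK, Zone.YELLOW]).name)
```

Comparing the enum's string values directly would sort alphabetically ("black" < "green" < "red" < "yellow"), which is not the intended order.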

GitHub Actions Integration

# .github/workflows/license-scan.yaml
name: License Scan

on:
  push:
    paths:
      - 'data/**'
  pull_request:
    paths:
      - 'data/**'

jobs:
  scan:
    runs-on: ubuntu-latest
    
    steps:
      - uses: actions/checkout@v4
        with:
          lfs: true  # Fetch large files
      
      - name: Install ScanCode
        run: |
          pip install scancode-toolkit
      
      - name: Run License Scan
        run: |
          python scripts/license_scan.py data/ --output scan-results.json
      
      - name: Upload Results
        uses: actions/upload-artifact@v4
        with:
          name: license-scan-results
          path: scan-results.json
      
      - name: Check for Violations
        run: |
          python scripts/check_licenses.py scan-results.json --fail-on-yellow

32.4.5. Attribution System

For CC-BY and similar licenses, you must maintain attribution:

import hashlib
from typing import Optional, List, Dict
from dataclasses import dataclass, field
from datetime import datetime
import sqlite3
import json

@dataclass
class Attribution:
    content_hash: str
    author: str
    license: str
    source_url: Optional[str]
    title: Optional[str]
    date_indexed: str
    attribution_text: str

class AttributionIndex:
    """Track content sources for attribution requirements."""
    
    ATTRIBUTION_TEMPLATES = {
        "cc-by-4.0": '"{title}" by {author} is licensed under CC BY 4.0. Source: {source_url}',
        "cc-by-sa-4.0": '"{title}" by {author} is licensed under CC BY-SA 4.0. Source: {source_url}',
        "cc-by-3.0": '"{title}" by {author} is licensed under CC BY 3.0. Source: {source_url}',
        "mit": "MIT License - Copyright (c) {author}",
        "apache-2.0": "Apache 2.0 License - Copyright {author}. See NOTICE file.",
    }
    
    def __init__(self, db_path: str = "attribution.db"):
        self.conn = sqlite3.connect(db_path)
        self._init_schema()
    
    def _init_schema(self) -> None:
        """Initialize database schema."""
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS attributions (
                content_hash TEXT PRIMARY KEY,
                author TEXT NOT NULL,
                license TEXT NOT NULL,
                source_url TEXT,
                title TEXT,
                date_indexed TEXT,
                attribution_text TEXT
            )
        """)
        self.conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_license ON attributions(license)
        """)
        self.conn.commit()
    
    def _generate_attribution_text(
        self,
        license_key: str,
        author: str,
        title: Optional[str],
        source_url: Optional[str]
    ) -> str:
        """Generate attribution text from template."""
        template = self.ATTRIBUTION_TEMPLATES.get(
            license_key.lower(),
            "{title} by {author}. License: {license}. Source: {source_url}"
        )
        
        return template.format(
            title=title or "Untitled",
            author=author,
            license=license_key,
            source_url=source_url or "N/A"
        )
    
    def index_content(
        self,
        content: str,
        author: str,
        license: str,
        source_url: Optional[str] = None,
        title: Optional[str] = None
    ) -> str:
        """Index content with attribution metadata.
        
        Returns:
            content_hash for reference
        """
        content_hash = hashlib.sha256(content.encode()).hexdigest()
        
        attribution_text = self._generate_attribution_text(
            license, author, title, source_url
        )
        
        self.conn.execute("""
            INSERT OR REPLACE INTO attributions 
            (content_hash, author, license, source_url, title, date_indexed, attribution_text)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (
            content_hash,
            author,
            license,
            source_url,
            title,
            datetime.utcnow().isoformat(),
            attribution_text
        ))
        self.conn.commit()
        
        return content_hash
    
    def get_attribution(self, content: str) -> Optional[Attribution]:
        """Get attribution for content."""
        content_hash = hashlib.sha256(content.encode()).hexdigest()
        return self.get_attribution_by_hash(content_hash)
    
    def get_attribution_by_hash(self, content_hash: str) -> Optional[Attribution]:
        """Get attribution by hash."""
        row = self.conn.execute("""
            SELECT content_hash, author, license, source_url, title, date_indexed, attribution_text
            FROM attributions WHERE content_hash = ?
        """, (content_hash,)).fetchone()
        
        if row:
            return Attribution(*row)
        return None
    
    def filter_by_license(
        self,
        content_hashes: List[str],
        allowed_licenses: set
    ) -> List[str]:
        """Filter content to only allowed licenses."""
        
        placeholders = ",".join("?" * len(content_hashes))
        allowed_list = list(allowed_licenses)
        
        rows = self.conn.execute(f"""
            SELECT content_hash FROM attributions 
            WHERE content_hash IN ({placeholders})
            AND LOWER(license) IN ({",".join("?" * len(allowed_list))})
        """, content_hashes + allowed_list).fetchall()
        
        return [row[0] for row in rows]
    
    def generate_credits_file(self, content_hashes: List[str]) -> str:
        """Generate CREDITS/ATTRIBUTION file for model release."""
        
        placeholders = ",".join("?" * len(content_hashes))
        rows = self.conn.execute(f"""
            SELECT DISTINCT author, license, source_url, attribution_text
            FROM attributions 
            WHERE content_hash IN ({placeholders})
            ORDER BY license, author
        """, content_hashes).fetchall()
        
        lines = [
            "# TRAINING DATA ATTRIBUTIONS",
            "",
            "This model was trained on data from the following sources:",
            ""
        ]
        
        current_license = None
        for author, license, source_url, attribution_text in rows:
            if license != current_license:
                lines.append(f"\n## {license}\n")
                current_license = license
            
            lines.append(f"- {attribution_text}")
        
        return "\n".join(lines)
    
    def export_manifest(self, content_hashes: List[str], output_path: str) -> None:
        """Export attribution manifest as JSON."""
        
        placeholders = ",".join("?" * len(content_hashes))
        rows = self.conn.execute(f"""
            SELECT content_hash, author, license, source_url, title, date_indexed, attribution_text
            FROM attributions 
            WHERE content_hash IN ({placeholders})
        """, content_hashes).fetchall()
        
        manifest = {
            "generated_at": datetime.utcnow().isoformat(),
            "total_attributions": len(rows),
            "attributions": [
                {
                    "content_hash": row[0],
                    "author": row[1],
                    "license": row[2],
                    "source_url": row[3],
                    "title": row[4],
                    "attribution_text": row[6]
                }
                for row in rows
            ]
        }
        
        with open(output_path, 'w') as f:
            json.dump(manifest, f, indent=2)


# Usage
index = AttributionIndex()

# Index training data
hash1 = index.index_content(
    content="Some Wikipedia article text...",
    author="Wikipedia contributors",
    license="CC-BY-SA-4.0",
    source_url="https://en.wikipedia.org/wiki/Article",
    title="Example Article"
)

# Generate credits for model release
credits = index.generate_credits_file([hash1])
with open("CREDITS.md", "w") as f:
    f.write(credits)
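
Exact SHA-256 hashes only match byte-identical content, so the same passage with different whitespace or a different Unicode form would be indexed as two separate entries. One possible mitigation, sketched here as an assumption rather than as part of the index above, is to normalize text before hashing. `normalized_hash` is a hypothetical helper name.

```python
import hashlib
import unicodedata


def normalized_hash(content: str) -> str:
    """Hash text after Unicode (NFC) and whitespace normalization."""
    text = unicodedata.normalize("NFC", content)
    # Collapse every run of whitespace to a single space and trim the ends
    text = " ".join(text.split())
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


same = normalized_hash("Hello   world\n") == normalized_hash("Hello world")
print("whitespace variants match:", same)
```

Normalization trades precision for recall: it catches reformatted copies, but it still misses paraphrases and near-duplicates, which need fuzzy matching instead.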

32.4.6. Model Licensing (Output)

When you release a model, you need to license it appropriately:

RAIL (Responsible AI License)

# model_license.yaml
license: openrail-m
version: 1.0
model_name: "acme-classifier-v2"
release_date: "2024-01-15"

# What users CAN do
permissions:
  - commercial_use
  - modification
  - distribution
  - patent_use
  - private_use

# Usage restrictions
use_restrictions:
  - "No generation of deepfakes for deception"
  - "No medical diagnosis without licensed oversight"
  - "No autonomous weapons systems"
  - "No mass surveillance"
  - "No generation of CSAM"
  - "No spam or misinformation campaigns"

# Conditions
conditions:
  - attribution_required: true
  - license_notice_required: true
  - state_changes_required: true

# Training data summary
training_data:
  sources:
    - name: "Wikipedia"
      license: "CC-BY-SA-4.0"
    - name: "Internal data"
      license: "Proprietary"
  attribution_file: "CREDITS.md"

# Model lineage
base_model: null  # This is original, not fine-tuned
fine_tuned_from: null
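
A manifest like this is easy to get out of sync with the training data it describes. The sketch below runs a few consistency checks on the manifest after it has been parsed into a dictionary. The specific rules, such as requiring `attribution_file` whenever a CC-BY family source is listed, are assumptions for illustration, not requirements of any RAIL license.

```python
from typing import List


def check_manifest(manifest: dict) -> List[str]:
    """Return a list of human-readable problems; an empty list means none were found."""
    problems = []

    # Fields this sketch treats as mandatory
    for key in ("license", "model_name", "use_restrictions"):
        if not manifest.get(key):
            problems.append(f"missing field: {key}")

    training = manifest.get("training_data", {})
    needs_attribution = any(
        src.get("license", "").lower().startswith("cc-by")
        for src in training.get("sources", [])
    )
    if needs_attribution and not training.get("attribution_file"):
        problems.append("attribution_file required for CC-BY sources")

    return problems


manifest = {
    "license": "openrail-m",
    "model_name": "acme-classifier-v2",
    "use_restrictions": ["No mass surveillance"],
    "training_data": {
        "sources": [{"name": "Wikipedia", "license": "CC-BY-SA-4.0"}],
    },
}
print(check_manifest(manifest))
```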

Embedding License in Model Metadata

from safetensors import safe_open
from safetensors.torch import save_file
from typing import Optional
import json

def add_license_metadata(
    model_path: str, 
    license_info: dict,
    output_path: Optional[str] = None
) -> None:
    """Add license metadata to safetensors file."""
    
    if output_path is None:
        output_path = model_path
    
    # Load existing model
    with safe_open(model_path, framework="pt") as f:
        tensors = {k: f.get_tensor(k) for k in f.keys()}
        existing_metadata = dict(f.metadata()) if f.metadata() else {}
    
    # Add license metadata
    metadata = existing_metadata.copy()
    metadata.update({
        "license": license_info.get("license", "unknown"),
        "license_version": license_info.get("version", "1.0"),
        "author": license_info.get("author", "unknown"),
        "model_name": license_info.get("model_name", ""),
        "use_restrictions": json.dumps(license_info.get("use_restrictions", [])),
        "training_data_summary": json.dumps(license_info.get("training_data", {})),
        "attribution_required": str(license_info.get("attribution_required", True)),
    })
    
    # Save with metadata
    save_file(tensors, output_path, metadata)


def read_license_metadata(model_path: str) -> dict:
    """Read license metadata from safetensors file."""
    
    with safe_open(model_path, framework="pt") as f:
        metadata = dict(f.metadata()) if f.metadata() else {}
    
    result = {
        "license": metadata.get("license", "unknown"),
        "license_version": metadata.get("license_version"),
        "author": metadata.get("author"),
        "model_name": metadata.get("model_name"),
        "attribution_required": metadata.get("attribution_required", "True") == "True",
    }
    
    # Parse JSON fields
    if "use_restrictions" in metadata:
        result["use_restrictions"] = json.loads(metadata["use_restrictions"])
    
    if "training_data_summary" in metadata:
        result["training_data"] = json.loads(metadata["training_data_summary"])
    
    return result


def verify_model_license(model_path: str, intended_use: str) -> dict:
    """Verify if intended use is permitted by license."""
    
    license_info = read_license_metadata(model_path)
    restrictions = license_info.get("use_restrictions", [])
    
    # Simple keyword matching (in production, use NLP)
    blocked = False
    blocking_restriction = None
    flagged_keywords = ["deepfake", "weapon", "surveillance", "spam"]
    
    intended_lower = intended_use.lower()
    for restriction in restrictions:
        restriction_lower = restriction.lower()
        # Block only when the same keyword appears in both the intended use
        # and this restriction
        if any(kw in intended_lower and kw in restriction_lower for kw in flagged_keywords):
            blocked = True
            blocking_restriction = restriction
            break
    
    return {
        "permitted": not blocked,
        "license": license_info["license"],
        "blocking_restriction": blocking_restriction,
        "attribution_required": license_info["attribution_required"]
    }


# Usage
add_license_metadata(
    "model.safetensors",
    {
        "license": "openrail-m",
        "version": "1.0",
        "author": "Acme Corp",
        "model_name": "acme-classifier-v2",
        "use_restrictions": [
            "No deepfakes",
            "No medical diagnosis without oversight"
        ],
        "attribution_required": True
    }
)

# Check usage
result = verify_model_license("model.safetensors", "customer support chatbot")
print(result)  # {'permitted': True, 'license': 'openrail-m', ...}
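
The safetensors header stores metadata as a flat string-to-string mapping, which is why the code above JSON-encodes lists and dictionaries. A small helper can apply that rule uniformly. `to_header_metadata` is a hypothetical name for this sketch, and it needs no safetensors import because it only prepares the dictionary.

```python
import json
from typing import Any, Dict


def to_header_metadata(info: Dict[str, Any]) -> Dict[str, str]:
    """Convert arbitrary values to strings: keep strings as-is, JSON-encode the rest."""
    out: Dict[str, str] = {}
    for key, value in info.items():
        out[key] = value if isinstance(value, str) else json.dumps(value)
    return out


header = to_header_metadata({
    "license": "openrail-m",
    "attribution_required": True,
    "use_restrictions": ["No deepfakes"],
})
print(header)
```

Booleans serialize as `true`/`false` under this scheme rather than Python's `True`/`False`, so the reader should decode them with `json.loads` instead of comparing against the string `"True"`.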

32.4.7. Takedown Request Handling

Artists and content owners can request removal:

from PIL import Image
import imagehash
from typing import Optional, List
from datetime import datetime
from dataclasses import dataclass
import sqlite3
import json

@dataclass
class TakedownRequest:
    request_id: str
    owner: str
    owner_email: str
    content_type: str  # "image", "text", "code"
    reason: str
    status: str  # "pending", "approved", "denied", "processed"
    submitted_at: str
    processed_at: Optional[str] = None
    
class TakedownHandler:
    """Handle artist/owner takedown requests."""
    
    def __init__(self, db_path: str = "takedowns.db"):
        self.conn = sqlite3.connect(db_path)
        self._init_schema()
    
    def _init_schema(self) -> None:
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS takedown_requests (
                request_id TEXT PRIMARY KEY,
                owner TEXT NOT NULL,
                owner_email TEXT NOT NULL,
                content_type TEXT NOT NULL,
                reason TEXT,
                status TEXT DEFAULT 'pending',
                submitted_at TEXT,
                processed_at TEXT
            )
        """)
        
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS blocked_content (
                hash TEXT PRIMARY KEY,
                hash_type TEXT,
                request_id TEXT,
                blocked_at TEXT,
                FOREIGN KEY (request_id) REFERENCES takedown_requests(request_id)
            )
        """)
        self.conn.commit()
    
    def submit_request(
        self,
        request_id: str,
        owner: str,
        owner_email: str,
        content_type: str,
        content_samples: List[str],
        reason: str
    ) -> TakedownRequest:
        """Submit a new takedown request."""
        
        request = TakedownRequest(
            request_id=request_id,
            owner=owner,
            owner_email=owner_email,
            content_type=content_type,
            reason=reason,
            status="pending",
            submitted_at=datetime.utcnow().isoformat()
        )
        
        self.conn.execute("""
            INSERT INTO takedown_requests 
            (request_id, owner, owner_email, content_type, reason, status, submitted_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (
            request.request_id, request.owner, request.owner_email,
            request.content_type, request.reason, request.status, request.submitted_at
        ))
        
        # Pre-compute hashes for samples
        for sample_path in content_samples:
            self._add_content_hash(sample_path, content_type, request_id, "pending")
        
        self.conn.commit()
        return request
    
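    def _ensure_pending_table(self) -> None:
        """Create the holding table for hashes that await review."""
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS pending_content (
                hash TEXT,
                hash_type TEXT,
                request_id TEXT,
                PRIMARY KEY (hash, request_id)
            )
        """)
    
    def _add_content_hash(
        self, 
        content_path: str, 
        content_type: str, 
        request_id: str,
        status: str
    ) -> str:
        """Compute a content hash and store it as pending or blocked."""
        import hashlib
        
        if content_type == "image":
            # Use perceptual hash for images (tolerates resizing and re-encoding)
            with Image.open(content_path) as img:
                content_hash = str(imagehash.phash(img))
            hash_type = "phash"
        else:
            # Use exact content hash for text/code
            with open(content_path, 'rb') as f:
                content_hash = hashlib.sha256(f.read()).hexdigest()
            hash_type = "sha256"
        
        if status == "pending":
            # Hold the hash until legal review approves the request
            self._ensure_pending_table()
            self.conn.execute("""
                INSERT OR REPLACE INTO pending_content 
                (hash, hash_type, request_id)
                VALUES (?, ?, ?)
            """, (content_hash, hash_type, request_id))
        else:
            self.conn.execute("""
                INSERT OR REPLACE INTO blocked_content 
                (hash, hash_type, request_id, blocked_at)
                VALUES (?, ?, ?, ?)
            """, (content_hash, hash_type, request_id, datetime.utcnow().isoformat()))
        
        return content_hash
    
    def approve_request(self, request_id: str) -> None:
        """Approve takedown request and move its hashes to the blocklist."""
        now = datetime.utcnow().isoformat()
        
        self.conn.execute("""
            UPDATE takedown_requests 
            SET status = 'approved', processed_at = ?
            WHERE request_id = ?
        """, (now, request_id))
        
        # Move this request's pending hashes to the blocklist
        self._ensure_pending_table()
        self.conn.execute("""
            INSERT OR REPLACE INTO blocked_content 
            (hash, hash_type, request_id, blocked_at)
            SELECT hash, hash_type, request_id, ?
            FROM pending_content WHERE request_id = ?
        """, (now, request_id))
        self.conn.execute(
            "DELETE FROM pending_content WHERE request_id = ?", (request_id,)
        )
        
        self.conn.commit()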
    
    def is_blocked_image(self, image_path: str, threshold: int = 5) -> bool:
        """Check if image is on blocklist using perceptual hash."""
        
        img = Image.open(image_path)
        img_hash = imagehash.phash(img)
        
        # Check against all blocked hashes
        rows = self.conn.execute("""
            SELECT hash FROM blocked_content WHERE hash_type = 'phash'
        """).fetchall()
        
        for (stored_hash,) in rows:
            stored = imagehash.hex_to_hash(stored_hash)
            # Hamming distance
            if img_hash - stored <= threshold:
                return True
        
        return False
    
    def is_blocked_text(self, content: str) -> bool:
        """Check if text content is blocked."""
        import hashlib
        
        content_hash = hashlib.sha256(content.encode()).hexdigest()
        
        row = self.conn.execute("""
            SELECT 1 FROM blocked_content 
            WHERE hash = ? AND hash_type = 'sha256'
        """, (content_hash,)).fetchone()
        
        return row is not None
    
    def filter_training_batch(
        self, 
        image_paths: List[str]
    ) -> List[str]:
        """Filter a batch of images, removing blocked ones."""
        
        return [
            path for path in image_paths
            if not self.is_blocked_image(path)
        ]
    
    def get_statistics(self) -> dict:
        """Get takedown statistics."""
        
        stats = {}
        
        for status in ["pending", "approved", "denied", "processed"]:
            count = self.conn.execute("""
                SELECT COUNT(*) FROM takedown_requests WHERE status = ?
            """, (status,)).fetchone()[0]
            stats[f"requests_{status}"] = count
        
        stats["total_blocked"] = self.conn.execute("""
            SELECT COUNT(*) FROM blocked_content
        """).fetchone()[0]
        
        return stats


# Usage
handler = TakedownHandler()

# Artist submits request
request = handler.submit_request(
    request_id="TR-2024-001",
    owner="Jane Artist",
    owner_email="jane@artist.com",
    content_type="image",
    content_samples=["artwork1.jpg", "artwork2.jpg"],
    reason="I did not consent to AI training"
)

# Legal reviews and approves
handler.approve_request("TR-2024-001")

# Training pipeline checks
if handler.is_blocked_image("some_image.jpg"):
    print("Skipping blocked image")
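
The handler relies on two matching strategies: a Hamming-distance threshold for perceptual image hashes, and an exact SHA-256 match for text. The standard-library sketch below illustrates both ideas without needing `imagehash` installed. `hamming_distance` shows what the `threshold` argument of `is_blocked_image` measures, and `normalized_text_hash` is a hypothetical variant (not part of the handler above) that normalizes text before hashing, so that trivially reformatted copies still match.

```python
import hashlib
import re
import unicodedata


def hamming_distance(hex_a: str, hex_b: str) -> int:
    """Count the bits that differ between two equal-length hex-encoded hashes."""
    if len(hex_a) != len(hex_b):
        raise ValueError("hashes must have the same length")
    return bin(int(hex_a, 16) ^ int(hex_b, 16)).count("1")


def normalized_text_hash(text: str) -> str:
    """Hash text after Unicode, whitespace, and case normalization."""
    normalized = unicodedata.normalize("NFKC", text)
    normalized = re.sub(r"\s+", " ", normalized).strip().lower()
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


# Two 64-bit hashes that differ in a single bit fall well inside a threshold of 5.
distance = hamming_distance("ffd8a0c3e1b2947f", "ffd8a0c3e1b2947e")

# Reformatted copies of the same sentence produce the same hash.
same = normalized_text_hash("Hello   World\n") == normalized_text_hash("hello world")
```

Normalization of this kind suits prose. For code, where whitespace and case can be significant, an exact hash or a token-level comparison may be the safer choice.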

32.4.8. Compliance Audit Trail

The auditor below appends each ingestion and training event to a JSON Lines file, so that a reviewer can later trace which datasets fed which model:

from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Optional, List
import json
import hashlib

@dataclass
class AuditEvent:
    event_id: str
    event_type: str  # "data_ingestion", "license_scan", "training_start", etc.
    timestamp: str
    actor: str  # user or system
    resource: str  # dataset, model, etc.
    action: str
    outcome: str  # "started", "success", "failure", "blocked"
    details: dict
    
    def to_dict(self) -> dict:
        return asdict(self)

class ComplianceAuditor:
    """Maintain audit trail for compliance."""
    
    def __init__(self, log_path: str = "audit_log.jsonl"):
        self.log_path = log_path
    
    def log_event(self, event: AuditEvent) -> None:
        """Append event to audit log."""
        with open(self.log_path, 'a') as f:
            f.write(json.dumps(event.to_dict()) + "\n")
    
    def log_data_ingestion(
        self,
        dataset_name: str,
        source: str,
        license: str,
        actor: str,
        zone: str
    ) -> AuditEvent:
        """Log data ingestion event."""
        event = AuditEvent(
            event_id=self._generate_id(),
            event_type="data_ingestion",
            timestamp=datetime.utcnow().isoformat(),
            actor=actor,
            resource=dataset_name,
            action="ingest",
            outcome="success",
            details={
                "source": source,
                "license": license,
                "assigned_zone": zone
            }
        )
        self.log_event(event)
        return event
    
    def log_training_run(
        self,
        model_name: str,
        datasets: List[str],
        actor: str,
        config: dict
    ) -> AuditEvent:
        """Log training run event."""
        event = AuditEvent(
            event_id=self._generate_id(),
            event_type="training_start",
            timestamp=datetime.utcnow().isoformat(),
            actor=actor,
            resource=model_name,
            action="train",
            outcome="started",
            details={
                "datasets": datasets,
                # sort_keys makes the hash independent of dict key order
                "config_hash": hashlib.sha256(json.dumps(config, sort_keys=True).encode()).hexdigest()[:12]
            }
        )
        self.log_event(event)
        return event
    
    def _generate_id(self) -> str:
        import uuid
        # Keep the full UUID: truncating to 8 hex chars risks collisions in a long-lived log
        return uuid.uuid4().hex
    
    def query_by_dataset(self, dataset_name: str) -> List[AuditEvent]:
        """Query all events related to a dataset."""
        events = []
        try:
            with open(self.log_path, 'r') as f:
                for line in f:
                    if not line.strip():
                        continue  # skip blank lines
                    event_dict = json.loads(line)
                    if (event_dict.get("resource") == dataset_name or 
                        dataset_name in event_dict.get("details", {}).get("datasets", [])):
                        events.append(AuditEvent(**event_dict))
        except FileNotFoundError:
            pass  # nothing has been logged yet
        return events
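
A plain append-only file can be edited after the fact without leaving a trace. One common hardening step, which the auditor above does not implement, is to chain each entry to the hash of the previous one so that any later edit breaks verification. The sketch below illustrates the idea on an in-memory list; `chain_hash`, `append_chained`, `verify_chain`, and the all-zero genesis hash are assumptions made for illustration.

```python
import hashlib
import json
from typing import List

GENESIS_HASH = "0" * 64  # assumed starting value for the first entry


def chain_hash(prev_hash: str, event: dict) -> str:
    """Hash an event together with the hash of the entry before it."""
    payload = json.dumps(event, sort_keys=True)
    return hashlib.sha256((prev_hash + payload).encode("utf-8")).hexdigest()


def append_chained(log: List[dict], event: dict) -> dict:
    """Append an event, linking it to the previous entry's hash."""
    prev_hash = log[-1]["entry_hash"] if log else GENESIS_HASH
    entry = {
        "event": event,
        "prev_hash": prev_hash,
        "entry_hash": chain_hash(prev_hash, event),
    }
    log.append(entry)
    return entry


def verify_chain(log: List[dict]) -> bool:
    """Recompute every hash; return False if any entry was altered or reordered."""
    prev_hash = GENESIS_HASH
    for entry in log:
        if entry["prev_hash"] != prev_hash:
            return False
        if entry["entry_hash"] != chain_hash(prev_hash, entry["event"]):
            return False
        prev_hash = entry["entry_hash"]
    return True
```

Chaining only detects tampering. It does not prevent someone from rewriting the whole log, so the latest hash would still need to be stored somewhere the log writer cannot modify.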

32.4.9. Summary Checklist

| Step | Action | Owner | Frequency |
|------|--------|-------|-----------|
| 1 | Define license zones (Green/Yellow/Red/Black) | Legal + Platform | Once |
| 2 | Implement zone-based storage with IAM | Platform | Once |
| 3 | Set up license scanning in CI/CD | Platform | Once |
| 4 | Create attribution index for CC-BY data | Data Engineering | Ongoing |
| 5 | Maintain DataBOM for all training runs | ML Engineering | Per run |
| 6 | Implement takedown request handling | Legal + Platform | Ongoing |
| 7 | Add license metadata to released models | ML Engineering | Per release |
| 8 | Audit trail for compliance | Platform | Ongoing |
| 9 | Quarterly license compliance review | Legal | Quarterly |
| 10 | Update license classifications as law evolves | Legal | Bi-annually |

Decision Quick Reference

| If data is… | Then… | Risk Level |
|-------------|-------|------------|
| CC0/MIT/Apache | Use freely for commercial purposes | ✅ Low |
| CC-BY | Use with attribution | ⚠️ Low-Medium |
| CC-BY-SA | Consult legal on model licensing | ⚠️ Medium |
| GPL/LGPL | Quarantine, consult legal | 🔴 High |
| CC-NC/ND | Do not use for commercial models | ⛔ Critical |
| Unknown source | Quarantine until verified | 🔴 High |
| Web scrape | Consult legal, consider robots.txt | 🔴 High |
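
The quick reference above can also be encoded as a lookup that an ingestion pipeline consults before accepting a dataset. This is a minimal sketch: the license labels, action codes, and the `decide` function are hypothetical names chosen for illustration (they are not SPDX identifiers), and anything unrecognized falls back to the table's "Unknown source" row.

```python
from typing import Dict, Tuple

# (action, risk) pairs taken from the quick reference table.
# License labels and action codes are illustrative, not SPDX identifiers.
DECISIONS: Dict[str, Tuple[str, str]] = {
    "CC0": ("use", "low"),
    "MIT": ("use", "low"),
    "APACHE": ("use", "low"),
    "CC-BY": ("use_with_attribution", "low-medium"),
    "CC-BY-SA": ("consult_legal", "medium"),
    "GPL": ("quarantine_and_consult_legal", "high"),
    "LGPL": ("quarantine_and_consult_legal", "high"),
    "CC-NC": ("do_not_use_commercially", "critical"),
    "CC-ND": ("do_not_use_commercially", "critical"),
    "WEB-SCRAPE": ("consult_legal", "high"),
}

# Fallback mirrors the "Unknown source" row: quarantine until verified.
UNKNOWN: Tuple[str, str] = ("quarantine_until_verified", "high")


def decide(license_label: str) -> Tuple[str, str]:
    """Return the (action, risk) pair for a license label, case-insensitively."""
    return DECISIONS.get(license_label.strip().upper(), UNKNOWN)
```

Defaulting to quarantine means a typo or a new license label fails closed instead of letting unreviewed data into training.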

[End of Section 32.4]