32.4. Dataset Licensing & Attribution: The IP Supply Chain
Caution
The Poisoned Well: If you train on GPL-licensed code, your entire model could be subject to “copyleft” requirements. One contaminated dataset can create years of legal exposure.
32.4.1. License Types for AI
Understanding license compatibility is critical for commercial AI:
| License | Type | Commercial Use | Training Safe? | Redistribution |
|---|---|---|---|---|
| CC0 | Public Domain | ✓ | ✓ | No restrictions |
| MIT | Permissive | ✓ | ✓ | Keep license file |
| Apache 2.0 | Permissive | ✓ | ✓ | Keep license + NOTICE |
| BSD-3 | Permissive | ✓ | ✓ | Keep license |
| CC-BY | Attribution | ✓ | ✓ with attribution | Credit author |
| CC-BY-SA | ShareAlike | ✓ | ⚠️ Output may need same license | Share alike |
| GPL-2.0 | Strong Copyleft | ✓ | ⚠️ High risk | Source disclosure |
| GPL-3.0 | Strong Copyleft | ✓ | ⚠️ High risk | Source + patents |
| LGPL | Weak Copyleft | ✓ | ⚠️ Medium risk | Library linking OK |
| CC-NC | Non-Commercial | ✗ | ✗ | Commercial prohibited |
| CC-ND | No Derivatives | ? | ⚠️ Gray area | Is training a “derivative”? |
| Proprietary | Varies | Check ToS | Check ToS | Usually prohibited |
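Before any automated tooling, it helps to encode the table above as data the pipeline can consult. A minimal sketch (license keys are SPDX-style identifiers; the flags mirror the table and are illustrative, not legal advice):

# Sketch: the compatibility table as a lookup used by ingestion tooling.
# Keys are SPDX-style identifiers; flags mirror the table, not legal advice.
LICENSE_POLICY = {
    "cc0-1.0":      {"commercial": True,  "training_safe": True},
    "mit":          {"commercial": True,  "training_safe": True},
    "apache-2.0":   {"commercial": True,  "training_safe": True},
    "bsd-3-clause": {"commercial": True,  "training_safe": True},
    "cc-by-4.0":    {"commercial": True,  "training_safe": True},   # attribution required
    "cc-by-sa-4.0": {"commercial": True,  "training_safe": False},  # share-alike risk
    "gpl-3.0":      {"commercial": True,  "training_safe": False},  # copyleft risk
    "cc-by-nc-4.0": {"commercial": False, "training_safe": False},  # non-commercial only
}

def is_training_safe(license_key: str) -> bool:
    """Unknown licenses default to unsafe: quarantine rather than guess."""
    return LICENSE_POLICY.get(license_key.lower(), {}).get("training_safe", False)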
The Training-as-Derivative Debate
graph TD
A[Training Data] --> B{Is model a<br>'derivative work'?}
B -->|Legal Position 1| C[Yes: Model inherits license]
B -->|Legal Position 2| D[No: Model is transformation]
C --> E[GPL model must be open]
D --> F[Commercial use OK]
G[Current Status] --> H[Unsettled law]
H --> I[Conservative approach:<br>Assume derivative]
License Risk Matrix
| Data Type | Low Risk | Medium Risk | High Risk |
|---|---|---|---|
| Text | CC0, Wikipedia | Books3, arXiv | Web scraping |
| Images | LAION-5B-CC0 | LAION-2B | Getty, stock photos |
| Code | Apache repos | MIT repos | GPL repos |
| Audio | LibriSpeech | YouTube | Commercial music |
| Video | Kinetics | YouTube-8M | Movies, streaming |
32.4.2. The License Lake Architecture
Segregate data by license zone to prevent contamination:
graph TB
A[Raw Data Ingestion] --> B{License Scanner}
B -->|CC0/MIT/Apache| C[Zone Green<br>Commercial OK]
B -->|CC-BY/CC-BY-SA| D[Zone Yellow<br>Attribution Required]
B -->|GPL/LGPL/Unknown| E[Zone Red<br>Quarantine]
B -->|CC-NC/ND/Proprietary| F[Zone Black<br>DO NOT USE]
C --> G[Production Training]
D --> H[Attribution Pipeline]
E --> I[Legal Review]
F --> J[Delete or Request License]
subgraph "Access Control"
G
H
I
J
end
Terraform: Zone-Based Access Control
# data_lake_zones.tf
variable "environment" {
type = string
}
# Zone definitions
locals {
zones = {
green = {
description = "Commercial use permitted"
allowed_licenses = ["cc0-1.0", "mit", "apache-2.0", "bsd-3-clause"]
}
yellow = {
description = "Attribution required"
allowed_licenses = ["cc-by-4.0", "cc-by-3.0"]
}
red = {
description = "Legal review required"
allowed_licenses = ["gpl-2.0", "gpl-3.0", "lgpl-2.1", "unknown"]
}
black = {
description = "DO NOT USE"
allowed_licenses = ["cc-nc", "cc-nd", "proprietary"]
}
}
}
# S3 buckets per zone
resource "aws_s3_bucket" "data_zone" {
for_each = local.zones
bucket = "data-lake-${each.key}-${var.environment}"
tags = {
Zone = each.key
Description = each.value.description
Environment = var.environment
ManagedBy = "terraform"
}
}
# Block public access for all zones
resource "aws_s3_bucket_public_access_block" "data_zone" {
for_each = aws_s3_bucket.data_zone
bucket = each.value.id
block_public_acls = true
block_public_policy = true
ignore_public_acls = true
restrict_public_buckets = true
}
# Commercial training can only access green zone
resource "aws_iam_policy" "commercial_training" {
name = "CommercialTrainingAccess-${var.environment}"
description = "Access to commercially safe training data"
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "AllowGreenZone"
Effect = "Allow"
Action = ["s3:GetObject", "s3:ListBucket"]
Resource = [
aws_s3_bucket.data_zone["green"].arn,
"${aws_s3_bucket.data_zone["green"].arn}/*"
]
},
{
Sid = "DenyOtherZones"
Effect = "Deny"
Action = ["s3:*"]
Resource = flatten([
for zone in ["yellow", "red", "black"] : [
aws_s3_bucket.data_zone[zone].arn,
"${aws_s3_bucket.data_zone[zone].arn}/*"
]
])
}
]
})
}
# Research can access green + yellow with attribution tracking
resource "aws_iam_policy" "research_training" {
name = "ResearchTrainingAccess-${var.environment}"
description = "Access to research data with attribution requirements"
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "AllowGreenYellowZones"
Effect = "Allow"
Action = ["s3:GetObject", "s3:ListBucket"]
Resource = flatten([
for zone in ["green", "yellow"] : [
aws_s3_bucket.data_zone[zone].arn,
"${aws_s3_bucket.data_zone[zone].arn}/*"
]
])
},
{
Sid = "DenyRestrictedZones"
Effect = "Deny"
Action = ["s3:*"]
Resource = flatten([
for zone in ["red", "black"] : [
aws_s3_bucket.data_zone[zone].arn,
"${aws_s3_bucket.data_zone[zone].arn}/*"
]
])
}
]
})
}
# Legal team can review red zone
resource "aws_iam_policy" "legal_review" {
name = "LegalReviewAccess-${var.environment}"
description = "Read access to quarantined data for legal review"
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "AllowRedZoneRead"
Effect = "Allow"
Action = ["s3:GetObject", "s3:ListBucket"]
Resource = [
aws_s3_bucket.data_zone["red"].arn,
"${aws_s3_bucket.data_zone["red"].arn}/*"
]
}
]
})
}
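The Terraform only provisions the zones; an ingestion job still has to route each object into the right bucket once the license scanner has classified it. A minimal sketch with boto3, assuming the bucket naming convention above (data-lake-&lt;zone&gt;-&lt;environment&gt;) and an example "prod" environment:

import boto3

# Zone buckets follow the Terraform naming convention data-lake-<zone>-<environment>;
# the "prod" suffix here is an illustrative assumption.
ZONE_BUCKETS = {
    "green": "data-lake-green-prod",
    "yellow": "data-lake-yellow-prod",
    "red": "data-lake-red-prod",
    "black": "data-lake-black-prod",
}

def route_to_zone(local_path: str, key: str, zone: str, license_key: str) -> str:
    """Upload a file into its license zone bucket, tagging it with the license."""
    s3 = boto3.client("s3")
    bucket = ZONE_BUCKETS[zone]
    s3.upload_file(
        local_path,
        bucket,
        key,
        ExtraArgs={"Tagging": f"license={license_key}&zone={zone}"},
    )
    return f"s3://{bucket}/{key}"

# Example: a file the scanner classified as MIT goes to the green zone
# route_to_zone("corpus/sample.txt", "raw/sample.txt", "green", "mit")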
32.4.3. Data Bill of Materials (DataBOM)
Just as an SBOM (Software Bill of Materials) inventories software components, a DataBOM tracks the provenance and licensing of every training data source:
{
"spdxVersion": "SPDX-2.3",
"dataFormatVersion": "1.0",
"creationInfo": {
"created": "2024-01-15T10:30:00Z",
"creators": ["Tool: DataBOM-Generator-1.0", "Organization: Acme Corp"],
"licenseListVersion": "3.21"
},
"documentName": "TrainingData-Manifest-v4",
"documentNamespace": "https://acme.com/databom/training-v4",
"packages": [
{
"name": "wikipedia-en-2024",
"downloadLocation": "https://dumps.wikimedia.org/enwiki/20240101/",
"filesAnalyzed": true,
"licenseConcluded": "CC-BY-SA-4.0",
"licenseDeclared": "CC-BY-SA-4.0",
"copyrightText": "Wikipedia contributors, Wikimedia Foundation",
"supplier": "Organization: Wikimedia Foundation",
"checksums": [
{
"algorithm": "SHA256",
"checksumValue": "a1b2c3d4e5f6..."
}
],
"attributionTexts": [
"Content from Wikipedia, the free encyclopedia, under CC BY-SA 4.0"
],
"annotations": [
{
"annotationType": "OTHER",
"annotator": "Tool: LicenseScanner",
"annotationDate": "2024-01-10T08:00:00Z",
"comment": "All articles verified as CC-BY-SA"
}
]
},
{
"name": "internal-support-tickets",
"downloadLocation": "NOASSERTION",
"filesAnalyzed": true,
"licenseConcluded": "Proprietary",
"licenseDeclared": "Proprietary",
"copyrightText": "Acme Corp 2020-2024",
"supplier": "Organization: Acme Corp",
"annotations": [
{
"annotationType": "OTHER",
"annotator": "Person: Legal Counsel",
"annotationDate": "2024-01-12T14:00:00Z",
"comment": "Verified: Customer consent obtained for AI training"
}
]
},
{
"name": "github-code-samples",
"downloadLocation": "https://github.com/...",
"filesAnalyzed": true,
"licenseConcluded": "(MIT OR Apache-2.0)",
"licenseInfoInFile": ["MIT", "Apache-2.0"],
"copyrightText": "Various contributors",
"supplier": "Organization: GitHub",
"externalRefs": [
{
"referenceCategory": "SECURITY",
"referenceType": "cpe23Type",
"referenceLocator": "cpe:2.3:*:*:*:*:*:*:*:*"
}
]
}
],
"files": [
{
"fileName": "corpus/wikipedia.parquet",
"SPDXID": "SPDXRef-File-Wikipedia",
"licenseConcluded": "CC-BY-SA-4.0",
"copyrightText": "Wikimedia Foundation",
"checksums": [
{"algorithm": "SHA256", "checksumValue": "a1b2c3..."}
]
}
],
"relationships": [
{
"spdxElementId": "SPDXRef-DOCUMENT",
"relationshipType": "DESCRIBES",
"relatedSpdxElement": "SPDXRef-Package-wikipedia-en-2024"
}
]
}
DataBOM Generator
import json
import hashlib
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Dict
from dataclasses import dataclass, field, asdict
@dataclass
class DataSource:
name: str
location: str
license_concluded: str
license_declared: str
copyright_text: str
supplier: str
checksum: Optional[str] = None
attribution_texts: List[str] = field(default_factory=list)
annotations: List[Dict] = field(default_factory=list)
@dataclass
class DataBOM:
document_name: str
namespace: str
creator: str
sources: List[DataSource] = field(default_factory=list)
def add_source(self, source: DataSource) -> None:
self.sources.append(source)
def to_spdx(self) -> dict:
"""Export to SPDX format."""
return {
"spdxVersion": "SPDX-2.3",
"dataFormatVersion": "1.0",
"creationInfo": {
"created": datetime.utcnow().isoformat() + "Z",
"creators": [self.creator],
},
"documentName": self.document_name,
"documentNamespace": self.namespace,
"packages": [
{
"name": src.name,
"downloadLocation": src.location,
"licenseConcluded": src.license_concluded,
"licenseDeclared": src.license_declared,
"copyrightText": src.copyright_text,
"supplier": src.supplier,
"checksums": [{"algorithm": "SHA256", "value": src.checksum}] if src.checksum else [],
"attributionTexts": src.attribution_texts,
"annotations": src.annotations
}
for src in self.sources
]
}
def save(self, path: str) -> None:
"""Save DataBOM to file."""
with open(path, 'w') as f:
json.dump(self.to_spdx(), f, indent=2)
@classmethod
def load(cls, path: str) -> 'DataBOM':
"""Load DataBOM from file."""
with open(path) as f:
data = json.load(f)
bom = cls(
document_name=data["documentName"],
namespace=data["documentNamespace"],
creator=data["creationInfo"]["creators"][0]
)
for pkg in data.get("packages", []):
source = DataSource(
name=pkg["name"],
location=pkg["downloadLocation"],
license_concluded=pkg["licenseConcluded"],
license_declared=pkg.get("licenseDeclared", pkg["licenseConcluded"]),
copyright_text=pkg["copyrightText"],
supplier=pkg["supplier"],
attribution_texts=pkg.get("attributionTexts", [])
)
bom.add_source(source)
return bom
def calculate_file_checksum(file_path: str) -> str:
"""Calculate SHA256 checksum of a file."""
sha256_hash = hashlib.sha256()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
sha256_hash.update(chunk)
return sha256_hash.hexdigest()
# Usage
bom = DataBOM(
document_name="ProductionTrainingData-v2",
namespace="https://company.com/databom/prod-v2",
creator="Tool: DataBOM-Generator"
)
bom.add_source(DataSource(
name="wikipedia-corpus",
location="s3://data-lake/wikipedia/2024-01/",
license_concluded="CC-BY-SA-4.0",
license_declared="CC-BY-SA-4.0",
copyright_text="Wikimedia Foundation",
supplier="Organization: Wikimedia",
checksum=calculate_file_checksum("wikipedia.parquet"),
attribution_texts=["Wikipedia contributors"]
))
bom.save("databom.spdx.json")
32.4.4. License Scanning Pipeline
Automated scanning prevents contamination:
import json
import subprocess
from pathlib import Path
from typing import Dict, List, Set, Optional
from dataclasses import dataclass
from enum import Enum
class LicenseZone(Enum):
GREEN = "green"
YELLOW = "yellow"
RED = "red"
BLACK = "black"
@dataclass
class LicenseResult:
file_path: str
licenses: List[str]
confidence: float
zone: LicenseZone
class LicenseScanner:
"""Scan datasets for license information."""
# License categorization
GREEN_LICENSES: Set[str] = {
"mit", "apache-2.0", "bsd-2-clause", "bsd-3-clause",
"cc0-1.0", "unlicense", "wtfpl", "isc", "zlib"
}
YELLOW_LICENSES: Set[str] = {
"cc-by-4.0", "cc-by-3.0", "cc-by-2.5", "cc-by-2.0",
"cc-by-sa-4.0", "cc-by-sa-3.0", "ofl-1.1"
}
RED_LICENSES: Set[str] = {
"gpl-2.0", "gpl-3.0", "lgpl-2.1", "lgpl-3.0",
"agpl-3.0", "mpl-2.0", "eupl-1.2"
}
BLACK_LICENSES: Set[str] = {
"cc-by-nc-4.0", "cc-by-nc-3.0", "cc-by-nd-4.0",
"cc-by-nc-nd-4.0", "proprietary", "all-rights-reserved"
}
def __init__(self, scancode_path: str = "scancode"):
self.scancode_path = scancode_path
def scan_directory(self, data_path: str, output_path: str = "scan.json") -> dict:
"""Scan directory for licenses using ScanCode."""
cmd = [
self.scancode_path,
"--license",
"--license-text",
"--copyright",
"--info",
"--classify",
"--json-pp", output_path,
"--processes", "4",
data_path
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError(f"ScanCode failed: {result.stderr}")
with open(output_path) as f:
return json.load(f)
def categorize_license(self, license_key: str) -> LicenseZone:
"""Categorize a license into a zone."""
license_lower = license_key.lower()
if license_lower in self.GREEN_LICENSES:
return LicenseZone.GREEN
elif license_lower in self.YELLOW_LICENSES:
return LicenseZone.YELLOW
elif license_lower in self.RED_LICENSES:
return LicenseZone.RED
elif license_lower in self.BLACK_LICENSES:
return LicenseZone.BLACK
else:
return LicenseZone.RED # Unknown = quarantine
def categorize_files(self, scan_results: dict) -> Dict[LicenseZone, List[LicenseResult]]:
"""Categorize scanned files by license zone."""
zones = {zone: [] for zone in LicenseZone}
for file_entry in scan_results.get("files", []):
path = file_entry.get("path", "")
licenses = file_entry.get("licenses", [])
if not licenses:
# No license detected = quarantine
result = LicenseResult(
file_path=path,
licenses=["unknown"],
confidence=0.0,
zone=LicenseZone.RED
)
zones[LicenseZone.RED].append(result)
continue
            # Take the most restrictive license seen in the file (worst case).
            # Enum values are strings, so compare via an explicit severity order.
            severity = {
                LicenseZone.GREEN: 0,
                LicenseZone.YELLOW: 1,
                LicenseZone.RED: 2,
                LicenseZone.BLACK: 3,
            }
            file_zone = LicenseZone.GREEN
            license_keys = []
            max_confidence = 0.0
            for lic in licenses:
                license_key = lic.get("key", "unknown")
                confidence = lic.get("score", 0) / 100.0
                license_keys.append(license_key)
                max_confidence = max(max_confidence, confidence)
                license_zone = self.categorize_license(license_key)
                if severity[license_zone] > severity[file_zone]:
                    file_zone = license_zone
result = LicenseResult(
file_path=path,
licenses=license_keys,
confidence=max_confidence,
zone=file_zone
)
zones[file_zone].append(result)
return zones
def generate_report(self, zones: Dict[LicenseZone, List[LicenseResult]]) -> str:
"""Generate human-readable report."""
lines = ["# License Scan Report\n"]
for zone in LicenseZone:
files = zones[zone]
lines.append(f"\n## {zone.name} Zone ({len(files)} files)\n")
if zone == LicenseZone.GREEN:
lines.append("✅ Safe for commercial training\n")
elif zone == LicenseZone.YELLOW:
lines.append("⚠️ Attribution required\n")
elif zone == LicenseZone.RED:
lines.append("🔴 Requires legal review\n")
elif zone == LicenseZone.BLACK:
lines.append("⛔ DO NOT USE for training\n")
for result in files[:10]: # Show first 10
lines.append(f"- `{result.file_path}`: {', '.join(result.licenses)}")
if len(files) > 10:
lines.append(f"- ... and {len(files) - 10} more")
return "\n".join(lines)
# CI/CD Integration
def scan_and_gate(data_path: str, allow_yellow: bool = False) -> bool:
"""Gate function for CI/CD pipeline."""
scanner = LicenseScanner()
print(f"Scanning {data_path}...")
results = scanner.scan_directory(data_path)
zones = scanner.categorize_files(results)
print(scanner.generate_report(zones))
# Fail if any red or black
if zones[LicenseZone.RED] or zones[LicenseZone.BLACK]:
print("❌ FAILED: Found restricted licenses")
return False
# Optionally fail on yellow
if not allow_yellow and zones[LicenseZone.YELLOW]:
print("❌ FAILED: Found attribution-required licenses")
return False
print("✅ PASSED: All licenses acceptable")
return True
GitHub Actions Integration
# .github/workflows/license-scan.yaml
name: License Scan
on:
push:
paths:
- 'data/**'
pull_request:
paths:
- 'data/**'
jobs:
scan:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
lfs: true # Fetch large files
- name: Install ScanCode
run: |
pip install scancode-toolkit
- name: Run License Scan
run: |
python scripts/license_scan.py data/ --output scan-results.json
- name: Upload Results
uses: actions/upload-artifact@v4
with:
name: license-scan-results
path: scan-results.json
- name: Check for Violations
run: |
python scripts/check_licenses.py scan-results.json --fail-on-yellow
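The workflow assumes two helper scripts that are not shown above. A minimal sketch of scripts/check_licenses.py, reusing the LicenseScanner from earlier (the license_scan module name is an assumption):

#!/usr/bin/env python3
# scripts/check_licenses.py -- sketch of the CI gate invoked by the workflow above.
import argparse
import json
import sys

from license_scan import LicenseScanner, LicenseZone  # assumed module containing the scanner

def main() -> int:
    parser = argparse.ArgumentParser(description="Fail the build on restricted licenses")
    parser.add_argument("scan_results", help="Path to ScanCode JSON output")
    parser.add_argument("--fail-on-yellow", action="store_true",
                        help="Also fail on attribution-required licenses")
    args = parser.parse_args()

    with open(args.scan_results) as f:
        results = json.load(f)

    scanner = LicenseScanner()
    zones = scanner.categorize_files(results)
    print(scanner.generate_report(zones))

    if zones[LicenseZone.RED] or zones[LicenseZone.BLACK]:
        return 1
    if args.fail_on_yellow and zones[LicenseZone.YELLOW]:
        return 1
    return 0

if __name__ == "__main__":
    sys.exit(main())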
32.4.5. Attribution System
For CC-BY and similar licenses, you must maintain attribution:
import hashlib
from typing import Optional, List, Dict
from dataclasses import dataclass, field
from datetime import datetime
import sqlite3
import json
@dataclass
class Attribution:
content_hash: str
author: str
license: str
source_url: Optional[str]
title: Optional[str]
date_indexed: str
attribution_text: str
class AttributionIndex:
"""Track content sources for attribution requirements."""
ATTRIBUTION_TEMPLATES = {
"cc-by-4.0": '"{title}" by {author} is licensed under CC BY 4.0. Source: {source_url}',
"cc-by-sa-4.0": '"{title}" by {author} is licensed under CC BY-SA 4.0. Source: {source_url}',
"cc-by-3.0": '"{title}" by {author} is licensed under CC BY 3.0. Source: {source_url}',
"mit": "MIT License - Copyright (c) {author}",
"apache-2.0": "Apache 2.0 License - Copyright {author}. See NOTICE file.",
}
def __init__(self, db_path: str = "attribution.db"):
self.conn = sqlite3.connect(db_path)
self._init_schema()
def _init_schema(self) -> None:
"""Initialize database schema."""
self.conn.execute("""
CREATE TABLE IF NOT EXISTS attributions (
content_hash TEXT PRIMARY KEY,
author TEXT NOT NULL,
license TEXT NOT NULL,
source_url TEXT,
title TEXT,
date_indexed TEXT,
attribution_text TEXT
)
""")
self.conn.execute("""
CREATE INDEX IF NOT EXISTS idx_license ON attributions(license)
""")
self.conn.commit()
def _generate_attribution_text(
self,
license_key: str,
author: str,
title: Optional[str],
source_url: Optional[str]
) -> str:
"""Generate attribution text from template."""
template = self.ATTRIBUTION_TEMPLATES.get(
license_key.lower(),
"{title} by {author}. License: {license}. Source: {source_url}"
)
return template.format(
title=title or "Untitled",
author=author,
license=license_key,
source_url=source_url or "N/A"
)
def index_content(
self,
content: str,
author: str,
license: str,
source_url: Optional[str] = None,
title: Optional[str] = None
) -> str:
"""Index content with attribution metadata.
Returns:
content_hash for reference
"""
content_hash = hashlib.sha256(content.encode()).hexdigest()
attribution_text = self._generate_attribution_text(
license, author, title, source_url
)
self.conn.execute("""
INSERT OR REPLACE INTO attributions
(content_hash, author, license, source_url, title, date_indexed, attribution_text)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (
content_hash,
author,
license,
source_url,
title,
datetime.utcnow().isoformat(),
attribution_text
))
self.conn.commit()
return content_hash
def get_attribution(self, content: str) -> Optional[Attribution]:
"""Get attribution for content."""
content_hash = hashlib.sha256(content.encode()).hexdigest()
row = self.conn.execute("""
SELECT content_hash, author, license, source_url, title, date_indexed, attribution_text
FROM attributions WHERE content_hash = ?
""", (content_hash,)).fetchone()
if row:
return Attribution(*row)
return None
def get_attribution_by_hash(self, content_hash: str) -> Optional[Attribution]:
"""Get attribution by hash."""
row = self.conn.execute("""
SELECT content_hash, author, license, source_url, title, date_indexed, attribution_text
FROM attributions WHERE content_hash = ?
""", (content_hash,)).fetchone()
if row:
return Attribution(*row)
return None
def filter_by_license(
self,
content_hashes: List[str],
allowed_licenses: set
) -> List[str]:
"""Filter content to only allowed licenses."""
placeholders = ",".join("?" * len(content_hashes))
allowed_list = list(allowed_licenses)
rows = self.conn.execute(f"""
SELECT content_hash FROM attributions
WHERE content_hash IN ({placeholders})
AND LOWER(license) IN ({",".join("?" * len(allowed_list))})
""", content_hashes + allowed_list).fetchall()
return [row[0] for row in rows]
def generate_credits_file(self, content_hashes: List[str]) -> str:
"""Generate CREDITS/ATTRIBUTION file for model release."""
placeholders = ",".join("?" * len(content_hashes))
rows = self.conn.execute(f"""
SELECT DISTINCT author, license, source_url, attribution_text
FROM attributions
WHERE content_hash IN ({placeholders})
ORDER BY license, author
""", content_hashes).fetchall()
lines = [
"# TRAINING DATA ATTRIBUTIONS",
"",
"This model was trained on data from the following sources:",
""
]
current_license = None
for author, license, source_url, attribution_text in rows:
if license != current_license:
lines.append(f"\n## {license}\n")
current_license = license
lines.append(f"- {attribution_text}")
return "\n".join(lines)
def export_manifest(self, content_hashes: List[str], output_path: str) -> None:
"""Export attribution manifest as JSON."""
placeholders = ",".join("?" * len(content_hashes))
rows = self.conn.execute(f"""
SELECT content_hash, author, license, source_url, title, date_indexed, attribution_text
FROM attributions
WHERE content_hash IN ({placeholders})
""", content_hashes).fetchall()
manifest = {
"generated_at": datetime.utcnow().isoformat(),
"total_attributions": len(rows),
"attributions": [
{
"content_hash": row[0],
"author": row[1],
"license": row[2],
"source_url": row[3],
"title": row[4],
"attribution_text": row[6]
}
for row in rows
]
}
with open(output_path, 'w') as f:
json.dump(manifest, f, indent=2)
# Usage
index = AttributionIndex()
# Index training data
hash1 = index.index_content(
content="Some Wikipedia article text...",
author="Wikipedia contributors",
license="CC-BY-SA-4.0",
source_url="https://en.wikipedia.org/wiki/Article",
title="Example Article"
)
# Generate credits for model release
credits = index.generate_credits_file([hash1])
with open("CREDITS.md", "w") as f:
f.write(credits)
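The same index can also gate what reaches the training set. A short sketch using filter_by_license before a commercial run:

# Sketch: keep only content whose license is in the permitted set for this run.
permitted = {"cc0-1.0", "mit", "apache-2.0", "cc-by-4.0", "cc-by-sa-4.0"}
usable_hashes = index.filter_by_license([hash1], allowed_licenses=permitted)
print(f"{len(usable_hashes)} of 1 documents cleared for this training run")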
32.4.6. Model Licensing (Output)
When you release a model, you need to license it appropriately:
RAIL (Responsible AI License)
# model_license.yaml
license: openrail-m
version: 1.0
model_name: "acme-classifier-v2"
release_date: "2024-01-15"
# What users CAN do
permissions:
- commercial_use
- modification
- distribution
- patent_use
- private_use
# Usage restrictions
use_restrictions:
- "No generation of deepfakes for deception"
- "No medical diagnosis without licensed oversight"
- "No autonomous weapons systems"
- "No mass surveillance"
- "No generation of CSAM"
- "No spam or misinformation campaigns"
# Conditions
conditions:
- attribution_required: true
- license_notice_required: true
- state_changes_required: true
# Training data summary
training_data:
sources:
- name: "Wikipedia"
license: "CC-BY-SA-4.0"
- name: "Internal data"
license: "Proprietary"
attribution_file: "CREDITS.md"
# Model lineage
base_model: null # This is original, not fine-tuned
fine_tuned_from: null
Embedding License in Model Metadata
from safetensors import safe_open
from safetensors.torch import save_file
from typing import Dict
import json
def add_license_metadata(
model_path: str,
license_info: dict,
output_path: str = None
) -> None:
"""Add license metadata to safetensors file."""
if output_path is None:
output_path = model_path
# Load existing model
with safe_open(model_path, framework="pt") as f:
tensors = {k: f.get_tensor(k) for k in f.keys()}
existing_metadata = dict(f.metadata()) if f.metadata() else {}
# Add license metadata
metadata = existing_metadata.copy()
metadata.update({
"license": license_info.get("license", "unknown"),
"license_version": license_info.get("version", "1.0"),
"author": license_info.get("author", "unknown"),
"model_name": license_info.get("model_name", ""),
"use_restrictions": json.dumps(license_info.get("use_restrictions", [])),
"training_data_summary": json.dumps(license_info.get("training_data", {})),
"attribution_required": str(license_info.get("attribution_required", True)),
})
# Save with metadata
save_file(tensors, output_path, metadata)
def read_license_metadata(model_path: str) -> dict:
"""Read license metadata from safetensors file."""
with safe_open(model_path, framework="pt") as f:
metadata = dict(f.metadata()) if f.metadata() else {}
result = {
"license": metadata.get("license", "unknown"),
"license_version": metadata.get("license_version"),
"author": metadata.get("author"),
"model_name": metadata.get("model_name"),
"attribution_required": metadata.get("attribution_required", "True") == "True",
}
# Parse JSON fields
if "use_restrictions" in metadata:
result["use_restrictions"] = json.loads(metadata["use_restrictions"])
if "training_data_summary" in metadata:
result["training_data"] = json.loads(metadata["training_data_summary"])
return result
def verify_model_license(model_path: str, intended_use: str) -> dict:
"""Verify if intended use is permitted by license."""
license_info = read_license_metadata(model_path)
restrictions = license_info.get("use_restrictions", [])
# Simple keyword matching (in production, use NLP)
blocked = False
blocking_restriction = None
intended_lower = intended_use.lower()
    risky_keywords = ["deepfake", "weapon", "surveillance", "spam"]
    for restriction in restrictions:
        # Flag the use only if a risky keyword appears in both the intended use
        # and the restriction text
        if any(kw in intended_lower and kw in restriction.lower() for kw in risky_keywords):
            blocked = True
            blocking_restriction = restriction
            break
return {
"permitted": not blocked,
"license": license_info["license"],
"blocking_restriction": blocking_restriction,
"attribution_required": license_info["attribution_required"]
}
# Usage
add_license_metadata(
"model.safetensors",
{
"license": "openrail-m",
"version": "1.0",
"author": "Acme Corp",
"model_name": "acme-classifier-v2",
"use_restrictions": [
"No deepfakes",
"No medical diagnosis without oversight"
],
"attribution_required": True
}
)
# Check usage
result = verify_model_license("model.safetensors", "customer support chatbot")
print(result) # {'permitted': True, 'license': 'openrail-m', ...}
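Rather than hard-coding the dictionary, the manifest from model_license.yaml above can drive the metadata directly. A sketch assuming PyYAML is installed:

import yaml  # assumes PyYAML is available

with open("model_license.yaml") as f:
    manifest = yaml.safe_load(f)

# "conditions" is a list of single-key mappings in the YAML; flatten it
conditions = {}
for item in manifest.get("conditions", []):
    conditions.update(item)

add_license_metadata(
    "model.safetensors",
    {
        "license": manifest["license"],
        "version": str(manifest["version"]),
        "author": "Acme Corp",  # not present in the manifest; supplied separately
        "model_name": manifest["model_name"],
        "use_restrictions": manifest.get("use_restrictions", []),
        "training_data": manifest.get("training_data", {}),
        "attribution_required": conditions.get("attribution_required", True),
    },
)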
32.4.7. Takedown Request Handling
Artists and content owners can request removal:
from PIL import Image
import imagehash
from typing import Optional, List
from datetime import datetime
from dataclasses import dataclass
import sqlite3
import json
@dataclass
class TakedownRequest:
request_id: str
owner: str
owner_email: str
content_type: str # "image", "text", "code"
reason: str
status: str # "pending", "approved", "denied", "processed"
submitted_at: str
processed_at: Optional[str] = None
class TakedownHandler:
"""Handle artist/owner takedown requests."""
def __init__(self, db_path: str = "takedowns.db"):
self.conn = sqlite3.connect(db_path)
self._init_schema()
def _init_schema(self) -> None:
self.conn.execute("""
CREATE TABLE IF NOT EXISTS takedown_requests (
request_id TEXT PRIMARY KEY,
owner TEXT NOT NULL,
owner_email TEXT NOT NULL,
content_type TEXT NOT NULL,
reason TEXT,
status TEXT DEFAULT 'pending',
submitted_at TEXT,
processed_at TEXT
)
""")
self.conn.execute("""
CREATE TABLE IF NOT EXISTS blocked_content (
hash TEXT PRIMARY KEY,
hash_type TEXT,
request_id TEXT,
blocked_at TEXT,
FOREIGN KEY (request_id) REFERENCES takedown_requests(request_id)
)
""")
self.conn.commit()
def submit_request(
self,
request_id: str,
owner: str,
owner_email: str,
content_type: str,
content_samples: List[str],
reason: str
) -> TakedownRequest:
"""Submit a new takedown request."""
request = TakedownRequest(
request_id=request_id,
owner=owner,
owner_email=owner_email,
content_type=content_type,
reason=reason,
status="pending",
submitted_at=datetime.utcnow().isoformat()
)
self.conn.execute("""
INSERT INTO takedown_requests
(request_id, owner, owner_email, content_type, reason, status, submitted_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (
request.request_id, request.owner, request.owner_email,
request.content_type, request.reason, request.status, request.submitted_at
))
# Pre-compute hashes for samples
for sample_path in content_samples:
self._add_content_hash(sample_path, content_type, request_id, "pending")
self.conn.commit()
return request
def _add_content_hash(
self,
content_path: str,
content_type: str,
request_id: str,
status: str
) -> str:
"""Compute and store content hash."""
if content_type == "image":
img = Image.open(content_path)
# Use perceptual hash for images (survives transformations)
phash = str(imagehash.phash(img))
hash_type = "phash"
else:
# Use content hash for text/code
with open(content_path, 'rb') as f:
import hashlib
phash = hashlib.sha256(f.read()).hexdigest()
hash_type = "sha256"
if status == "pending":
# Store in pending table, not blocklist yet
pass
else:
self.conn.execute("""
INSERT OR REPLACE INTO blocked_content
(hash, hash_type, request_id, blocked_at)
VALUES (?, ?, ?, ?)
""", (phash, hash_type, request_id, datetime.utcnow().isoformat()))
return phash
def approve_request(self, request_id: str) -> None:
"""Approve takedown request and add to blocklist."""
self.conn.execute("""
UPDATE takedown_requests
SET status = 'approved', processed_at = ?
WHERE request_id = ?
""", (datetime.utcnow().isoformat(), request_id))
        # Move staged hashes for this request onto the blocklist
        self.conn.execute("""
            INSERT OR REPLACE INTO blocked_content (hash, hash_type, request_id, blocked_at)
            SELECT hash, hash_type, request_id, ?
            FROM pending_content WHERE request_id = ?
        """, (datetime.utcnow().isoformat(), request_id))
        self.conn.execute(
            "DELETE FROM pending_content WHERE request_id = ?", (request_id,)
        )
        self.conn.commit()
def is_blocked_image(self, image_path: str, threshold: int = 5) -> bool:
"""Check if image is on blocklist using perceptual hash."""
img = Image.open(image_path)
img_hash = imagehash.phash(img)
# Check against all blocked hashes
rows = self.conn.execute("""
SELECT hash FROM blocked_content WHERE hash_type = 'phash'
""").fetchall()
for (stored_hash,) in rows:
stored = imagehash.hex_to_hash(stored_hash)
# Hamming distance
if img_hash - stored <= threshold:
return True
return False
def is_blocked_text(self, content: str) -> bool:
"""Check if text content is blocked."""
import hashlib
content_hash = hashlib.sha256(content.encode()).hexdigest()
row = self.conn.execute("""
SELECT 1 FROM blocked_content
WHERE hash = ? AND hash_type = 'sha256'
""", (content_hash,)).fetchone()
return row is not None
def filter_training_batch(
self,
image_paths: List[str]
) -> List[str]:
"""Filter a batch of images, removing blocked ones."""
return [
path for path in image_paths
if not self.is_blocked_image(path)
]
def get_statistics(self) -> dict:
"""Get takedown statistics."""
stats = {}
for status in ["pending", "approved", "denied", "processed"]:
count = self.conn.execute("""
SELECT COUNT(*) FROM takedown_requests WHERE status = ?
""", (status,)).fetchone()[0]
stats[f"requests_{status}"] = count
stats["total_blocked"] = self.conn.execute("""
SELECT COUNT(*) FROM blocked_content
""").fetchone()[0]
return stats
# Usage
handler = TakedownHandler()
# Artist submits request
request = handler.submit_request(
request_id="TR-2024-001",
owner="Jane Artist",
owner_email="jane@artist.com",
content_type="image",
content_samples=["artwork1.jpg", "artwork2.jpg"],
reason="I did not consent to AI training"
)
# Legal reviews and approves
handler.approve_request("TR-2024-001")
# Training pipeline checks
if handler.is_blocked_image("some_image.jpg"):
print("Skipping blocked image")
32.4.8. Compliance Audit Trail
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Optional, List
import json
import hashlib
@dataclass
class AuditEvent:
event_id: str
event_type: str # "data_ingestion", "license_scan", "training_start", etc.
timestamp: str
actor: str # user or system
resource: str # dataset, model, etc.
action: str
outcome: str # "success", "failure", "blocked"
details: dict
def to_dict(self) -> dict:
return asdict(self)
class ComplianceAuditor:
"""Maintain audit trail for compliance."""
def __init__(self, log_path: str = "audit_log.jsonl"):
self.log_path = log_path
def log_event(self, event: AuditEvent) -> None:
"""Append event to audit log."""
with open(self.log_path, 'a') as f:
f.write(json.dumps(event.to_dict()) + "\n")
def log_data_ingestion(
self,
dataset_name: str,
source: str,
license: str,
actor: str,
zone: str
) -> AuditEvent:
"""Log data ingestion event."""
event = AuditEvent(
event_id=self._generate_id(),
event_type="data_ingestion",
timestamp=datetime.utcnow().isoformat(),
actor=actor,
resource=dataset_name,
action="ingest",
outcome="success",
details={
"source": source,
"license": license,
"assigned_zone": zone
}
)
self.log_event(event)
return event
def log_training_run(
self,
model_name: str,
datasets: List[str],
actor: str,
config: dict
) -> AuditEvent:
"""Log training run event."""
event = AuditEvent(
event_id=self._generate_id(),
event_type="training_start",
timestamp=datetime.utcnow().isoformat(),
actor=actor,
resource=model_name,
action="train",
outcome="started",
details={
"datasets": datasets,
"config_hash": hashlib.sha256(json.dumps(config).encode()).hexdigest()[:12]
}
)
self.log_event(event)
return event
def _generate_id(self) -> str:
import uuid
return str(uuid.uuid4())[:8]
def query_by_dataset(self, dataset_name: str) -> List[AuditEvent]:
"""Query all events related to a dataset."""
events = []
with open(self.log_path, 'r') as f:
for line in f:
event_dict = json.loads(line)
if (event_dict.get("resource") == dataset_name or
dataset_name in event_dict.get("details", {}).get("datasets", [])):
events.append(AuditEvent(**event_dict))
return events
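A usage sketch, mirroring the other components in this section:

# Usage (sketch): record ingestion and training events so any released model
# can be traced back to its datasets and their license zones.
auditor = ComplianceAuditor(log_path="audit_log.jsonl")

auditor.log_data_ingestion(
    dataset_name="wikipedia-en-2024",
    source="https://dumps.wikimedia.org/enwiki/20240101/",
    license="CC-BY-SA-4.0",
    actor="data-pipeline",
    zone="yellow",
)

auditor.log_training_run(
    model_name="acme-classifier-v2",
    datasets=["wikipedia-en-2024"],
    actor="ml-engineer@acme.com",
    config={"epochs": 3, "learning_rate": 2e-5},
)

# Later: which runs ever touched this dataset?
events = auditor.query_by_dataset("wikipedia-en-2024")
print(f"{len(events)} audit events reference wikipedia-en-2024")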
32.4.9. Summary Checklist
| Step | Action | Owner | Frequency |
|---|---|---|---|
| 1 | Define license zones (Green/Yellow/Red/Black) | Legal + Platform | Once |
| 2 | Implement zone-based storage with IAM | Platform | Once |
| 3 | Set up license scanning in CI/CD | Platform | Once |
| 4 | Create attribution index for CC-BY data | Data Engineering | Ongoing |
| 5 | Maintain DataBOM for all training runs | ML Engineering | Per run |
| 6 | Implement takedown request handling | Legal + Platform | Ongoing |
| 7 | Add license metadata to released models | ML Engineering | Per release |
| 8 | Maintain audit trail for compliance | Platform | Ongoing |
| 9 | Quarterly license compliance review | Legal | Quarterly |
| 10 | Update license classifications as law evolves | Legal | Twice yearly |
Decision Quick Reference
| If data is… | Then… | Risk Level |
|---|---|---|
| CC0/MIT/Apache | Use freely for commercial | ✅ Low |
| CC-BY | Use with attribution | ⚠️ Low-Medium |
| CC-BY-SA | Consult legal on model licensing | ⚠️ Medium |
| GPL/LGPL | Quarantine, consult legal | 🔴 High |
| CC-NC/ND | Do not use for commercial models | ⛔ Critical |
| Unknown source | Quarantine until verified | 🔴 High |
| Web scrape | Consult legal, consider robots.txt | 🔴 High |
[End of Section 32.4]