20.2 GCP Pipelines: Vertex AI Pipelines & Cloud Composer

Google Cloud Platform (GCP) approaches MLOps with a philosophy deeply rooted in its engineering heritage: everything is a container, and everything is scalable. While AWS provides a toolkit of primitives, GCP provides a platform heavily influenced by the internal tooling of Google DeepMind and Google Search.

This section covers the two giants of GCP orchestration:

  1. Vertex AI Pipelines: A fully managed implementation of the open-source Kubeflow Pipelines (KFP). This is the standard for modern ML workflows on GCP.
  2. Cloud Composer: A fully managed Apache Airflow environment. This is the bridge between traditional data engineering (DataOps) and machine learning (MLOps).

20.2.1. Vertex AI Pipelines: The Serverless ML Engine

Vertex AI Pipelines is serverless. Unlike the old days of deploying Kubeflow on a GKE cluster and managing the control plane, Vertex AI Pipelines allows you to submit a compiled pipeline specification, and Google runs it. You pay only for the compute used by the steps, plus a small invocation fee.
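
Regardless of how complex the pipeline is, the workflow is always the same: compile the Python definition into a pipeline specification, then submit that specification as a job. A minimal sketch (project, region, and bucket names are placeholders):

# minimal_submit.py - compile a trivial pipeline and submit it to Vertex AI
from kfp import dsl, compiler
from google.cloud import aiplatform


@dsl.component(base_image="python:3.10-slim")
def say_hello(name: str) -> str:
    return f"Hello, {name}"


@dsl.pipeline(name="hello-pipeline")
def hello_pipeline(name: str = "Vertex"):
    say_hello(name=name)


# 1. Compile the Python DSL into a portable spec file
compiler.Compiler().compile(
    pipeline_func=hello_pipeline,
    package_path="hello_pipeline.json"
)

# 2. Submit the spec; Google provisions (and tears down) all compute
aiplatform.init(project="my-project", location="us-central1")
aiplatform.PipelineJob(
    display_name="hello-run",
    template_path="hello_pipeline.json",
    pipeline_root="gs://my-project-vertex-pipelines"  # placeholder bucket
).submit()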

Architecture Overview

graph TB
    subgraph "Development"
        A[KFP SDK Python] --> B[Compile]
        B --> C[pipeline.json]
    end
    
    subgraph "Vertex AI"
        D[Pipeline Service] --> E[Argo Workflows]
        E --> F[Custom Training]
        E --> G[AutoML]
        E --> H[Model Upload]
        E --> I[Endpoint Deploy]
    end
    
    subgraph "Artifacts"
        J[GCS: Pipeline Root]
        K[Vertex AI Registry]
        L[Vertex AI Experiments]
    end
    
    C -->|Submit| D
    F --> J
    H --> K
    F --> L

Component Types Comparison

| Component Type     | Build Time          | Best For                            | Example              |
|--------------------|---------------------|-------------------------------------|----------------------|
| Lightweight Python | At compile          | Python functions, quick iteration   | Data validation      |
| Custom Container   | Manual Docker build | Complex dependencies, GPU workloads | Training             |
| Pre-built Google   | None                | Standard Vertex AI operations       | Model upload, deploy |
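
For contrast with the lightweight components used below, a custom-container component and a pre-built Google component look like this (the image path is illustrative, and ModelUploadOp is just one of many components shipped in google-cloud-pipeline-components):

from kfp import dsl

# Pre-built: import and call, nothing to build
from google_cloud_pipeline_components.v1.model import ModelUploadOp


# Custom container: you own the Dockerfile; KFP only wires up inputs/outputs
@dsl.container_component
def train_in_container(data_uri: str):
    return dsl.ContainerSpec(
        image="us-docker.pkg.dev/my-project/ml/trainer:latest",
        command=["python", "train.py"],
        args=["--data", data_uri]
    )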

20.2.2. Deep Dive: KFP v2 SDK

The Complete Component Lifecycle

# kfp_v2_complete.py - Production-ready KFP components

from typing import NamedTuple, List, Dict, Optional
from kfp import dsl
from kfp.dsl import (
    component,
    Input,
    Output,
    Dataset,
    Model,
    Metrics,
    Artifact,
    ClassificationMetrics,
    HTML
)
from kfp import compiler


# =============================================================================
# COMPONENT 1: Data Extraction from BigQuery
# =============================================================================

@component(
    base_image="python:3.10-slim",
    packages_to_install=[
        "google-cloud-bigquery==3.13.0",
        "pandas==2.0.3",
        "pyarrow==14.0.0",
        "db-dtypes==1.1.1"
    ]
)
def extract_training_data(
    project_id: str,
    dataset_id: str,
    table_id: str,
    sample_fraction: float,
    output_dataset: Output[Dataset],
    metadata: Output[Artifact]
) -> NamedTuple("Outputs", [("num_rows", int), ("num_features", int)]):
    """
    Extract training data from BigQuery.
    
    Implements:
    - Sampling for development runs
    - Schema validation
    - Automatic artifact logging
    """
    from google.cloud import bigquery
    import pandas as pd
    import json
    from datetime import datetime
    
    client = bigquery.Client(project=project_id)
    
    # Query with sampling
    query = f"""
    SELECT * FROM `{project_id}.{dataset_id}.{table_id}`
    WHERE RAND() < {sample_fraction}
    """
    
    df = client.query(query).to_dataframe()
    
    # Schema validation
    required_columns = ['feature_1', 'feature_2', 'target']
    missing = [c for c in required_columns if c not in df.columns]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")
    
    # Save dataset
    df.to_parquet(output_dataset.path, index=False)
    
    # Log metadata
    meta = {
        "extraction_time": datetime.utcnow().isoformat(),
        "source_table": f"{project_id}.{dataset_id}.{table_id}",
        "sample_fraction": sample_fraction,
        "num_rows": len(df),
        "num_features": len(df.columns) - 1,  # Exclude target
        "columns": list(df.columns),
        "dtypes": {k: str(v) for k, v in df.dtypes.items()}
    }
    
    with open(metadata.path, 'w') as f:
        json.dump(meta, f, indent=2)
    
    return (len(df), len(df.columns) - 1)


# =============================================================================
# COMPONENT 2: Data Validation with Great Expectations
# =============================================================================

@component(
    base_image="python:3.10-slim",
    packages_to_install=[
        "pandas==2.0.3",
        "great-expectations==0.18.0",
        "pyarrow==14.0.0"
    ]
)
def validate_data(
    input_dataset: Input[Dataset],
    validation_report: Output[HTML],
    expectations_config: str
) -> NamedTuple("ValidationResult", [("passed", bool), ("failure_count", int)]):
    """
    Validate data quality using Great Expectations.
    """
    import pandas as pd
    import json
    import great_expectations as gx
    from great_expectations.dataset import PandasDataset
    
    # Load data
    df = pd.read_parquet(input_dataset.path)
    ge_df = PandasDataset(df)
    
    # Parse expectations
    expectations = json.loads(expectations_config)
    
    results = []
    
    # Run expectations
    for exp in expectations:
        method = getattr(ge_df, exp["expectation_type"])
        result = method(**exp.get("kwargs", {}))
        results.append({
            "expectation": exp["expectation_type"],
            "success": result["success"],
            "details": str(result.get("result", {}))
        })
    
    # Generate HTML report
    passed = all(r["success"] for r in results)
    failures = sum(1 for r in results if not r["success"])
    
    html = f"""
    <html>
    <head><title>Data Validation Report</title></head>
    <body>
        <h1>Data Validation Report</h1>
        <p>Status: {"✅ PASSED" if passed else "❌ FAILED"}</p>
        <p>Total Expectations: {len(results)}</p>
        <p>Failures: {failures}</p>
        <table border="1">
            <tr><th>Expectation</th><th>Result</th><th>Details</th></tr>
    """
    
    for r in results:
        status = "✅" if r["success"] else "❌"
        html += f"<tr><td>{r['expectation']}</td><td>{status}</td><td>{r['details'][:100]}</td></tr>"
    
    html += "</table></body></html>"
    
    with open(validation_report.path, 'w') as f:
        f.write(html)
    
    return (passed, failures)


# =============================================================================
# COMPONENT 3: Feature Engineering
# =============================================================================

@component(
    base_image="gcr.io/deeplearning-platform-release/base-gpu.py310:latest",
    packages_to_install=["pandas>=2.0", "scikit-learn>=1.3"]
)
def engineer_features(
    input_dataset: Input[Dataset],
    output_dataset: Output[Dataset],
    feature_config: str
) -> NamedTuple("FeatureStats", [("num_features", int), ("feature_names", str)]):
    """
    Engineer features with configurable transformations.
    """
    import pandas as pd
    import json
    from sklearn.preprocessing import StandardScaler
    
    df = pd.read_parquet(input_dataset.path)
    config = json.loads(feature_config)
    
    # Numeric features
    # Note: in production, persist the fitted scaler as an artifact as well,
    # so the identical transform can be applied at serving time.
    numeric_cols = config.get("numeric_features", [])
    if numeric_cols:
        scaler = StandardScaler()
        df[numeric_cols] = scaler.fit_transform(df[numeric_cols])
    
    # Categorical features
    categorical_cols = config.get("categorical_features", [])
    for col in categorical_cols:
        dummies = pd.get_dummies(df[col], prefix=col, drop_first=True)
        df = pd.concat([df.drop(columns=[col]), dummies], axis=1)
    
    # Save
    df.to_parquet(output_dataset.path, index=False)
    
    feature_names = [c for c in df.columns if c != 'target']
    
    return (len(feature_names), ",".join(feature_names[:10]))


# =============================================================================
# COMPONENT 4: Model Training (GPU)
# =============================================================================

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-gpu.2-13.py310:latest",
    packages_to_install=["pandas>=2.0", "scikit-learn>=1.3"]
)
def train_model(
    training_data: Input[Dataset],
    model_artifact: Output[Model],
    metrics: Output[Metrics],
    classification_metrics: Output[ClassificationMetrics],
    hyperparameters: str,
    epochs: int = 50,
    batch_size: int = 32
) -> NamedTuple("TrainingResult", [("best_epoch", int), ("best_loss", float)]):
    """
    Train TensorFlow model with comprehensive logging.
    """
    import pandas as pd
    import tensorflow as tf
    import json
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import classification_report, confusion_matrix
    import numpy as np
    
    # Load data
    df = pd.read_parquet(training_data.path)
    X = df.drop(columns=['target']).values
    y = df['target'].values
    
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    
    # Parse hyperparameters
    hparams = json.loads(hyperparameters)
    
    # Build model
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(
            hparams.get("hidden_units", 128),
            activation='relu',
            input_shape=(X.shape[1],)
        ),
        tf.keras.layers.Dropout(hparams.get("dropout", 0.3)),
        tf.keras.layers.Dense(
            hparams.get("hidden_units", 128) // 2,
            activation='relu'
        ),
        tf.keras.layers.Dropout(hparams.get("dropout", 0.3)),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])
    
    model.compile(
        optimizer=tf.keras.optimizers.Adam(
            learning_rate=hparams.get("learning_rate", 0.001)
        ),
        loss='binary_crossentropy',
        metrics=['accuracy', tf.keras.metrics.AUC(name='auc')]
    )
    
    # Train
    history = model.fit(
        X_train, y_train,
        validation_data=(X_val, y_val),
        epochs=epochs,
        batch_size=batch_size,
        callbacks=[
            tf.keras.callbacks.EarlyStopping(
                patience=5,
                restore_best_weights=True
            )
        ],
        verbose=1
    )
    
    # Evaluate
    y_pred = (model.predict(X_val) > 0.5).astype(int)
    cm = confusion_matrix(y_val, y_pred)
    
    # Log metrics
    final_loss = min(history.history['val_loss'])
    best_epoch = history.history['val_loss'].index(final_loss)
    final_accuracy = history.history['val_accuracy'][best_epoch]
    final_auc = history.history['val_auc'][best_epoch]
    
    metrics.log_metric("val_loss", final_loss)
    metrics.log_metric("val_accuracy", final_accuracy)
    metrics.log_metric("val_auc", final_auc)
    metrics.log_metric("epochs_trained", len(history.history['loss']))
    
    # Log classification metrics
    classification_metrics.log_confusion_matrix(
        categories=["Negative", "Positive"],
        matrix=cm.tolist()
    )
    
    # Save model
    model.save(model_artifact.path)
    
    return (best_epoch, final_loss)


# =============================================================================
# COMPONENT 5: Model Evaluation Gate
# =============================================================================

@component(
    base_image="python:3.10-slim",
    packages_to_install=["pandas>=2.0"]
)
def evaluate_model(
    metrics_artifact: Input[Metrics],
    min_accuracy: float,
    min_auc: float
) -> NamedTuple("EvalResult", [("passed", bool), ("reason", str)]):
    """
    Quality gate for model promotion.
    """
    # Metrics logged upstream via metrics.log_metric() are surfaced on the
    # artifact's metadata dictionary rather than as a file at .path
    accuracy = metrics_artifact.metadata.get("val_accuracy", 0)
    auc = metrics_artifact.metadata.get("val_auc", 0)
    
    # Check thresholds
    checks = []
    
    if accuracy < min_accuracy:
        checks.append(f"Accuracy {accuracy:.4f} < {min_accuracy}")
    
    if auc < min_auc:
        checks.append(f"AUC {auc:.4f} < {min_auc}")
    
    passed = len(checks) == 0
    reason = "All checks passed" if passed else "; ".join(checks)
    
    return (passed, reason)


# =============================================================================
# PIPELINE DEFINITION
# =============================================================================

@dsl.pipeline(
    name="churn-prediction-ct-pipeline",
    description="Continuous Training pipeline for customer churn prediction"
)
def churn_ct_pipeline(
    project_id: str = "my-project",
    bq_dataset: str = "analytics",
    bq_table: str = "customer_features",
    sample_fraction: float = 1.0,
    epochs: int = 50,
    min_accuracy: float = 0.85,
    min_auc: float = 0.80
):
    """
    End-to-end CT pipeline with quality gates.
    """
    
    # Data validation expectations
    expectations = json.dumps([
        {"expectation_type": "expect_column_to_exist", "kwargs": {"column": "target"}},
        {"expectation_type": "expect_column_values_to_not_be_null", "kwargs": {"column": "feature_1"}},
        {"expectation_type": "expect_column_values_to_be_between", "kwargs": {"column": "feature_1", "min_value": 0, "max_value": 100}},
    ])
    
    # Feature engineering config
    feature_config = json.dumps({
        "numeric_features": ["feature_1", "feature_2"],
        "categorical_features": ["category"]
    })
    
    # Hyperparameters
    hyperparams = json.dumps({
        "hidden_units": 128,
        "dropout": 0.3,
        "learning_rate": 0.001
    })
    
    # Step 1: Extract data
    extract_op = extract_training_data(
        project_id=project_id,
        dataset_id=bq_dataset,
        table_id=bq_table,
        sample_fraction=sample_fraction
    )
    
    # Step 2: Validate data
    validate_op = validate_data(
        input_dataset=extract_op.outputs["output_dataset"],
        expectations_config=expectations
    )
    
    # Step 3: Feature engineering (only if validation passed)
    with dsl.Condition(validate_op.outputs["passed"] == True):
        
        feature_op = engineer_features(
            input_dataset=extract_op.outputs["output_dataset"],
            feature_config=feature_config
        )
        
        # Step 4: Train model
        train_op = train_model(
            training_data=feature_op.outputs["output_dataset"],
            hyperparameters=hyperparams,
            epochs=epochs
        ).set_cpu_limit("4").set_memory_limit("16G")
        # On Vertex AI, a GPU is requested as an accelerator type plus a count
        # (the T4 here is illustrative; choose per workload and available quota)
        train_op.set_accelerator_type("NVIDIA_TESLA_T4")
        train_op.set_accelerator_limit(1)
        
        # Step 5: Evaluate
        eval_op = evaluate_model(
            metrics_artifact=train_op.outputs["metrics"],
            min_accuracy=min_accuracy,
            min_auc=min_auc
        )
        
        # Step 6: Deploy (only if evaluation passed)
        with dsl.Condition(eval_op.outputs["passed"] == True):
            
            # Use Google Cloud components for deployment
            from google_cloud_pipeline_components.v1.model import ModelUploadOp
            from google_cloud_pipeline_components.v1.endpoint import (
                EndpointCreateOp, ModelDeployOp
            )
            
            upload_op = ModelUploadOp(
                project=project_id,
                display_name="churn-model",
                artifact_uri=train_op.outputs["model_artifact"].uri,
                serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-13:latest"
            )
            
            # Create a new endpoint (in production, you would typically look up and reuse an existing one)
            endpoint_op = EndpointCreateOp(
                project=project_id,
                display_name="churn-prediction-endpoint",
            )
            
            deploy_op = ModelDeployOp(
                model=upload_op.outputs["model"],
                endpoint=endpoint_op.outputs["endpoint"],
                dedicated_resources_machine_type="n1-standard-4",
                dedicated_resources_min_replica_count=1,
                dedicated_resources_max_replica_count=5,
                traffic_percentage=100
            )


# =============================================================================
# COMPILE AND RUN
# =============================================================================

import json

# Compile
compiler.Compiler().compile(
    pipeline_func=churn_ct_pipeline,
    package_path="churn_ct_pipeline.json"
)

# Submit
from google.cloud import aiplatform

def submit_pipeline(
    project_id: str,
    location: str,
    pipeline_root: str,
    service_account: str
):
    """Submit pipeline to Vertex AI."""
    
    aiplatform.init(
        project=project_id,
        location=location
    )
    
    job = aiplatform.PipelineJob(
        display_name="churn-ct-run",
        template_path="churn_ct_pipeline.json",
        pipeline_root=pipeline_root,
        parameter_values={
            "project_id": project_id,
            "bq_dataset": "analytics",
            "bq_table": "customer_features",
            "sample_fraction": 0.1,  # Dev mode
            "epochs": 10,
            "min_accuracy": 0.80,
            "min_auc": 0.75
        },
        enable_caching=True
    )
    
    job.submit(
        service_account=service_account,
        network=f"projects/{project_id}/global/networks/default"
    )
    
    return job
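
Invoked from CI or a notebook, for example (all values are placeholders):

if __name__ == "__main__":
    job = submit_pipeline(
        project_id="my-project",
        location="us-central1",
        pipeline_root="gs://my-project-vertex-pipelines",
        service_account="vertex-pipelines-sa@my-project.iam.gserviceaccount.com"
    )
    # job.wait()  # optionally block until the run finishes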

20.2.3. Cloud Composer (Airflow): The DataOps Powerhouse

While Vertex AI Pipelines is built for the specific semantics of ML (Models, Metrics), Cloud Composer is built for the broad orchestration of the entire data estate.

When to Use What

graph TB
    A[New ML Pipeline] --> B{Data Prep Needed?}
    B -->|No| C[Vertex AI Pipelines Only]
    B -->|Yes| D{Complex ETL?}
    D -->|Simple| E[Vertex AI with DataPrep]
    D -->|Complex| F[Composer + Vertex AI]
    
    F --> G[Composer: ETL Orchestration]
    G --> H[Vertex AI: ML Training]

The Hybrid Pattern: Airflow Triggers Vertex AI

# dags/ml_pipeline_orchestrator.py

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.operators.vertex_ai.pipeline_job import (
    CreatePipelineJobOperator,
    GetPipelineJobOperator
)
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta

# Helper functions used by the validation tasks below.
# They are defined before the DAG so the python_callable references resolve
# when the DAG file is parsed.
def run_endpoint_smoke_test(endpoint_id: str, project: str, location: str):
    """Run smoke tests against deployed model."""
    from google.cloud import aiplatform
    
    aiplatform.init(project=project, location=location)
    
    endpoint = aiplatform.Endpoint(endpoint_id)
    
    # Test prediction
    test_instances = [
        {"feature_1": 0.5, "feature_2": 0.3, "category_A": 1, "category_B": 0}
    ]
    
    response = endpoint.predict(instances=test_instances)
    
    # Validate response
    assert len(response.predictions) == 1
    assert 0 <= response.predictions[0] <= 1
    
    print(f"Smoke test passed. Prediction: {response.predictions[0]}")


def update_model_metadata(training_date: str, pipeline_run_id: str):
    """Update model registry with training metadata."""
    # Implementation depends on your registry
    pass


default_args = {
    'owner': 'mlops-team',
    'depends_on_past': False,
    'email': ['ml-alerts@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}


with DAG(
    'ml_pipeline_orchestrator',
    default_args=default_args,
    description='Orchestrate data prep and ML training',
    schedule_interval='0 6 * * *',  # Daily at 6 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['ml', 'production']
) as dag:

    # =========================================================================
    # PHASE 1: DATA PREPARATION (Airflow Domain)
    # =========================================================================
    
    with TaskGroup("data_preparation") as data_prep:
        
        # Wait for upstream data
        wait_for_data = GCSObjectExistenceSensor(
            task_id='wait_for_raw_data',
            bucket='raw-data-bucket',
            object='events/dt={{ ds }}/data.parquet',
            timeout=3600,
            poke_interval=60
        )
        
        # Run dbt transformations
        run_dbt = BashOperator(
            task_id='dbt_run_features',
            bash_command='''
                cd /home/airflow/gcs/dags/dbt_project &&
                dbt run --select marts.ml.customer_features \
                    --vars '{"run_date": "{{ ds }}"}'
            '''
        )
        
        # Run data quality checks
        run_dbt_test = BashOperator(
            task_id='dbt_test_features',
            bash_command='''
                cd /home/airflow/gcs/dags/dbt_project &&
                dbt test --select marts.ml.customer_features
            '''
        )
        
        wait_for_data >> run_dbt >> run_dbt_test
    
    # =========================================================================
    # PHASE 2: ML TRAINING (Vertex AI Domain)
    # =========================================================================
    
    with TaskGroup("ml_training") as ml_training:
        
        # Trigger Vertex AI Pipeline
        trigger_training = CreatePipelineJobOperator(
            task_id='trigger_vertex_pipeline',
            project_id='{{ var.value.gcp_project }}',
            location='us-central1',
            display_name='churn-training-{{ ds_nodash }}',
            template_path='gs://ml-pipelines/v2/churn_ct_pipeline.json',
            parameter_values={
                'project_id': '{{ var.value.gcp_project }}',
                'bq_dataset': 'marts',
                'bq_table': 'customer_features',
                'sample_fraction': 1.0,
                'epochs': 50,
                'min_accuracy': 0.85,
                'min_auc': 0.80
            },
            enable_caching=True
        )
        
        # Wait for pipeline completion
        wait_for_training = GetPipelineJobOperator(
            task_id='wait_for_training',
            project_id='{{ var.value.gcp_project }}',
            location='us-central1',
            pipeline_job_id="{{ task_instance.xcom_pull(task_ids='ml_training.trigger_vertex_pipeline')['name'].split('/')[-1] }}",
            deferrable=True,  # Use Airflow 2.6+ deferrable operators
            polling_period_seconds=60
        )
        
        trigger_training >> wait_for_training
    
    # =========================================================================
    # PHASE 3: POST-DEPLOYMENT VALIDATION
    # =========================================================================
    
    with TaskGroup("validation") as validation:
        
        # Run smoke tests against new model
        smoke_test = PythonOperator(
            task_id='endpoint_smoke_test',
            python_callable=run_endpoint_smoke_test,
            op_kwargs={
                'endpoint_id': '{{ var.value.churn_endpoint_id }}',
                'project': '{{ var.value.gcp_project }}',
                'location': 'us-central1'
            }
        )
        
        # Update model metadata
        update_registry = PythonOperator(
            task_id='update_model_registry',
            python_callable=update_model_metadata,
            op_kwargs={
                'training_date': '{{ ds }}',
                'pipeline_run_id': "{{ task_instance.xcom_pull(task_ids='ml_training.trigger_vertex_pipeline')['name'] }}"
            }
        )
        
        smoke_test >> update_registry
    
    # =========================================================================
    # DAG DEPENDENCIES
    # =========================================================================
    
    data_prep >> ml_training >> validation



20.2.4. Advanced Vertex AI Features

Caching and Lineage

Vertex AI automatically tracks metadata. If you run the pipeline twice with the same inputs, steps will “Cache Hit” and skip execution.

# Control caching behavior

# Disable caching for a specific task so it always re-executes
@component(...)
def always_run_component(...):
    pass

# In the pipeline body
op = always_run_component(...)
op.set_caching_options(False)

# Caching can also be toggled for an entire run at submission time:
# aiplatform.PipelineJob(..., enable_caching=False)
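
The tracked metadata can also be pulled back out for analysis. A small sketch using the SDK's pipeline-run DataFrame helper (project, region, and pipeline name assumed to match the earlier example):

from google.cloud import aiplatform

aiplatform.init(project="my-project", location="us-central1")

# One row per pipeline run, with parameters and logged metrics as columns
runs_df = aiplatform.get_pipeline_df(pipeline="churn-prediction-ct-pipeline")
print(runs_df.head())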

Pipeline Templates and Versioning

# Version and publish pipelines

# Vertex AI stores reusable pipeline templates in an Artifact Registry
# repository (format: Kubeflow Pipelines); the KFP registry client
# uploads and tags compiled specs.
from kfp.registry import RegistryClient


def publish_pipeline_template(
    project: str,
    location: str,
    repo_name: str,
    package_path: str,
    version_tag: str
):
    """
    Publish a versioned pipeline template to Artifact Registry.
    
    Templates allow:
    - Version control of pipelines
    - Easy access for data scientists
    - Consistent production runs
    """
    
    client = RegistryClient(
        host=f"https://{location}-kfp.pkg.dev/{project}/{repo_name}"
    )
    
    # Upload the compiled pipeline spec and tag the new version
    template_name, version_name = client.upload_pipeline(
        file_name=package_path,
        tags=[version_tag, "latest"]
    )
    
    # The template is now accessible via:
    # - Console UI (Vertex AI > Pipelines > Your templates)
    # - aiplatform.PipelineJob(template_path="https://{location}-kfp.pkg.dev/...")
    # - REST API
    
    return template_name, version_name
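
Runs can then be launched straight from the registry-hosted template. A sketch assuming an Artifact Registry repository named ml-pipelines and the package/tag published above:

from google.cloud import aiplatform

aiplatform.init(project="my-project", location="us-central1")

job = aiplatform.PipelineJob(
    display_name="churn-from-template",
    # Template URL format: https://{region}-kfp.pkg.dev/{project}/{repo}/{package}/{tag}
    template_path="https://us-central1-kfp.pkg.dev/my-project/ml-pipelines/churn-prediction-ct-pipeline/latest",
    pipeline_root="gs://my-project-vertex-pipelines"
)
job.submit()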

Conditional Execution and Parallelism

# Advanced control flow in KFP

from kfp import dsl

@dsl.pipeline(name="advanced-control-flow")
def advanced_pipeline(model_type: str, run_parallel: bool):
    
    # Conditional based on parameter
    with dsl.Condition(model_type == "xgboost"):
        xgb_train = train_xgboost(...)
    
    with dsl.Condition(model_type == "tensorflow"):
        tf_train = train_tensorflow(...)
    
    # Parallel execution
    with dsl.ParallelFor(items=["us", "eu", "asia"]) as region:
        deploy_regional = deploy_model(region=region)
    
    # Exit handler (always runs, even on failure)
    with dsl.ExitHandler(cleanup_step()):
        main_computation = heavy_training(...)

20.2.5. Terraform Infrastructure for Vertex AI

# vertex_ai_infrastructure.tf

# Enable required APIs
resource "google_project_service" "vertex_ai" {
  for_each = toset([
    "aiplatform.googleapis.com",
    "ml.googleapis.com",
    "bigquery.googleapis.com",
    "storage.googleapis.com",
    "cloudfunctions.googleapis.com",
    "cloudscheduler.googleapis.com"
  ])
  
  service            = each.value
  disable_on_destroy = false
}

# Service account for pipelines
resource "google_service_account" "vertex_pipelines" {
  account_id   = "vertex-pipelines-sa"
  display_name = "Vertex AI Pipelines Service Account"
}

# IAM Roles
resource "google_project_iam_member" "vertex_roles" {
  for_each = toset([
    "roles/aiplatform.user",
    "roles/bigquery.dataViewer",
    "roles/bigquery.jobUser",
    "roles/storage.objectViewer",
    "roles/storage.objectCreator"
  ])
  
  project = var.project_id
  role    = each.value
  member  = "serviceAccount:${google_service_account.vertex_pipelines.email}"
}

# Pipeline root bucket
resource "google_storage_bucket" "pipeline_root" {
  name     = "${var.project_id}-vertex-pipelines"
  location = var.region
  
  uniform_bucket_level_access = true
  
  lifecycle_rule {
    condition {
      age = 90  # Clean up old pipeline artifacts
    }
    action {
      type = "Delete"
    }
  }
}

# Model registry bucket
resource "google_storage_bucket" "model_artifacts" {
  name     = "${var.project_id}-model-artifacts"
  location = var.region
  
  uniform_bucket_level_access = true
  
  versioning {
    enabled = true
  }
  
  lifecycle_rule {
    condition {
      num_newer_versions = 5  # Keep last 5 versions
    }
    action {
      type = "Delete"
    }
  }
}

# VPC Network for pipeline jobs
resource "google_compute_network" "vertex_network" {
  name                    = "vertex-ai-network"
  auto_create_subnetworks = false
}

resource "google_compute_subnetwork" "vertex_subnet" {
  name          = "vertex-ai-subnet"
  ip_cidr_range = "10.0.0.0/24"
  region        = var.region
  network       = google_compute_network.vertex_network.id
  
  private_ip_google_access = true
}

# Cloud Scheduler for scheduled pipelines
resource "google_cloud_scheduler_job" "daily_training" {
  name        = "daily-training-trigger"
  description = "Triggers daily model retraining"
  schedule    = "0 6 * * *"
  time_zone   = "UTC"
  
  http_target {
    # Assumes a Cloud Function resource (google_cloudfunctions2_function.trigger_pipeline) defined elsewhere in the module
    uri         = google_cloudfunctions2_function.trigger_pipeline.service_config[0].uri
    http_method = "POST"
    
    body = base64encode(jsonencode({
      pipeline_spec = "gs://${google_storage_bucket.pipeline_root.name}/templates/churn_ct_pipeline.json"
      parameters = {
        sample_fraction = 1.0
        epochs          = 50
      }
    }))
    
    oidc_token {
      service_account_email = google_service_account.vertex_pipelines.email
    }
  }
}
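
The scheduler above targets an HTTP-triggered Cloud Function that the Terraform snippet assumes is defined elsewhere (google_cloudfunctions2_function.trigger_pipeline). A minimal sketch of what that function's Python handler could look like, with placeholder project, region, bucket, and service-account names:

# main.py - hypothetical Cloud Function handler behind the scheduler trigger
# (requires functions-framework and google-cloud-aiplatform in requirements.txt)
import functions_framework
from google.cloud import aiplatform


@functions_framework.http
def trigger_pipeline(request):
    payload = request.get_json(silent=True) or {}

    aiplatform.init(project="my-project", location="us-central1")

    job = aiplatform.PipelineJob(
        display_name="scheduled-churn-ct-run",
        template_path=payload["pipeline_spec"],           # sent by Cloud Scheduler
        pipeline_root="gs://my-project-vertex-pipelines",
        parameter_values=payload.get("parameters", {}),
        enable_caching=True
    )
    job.submit(
        service_account="vertex-pipelines-sa@my-project.iam.gserviceaccount.com"
    )

    return {"state": "submitted", "display_name": job.display_name}, 200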

20.2.6. Comparison: Vertex AI Pipelines vs. Cloud Composer

| Feature        | Vertex AI Pipelines        | Cloud Composer (Airflow)           |
|----------------|----------------------------|------------------------------------|
| Engine         | Argo (on K8s) - Serverless | Airflow (on GKE) - Managed Cluster |
| Billing        | Pay-per-run                | Always-on cluster cost             |
| Data Passing   | Artifact-based (GCS)       | XComs (small metadata)             |
| ML Integration | Native (Models, Metrics)   | Via operators                      |
| Caching        | Built-in, automatic        | Manual implementation              |
| Visualization  | ML-centric                 | Task-centric                       |
| Best For       | Pure ML workflows          | Data + ML orchestration            |

20.2.7. Common Pitfalls

| Pitfall               | Symptom                        | Solution                    |
|-----------------------|--------------------------------|-----------------------------|
| Large data in XComs   | Airflow DB bloated             | Use GCS artifacts           |
| Wrong service account | Permission denied              | Configure Workload Identity |
| Hardcoded regions     | Pipeline breaks in new regions | Parameterize location       |
| Missing GPU quota     | Pipeline stuck pending         | Request quota in advance    |
| No caching strategy   | Slow, expensive runs           | Design for cacheability     |
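
The first pitfall is worth a concrete illustration: exchange a small GCS URI between tasks and keep the actual data in GCS. A hypothetical sketch using the TaskFlow API (the bucket name is a placeholder, the DataFrame stands in for real feature building, and writing to gs:// requires gcsfs in the Composer environment):

from airflow.decorators import task


@task
def write_features(ds: str) -> str:
    import pandas as pd
    uri = f"gs://my-feature-bucket/features/dt={ds}/features.parquet"
    df = pd.DataFrame({"feature_1": [0.1, 0.2], "target": [0, 1]})  # stand-in data
    df.to_parquet(uri)   # heavy data goes to GCS
    return uri           # only this short string is stored in XCom


@task
def train_on_features(feature_uri: str):
    import pandas as pd
    df = pd.read_parquet(feature_uri)  # read the large file straight from GCS
    print(f"Loaded {len(df)} rows for training")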

Conclusion

GCP offers a powerful but bifurcated orchestration story:

  • Vertex AI Pipelines: Best for pure ML workflows with artifact-centric design
  • Cloud Composer: Best for complex data orchestration with ML as one component

For most production systems, the recommended pattern is:

  1. Composer manages the data lifecycle (ETL, data quality)
  2. Vertex AI handles the ML lifecycle (training, evaluation, deployment)
  3. A single trigger connects them

[End of Section 20.2]