20.2 GCP Pipelines: Vertex AI Pipelines & Cloud Composer
Google Cloud Platform (GCP) approaches MLOps with a philosophy rooted in its engineering heritage: everything is a container, and everything is scalable. While AWS provides a toolkit of primitives, GCP provides a platform shaped by the tooling Google built for its own ML systems, the lineage that produced TFX and Kubeflow.
This section covers the two giants of GCP orchestration:
- Vertex AI Pipelines: A fully managed implementation of the open-source Kubeflow Pipelines (KFP). This is the standard for modern ML workflows on GCP.
- Cloud Composer: A fully managed Apache Airflow environment. This is the bridge between traditional data engineering (DataOps) and machine learning (MLOps).
20.2.1. Vertex AI Pipelines: The Serverless ML Engine
Vertex AI Pipelines is serverless. Instead of deploying Kubeflow Pipelines on your own GKE cluster and managing the control plane, you submit a compiled pipeline specification and Google runs it. You pay only for the compute each step consumes, plus a small per-run invocation fee.
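The entire interaction fits in a handful of SDK calls; here is a minimal sketch with placeholder project and bucket names (the full, parameterized version of this flow appears later in this section):
# minimal_submit.py - compile a trivial KFP pipeline and hand it to Vertex AI
from kfp import dsl, compiler
from google.cloud import aiplatform


@dsl.component(base_image="python:3.10-slim")
def say_hello() -> str:
    return "hello"


@dsl.pipeline(name="hello-pipeline")
def hello_pipeline():
    say_hello()


compiler.Compiler().compile(pipeline_func=hello_pipeline, package_path="hello_pipeline.json")

aiplatform.init(project="my-project", location="us-central1")  # placeholder project
aiplatform.PipelineJob(
    display_name="hello-run",
    template_path="hello_pipeline.json",
    pipeline_root="gs://my-project-vertex-pipelines",  # placeholder bucket
).submit()  # billed as: step compute + a small per-run fee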
Architecture Overview
graph TB
subgraph "Development"
A[KFP SDK Python] --> B[Compile]
B --> C[pipeline.json]
end
subgraph "Vertex AI"
D[Pipeline Service] --> E[Argo Workflows]
E --> F[Custom Training]
E --> G[AutoML]
E --> H[Model Upload]
E --> I[Endpoint Deploy]
end
subgraph "Artifacts"
J[GCS: Pipeline Root]
K[Vertex AI Registry]
L[Vertex AI Experiments]
end
C -->|Submit| D
F --> J
H --> K
F --> L
Component Types Comparison
| Component Type | Build Time | Best For | Example |
|---|---|---|---|
| Lightweight Python | At compile | Python functions, quick iteration | Data validation |
| Custom Container | Manual Docker build | Complex dependencies, GPU workloads | Training (see the sketch below) |
| Pre-built Google | None | Standard Vertex AI operations | Model upload, deploy |
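For comparison with the lightweight Python components used throughout this section, a custom container component is declared with KFP v2's `@dsl.container_component` decorator. This is a minimal sketch; the image name is a placeholder for a trainer image you would build and push to Artifact Registry yourself.
# container_component_sketch.py - custom container component (illustrative image name)
from kfp import dsl


@dsl.container_component
def train_in_container(epochs: int, model: dsl.Output[dsl.Model]):
    """Run an arbitrary image; KFP resolves the argument placeholders at runtime."""
    return dsl.ContainerSpec(
        image="us-docker.pkg.dev/my-project/ml-images/trainer:latest",  # placeholder
        command=["python", "train.py"],
        args=["--epochs", epochs, "--model-dir", model.path],
    )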
20.2.2. Deep Dive: KFP v2 SDK
The Complete Component Lifecycle
# kfp_v2_complete.py - Production-ready KFP components
import json
from typing import NamedTuple, List, Dict, Optional
from kfp import dsl
from kfp.dsl import (
component,
Input,
Output,
Dataset,
Model,
Metrics,
Artifact,
ClassificationMetrics,
HTML
)
from kfp import compiler
# =============================================================================
# COMPONENT 1: Data Extraction from BigQuery
# =============================================================================
@component(
base_image="python:3.10-slim",
packages_to_install=[
"google-cloud-bigquery==3.13.0",
"pandas==2.0.3",
"pyarrow==14.0.0",
"db-dtypes==1.1.1"
]
)
def extract_training_data(
project_id: str,
dataset_id: str,
table_id: str,
sample_fraction: float,
output_dataset: Output[Dataset],
metadata: Output[Artifact]
) -> NamedTuple("Outputs", [("num_rows", int), ("num_features", int)]):
"""
Extract training data from BigQuery.
Implements:
- Sampling for development runs
- Schema validation
- Automatic artifact logging
"""
from google.cloud import bigquery
import pandas as pd
import json
from datetime import datetime
client = bigquery.Client(project=project_id)
# Query with sampling
query = f"""
SELECT * FROM `{project_id}.{dataset_id}.{table_id}`
WHERE RAND() < {sample_fraction}
"""
df = client.query(query).to_dataframe()
# Schema validation
required_columns = ['feature_1', 'feature_2', 'target']
missing = [c for c in required_columns if c not in df.columns]
if missing:
raise ValueError(f"Missing required columns: {missing}")
# Save dataset
df.to_parquet(output_dataset.path, index=False)
# Log metadata
meta = {
"extraction_time": datetime.utcnow().isoformat(),
"source_table": f"{project_id}.{dataset_id}.{table_id}",
"sample_fraction": sample_fraction,
"num_rows": len(df),
"num_features": len(df.columns) - 1, # Exclude target
"columns": list(df.columns),
"dtypes": {k: str(v) for k, v in df.dtypes.items()}
}
with open(metadata.path, 'w') as f:
json.dump(meta, f, indent=2)
return (len(df), len(df.columns) - 1)
# =============================================================================
# COMPONENT 2: Data Validation with Great Expectations
# =============================================================================
@component(
base_image="python:3.10-slim",
packages_to_install=[
"pandas==2.0.3",
"great-expectations==0.18.0",
"pyarrow==14.0.0"
]
)
def validate_data(
input_dataset: Input[Dataset],
validation_report: Output[HTML],
expectations_config: str
) -> NamedTuple("ValidationResult", [("passed", bool), ("failure_count", int)]):
"""
Validate data quality using Great Expectations.
"""
import pandas as pd
import json
    # Legacy Pandas-backed dataset API (removed in newer Great Expectations
    # releases); pin a compatible version or migrate to the Validator API.
    from great_expectations.dataset import PandasDataset
# Load data
df = pd.read_parquet(input_dataset.path)
ge_df = PandasDataset(df)
# Parse expectations
expectations = json.loads(expectations_config)
results = []
# Run expectations
for exp in expectations:
method = getattr(ge_df, exp["expectation_type"])
result = method(**exp.get("kwargs", {}))
results.append({
"expectation": exp["expectation_type"],
"success": result["success"],
"details": str(result.get("result", {}))
})
# Generate HTML report
passed = all(r["success"] for r in results)
failures = sum(1 for r in results if not r["success"])
html = f"""
<html>
<head><title>Data Validation Report</title></head>
<body>
<h1>Data Validation Report</h1>
<p>Status: {"✅ PASSED" if passed else "❌ FAILED"}</p>
<p>Total Expectations: {len(results)}</p>
<p>Failures: {failures}</p>
<table border="1">
<tr><th>Expectation</th><th>Result</th><th>Details</th></tr>
"""
for r in results:
status = "✅" if r["success"] else "❌"
html += f"<tr><td>{r['expectation']}</td><td>{status}</td><td>{r['details'][:100]}</td></tr>"
html += "</table></body></html>"
with open(validation_report.path, 'w') as f:
f.write(html)
return (passed, failures)
# =============================================================================
# COMPONENT 3: Feature Engineering
# =============================================================================
@component(
base_image="gcr.io/deeplearning-platform-release/base-gpu.py310:latest",
packages_to_install=["pandas>=2.0", "scikit-learn>=1.3"]
)
def engineer_features(
input_dataset: Input[Dataset],
output_dataset: Output[Dataset],
feature_config: str
) -> NamedTuple("FeatureStats", [("num_features", int), ("feature_names", str)]):
"""
Engineer features with configurable transformations.
"""
import pandas as pd
import json
from sklearn.preprocessing import StandardScaler, OneHotEncoder
import pickle
df = pd.read_parquet(input_dataset.path)
config = json.loads(feature_config)
# Numeric features
numeric_cols = config.get("numeric_features", [])
if numeric_cols:
scaler = StandardScaler()
df[numeric_cols] = scaler.fit_transform(df[numeric_cols])
# Categorical features
categorical_cols = config.get("categorical_features", [])
for col in categorical_cols:
dummies = pd.get_dummies(df[col], prefix=col, drop_first=True)
df = pd.concat([df.drop(columns=[col]), dummies], axis=1)
# Save
df.to_parquet(output_dataset.path, index=False)
feature_names = [c for c in df.columns if c != 'target']
return (len(feature_names), ",".join(feature_names[:10]))
# =============================================================================
# COMPONENT 4: Model Training (GPU)
# =============================================================================
@component(
base_image="gcr.io/deeplearning-platform-release/tf2-gpu.2-13.py310:latest",
packages_to_install=["pandas>=2.0", "scikit-learn>=1.3"]
)
def train_model(
training_data: Input[Dataset],
model_artifact: Output[Model],
metrics: Output[Metrics],
classification_metrics: Output[ClassificationMetrics],
hyperparameters: str,
epochs: int = 50,
batch_size: int = 32
) -> NamedTuple("TrainingResult", [("best_epoch", int), ("best_loss", float), ("model_uri", str)]):
"""
Train TensorFlow model with comprehensive logging.
"""
import pandas as pd
import tensorflow as tf
import json
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import numpy as np
# Load data
df = pd.read_parquet(training_data.path)
X = df.drop(columns=['target']).values
y = df['target'].values
X_train, X_val, y_train, y_val = train_test_split(
X, y, test_size=0.2, random_state=42
)
# Parse hyperparameters
hparams = json.loads(hyperparameters)
# Build model
model = tf.keras.Sequential([
tf.keras.layers.Dense(
hparams.get("hidden_units", 128),
activation='relu',
input_shape=(X.shape[1],)
),
tf.keras.layers.Dropout(hparams.get("dropout", 0.3)),
tf.keras.layers.Dense(
hparams.get("hidden_units", 128) // 2,
activation='relu'
),
tf.keras.layers.Dropout(hparams.get("dropout", 0.3)),
tf.keras.layers.Dense(1, activation='sigmoid')
])
model.compile(
optimizer=tf.keras.optimizers.Adam(
learning_rate=hparams.get("learning_rate", 0.001)
),
loss='binary_crossentropy',
metrics=['accuracy', tf.keras.metrics.AUC(name='auc')]
)
# Train
history = model.fit(
X_train, y_train,
validation_data=(X_val, y_val),
epochs=epochs,
batch_size=batch_size,
callbacks=[
tf.keras.callbacks.EarlyStopping(
patience=5,
restore_best_weights=True
)
],
verbose=1
)
# Evaluate
y_pred = (model.predict(X_val) > 0.5).astype(int)
cm = confusion_matrix(y_val, y_pred)
# Log metrics
final_loss = min(history.history['val_loss'])
best_epoch = history.history['val_loss'].index(final_loss)
final_accuracy = history.history['val_accuracy'][best_epoch]
final_auc = history.history['val_auc'][best_epoch]
metrics.log_metric("val_loss", final_loss)
metrics.log_metric("val_accuracy", final_accuracy)
metrics.log_metric("val_auc", final_auc)
metrics.log_metric("epochs_trained", len(history.history['loss']))
# Log classification metrics
classification_metrics.log_confusion_matrix(
categories=["Negative", "Positive"],
matrix=cm.tolist()
)
# Save model
model.save(model_artifact.path)
    # Also surface the model's GCS URI so downstream steps can consume it as a
    # plain string parameter (e.g. ModelUploadOp's artifact_uri).
    return (best_epoch, final_loss, model_artifact.uri)
# =============================================================================
# COMPONENT 5: Model Evaluation Gate
# =============================================================================
@component(
base_image="python:3.10-slim",
packages_to_install=["pandas>=2.0"]
)
def evaluate_model(
metrics_artifact: Input[Metrics],
min_accuracy: float,
min_auc: float
) -> NamedTuple("EvalResult", [("passed", bool), ("reason", str)]):
"""
Quality gate for model promotion.
"""
    # Metrics.log_metric() values are stored on the artifact's metadata in
    # KFP v2 (not as a JSON file at .path), so read them from there.
    metrics = metrics_artifact.metadata
    accuracy = metrics.get("val_accuracy", 0)
    auc = metrics.get("val_auc", 0)
# Check thresholds
checks = []
if accuracy < min_accuracy:
checks.append(f"Accuracy {accuracy:.4f} < {min_accuracy}")
if auc < min_auc:
checks.append(f"AUC {auc:.4f} < {min_auc}")
passed = len(checks) == 0
reason = "All checks passed" if passed else "; ".join(checks)
return (passed, reason)
# =============================================================================
# PIPELINE DEFINITION
# =============================================================================
@dsl.pipeline(
name="churn-prediction-ct-pipeline",
description="Continuous Training pipeline for customer churn prediction"
)
def churn_ct_pipeline(
project_id: str = "my-project",
bq_dataset: str = "analytics",
bq_table: str = "customer_features",
sample_fraction: float = 1.0,
epochs: int = 50,
min_accuracy: float = 0.85,
min_auc: float = 0.80
):
"""
End-to-end CT pipeline with quality gates.
"""
# Data validation expectations
expectations = json.dumps([
{"expectation_type": "expect_column_to_exist", "kwargs": {"column": "target"}},
{"expectation_type": "expect_column_values_to_not_be_null", "kwargs": {"column": "feature_1"}},
{"expectation_type": "expect_column_values_to_be_between", "kwargs": {"column": "feature_1", "min_value": 0, "max_value": 100}},
])
# Feature engineering config
feature_config = json.dumps({
"numeric_features": ["feature_1", "feature_2"],
"categorical_features": ["category"]
})
# Hyperparameters
hyperparams = json.dumps({
"hidden_units": 128,
"dropout": 0.3,
"learning_rate": 0.001
})
# Step 1: Extract data
extract_op = extract_training_data(
project_id=project_id,
dataset_id=bq_dataset,
table_id=bq_table,
sample_fraction=sample_fraction
)
# Step 2: Validate data
validate_op = validate_data(
input_dataset=extract_op.outputs["output_dataset"],
expectations_config=expectations
)
# Step 3: Feature engineering (only if validation passed)
with dsl.Condition(validate_op.outputs["passed"] == True):
feature_op = engineer_features(
input_dataset=extract_op.outputs["output_dataset"],
feature_config=feature_config
)
# Step 4: Train model
train_op = train_model(
training_data=feature_op.outputs["output_dataset"],
hyperparameters=hyperparams,
epochs=epochs
        ).set_cpu_limit("4").set_memory_limit("16G") \
         .set_accelerator_type("NVIDIA_TESLA_T4") \
         .set_accelerator_limit(1)  # example accelerator; set_gpu_limit alone is deprecated in KFP v2
# Step 5: Evaluate
eval_op = evaluate_model(
metrics_artifact=train_op.outputs["metrics"],
min_accuracy=min_accuracy,
min_auc=min_auc
)
# Step 6: Deploy (only if evaluation passed)
with dsl.Condition(eval_op.outputs["passed"] == True):
# Use Google Cloud components for deployment
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.v1.endpoint import (
EndpointCreateOp, ModelDeployOp
)
upload_op = ModelUploadOp(
project=project_id,
display_name="churn-model",
            artifact_uri=train_op.outputs["model_uri"],
serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-13:latest"
)
# Check for existing endpoint or create new
endpoint_op = EndpointCreateOp(
project=project_id,
display_name="churn-prediction-endpoint",
)
deploy_op = ModelDeployOp(
model=upload_op.outputs["model"],
endpoint=endpoint_op.outputs["endpoint"],
dedicated_resources_machine_type="n1-standard-4",
dedicated_resources_min_replica_count=1,
dedicated_resources_max_replica_count=5,
traffic_percentage=100
)
# =============================================================================
# COMPILE AND RUN
# =============================================================================
# Compile
compiler.Compiler().compile(
pipeline_func=churn_ct_pipeline,
package_path="churn_ct_pipeline.json"
)
# Submit
from google.cloud import aiplatform
def submit_pipeline(
project_id: str,
location: str,
pipeline_root: str,
service_account: str
):
"""Submit pipeline to Vertex AI."""
aiplatform.init(
project=project_id,
location=location
)
job = aiplatform.PipelineJob(
display_name="churn-ct-run",
template_path="churn_ct_pipeline.json",
pipeline_root=pipeline_root,
parameter_values={
"project_id": project_id,
"bq_dataset": "analytics",
"bq_table": "customer_features",
"sample_fraction": 0.1, # Dev mode
"epochs": 10,
"min_accuracy": 0.80,
"min_auc": 0.75
},
enable_caching=True
)
job.submit(
service_account=service_account,
network=f"projects/{project_id}/global/networks/default"
)
return job
20.2.3. Cloud Composer (Airflow): The DataOps Powerhouse
While Vertex AI Pipelines is built for the specific semantics of ML (Models, Metrics), Cloud Composer is built for the broad orchestration of the entire data estate.
When to Use What
graph TB
A[New ML Pipeline] --> B{Data Prep Needed?}
B -->|No| C[Vertex AI Pipelines Only]
B -->|Yes| D{Complex ETL?}
D -->|Simple| E[Vertex AI with DataPrep]
D -->|Complex| F[Composer + Vertex AI]
F --> G[Composer: ETL Orchestration]
G --> H[Vertex AI: ML Training]
The Hybrid Pattern: Airflow Triggers Vertex AI
# dags/ml_pipeline_orchestrator.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.operators.vertex_ai.pipeline_job import (
    RunPipelineJobOperator,
)
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta
# Helper callables for the PythonOperators below. They must be defined before
# the DAG body runs, otherwise the operators raise NameError at parse time.
def run_endpoint_smoke_test(endpoint_id: str, project: str, location: str):
    """Run smoke tests against the newly deployed model."""
    from google.cloud import aiplatform

    aiplatform.init(project=project, location=location)
    endpoint = aiplatform.Endpoint(endpoint_id)
    # Test prediction
    test_instances = [
        {"feature_1": 0.5, "feature_2": 0.3, "category_A": 1, "category_B": 0}
    ]
    response = endpoint.predict(instances=test_instances)
    # Validate response (a sigmoid output may arrive as [[0.7]] or [0.7])
    assert len(response.predictions) == 1
    pred = response.predictions[0]
    score = pred[0] if isinstance(pred, list) else pred
    assert 0 <= score <= 1
    print(f"Smoke test passed. Prediction: {score}")

def update_model_metadata(training_date: str, pipeline_run_id: str):
    """Update model registry with training metadata."""
    # Implementation depends on your registry
    pass

default_args = {
'owner': 'mlops-team',
'depends_on_past': False,
'email': ['ml-alerts@company.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'ml_pipeline_orchestrator',
default_args=default_args,
description='Orchestrate data prep and ML training',
schedule_interval='0 6 * * *', # Daily at 6 AM
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['ml', 'production']
) as dag:
# =========================================================================
# PHASE 1: DATA PREPARATION (Airflow Domain)
# =========================================================================
with TaskGroup("data_preparation") as data_prep:
# Wait for upstream data
wait_for_data = GCSObjectExistenceSensor(
task_id='wait_for_raw_data',
bucket='raw-data-bucket',
            object='events/dt={{ ds }}/data.parquet',
timeout=3600,
poke_interval=60
)
# Run dbt transformations
run_dbt = BashOperator(
task_id='dbt_run_features',
bash_command='''
cd /home/airflow/gcs/dags/dbt_project &&
dbt run --select marts.ml.customer_features \
--vars '{"run_date": "{{ ds }}"}'
'''
)
# Run data quality checks
run_dbt_test = BashOperator(
task_id='dbt_test_features',
bash_command='''
cd /home/airflow/gcs/dags/dbt_project &&
dbt test --select marts.ml.customer_features
'''
)
wait_for_data >> run_dbt >> run_dbt_test
# =========================================================================
# PHASE 2: ML TRAINING (Vertex AI Domain)
# =========================================================================
with TaskGroup("ml_training") as ml_training:
# Trigger Vertex AI Pipeline
trigger_training = CreatePipelineJobOperator(
task_id='trigger_vertex_pipeline',
project_id='{{ var.value.gcp_project }}',
location='us-central1',
display_name='churn-training-{{ ds_nodash }}',
template_path='gs://ml-pipelines/v2/churn_ct_pipeline.json',
parameter_values={
'project_id': '{{ var.value.gcp_project }}',
'bq_dataset': 'marts',
'bq_table': 'customer_features',
'sample_fraction': 1.0,
'epochs': 50,
'min_accuracy': 0.85,
'min_auc': 0.80
},
enable_caching=True
)
# Wait for pipeline completion
wait_for_training = GetPipelineJobOperator(
task_id='wait_for_training',
project_id='{{ var.value.gcp_project }}',
location='us-central1',
pipeline_job_id="{{ task_instance.xcom_pull(task_ids='ml_training.trigger_vertex_pipeline')['name'].split('/')[-1] }}",
deferrable=True, # Use Airflow 2.6+ deferrable operators
polling_period_seconds=60
)
trigger_training >> wait_for_training
# =========================================================================
# PHASE 3: POST-DEPLOYMENT VALIDATION
# =========================================================================
with TaskGroup("validation") as validation:
# Run smoke tests against new model
smoke_test = PythonOperator(
task_id='endpoint_smoke_test',
python_callable=run_endpoint_smoke_test,
op_kwargs={
'endpoint_id': '{{ var.value.churn_endpoint_id }}',
'project': '{{ var.value.gcp_project }}',
'location': 'us-central1'
}
)
# Update model metadata
update_registry = PythonOperator(
task_id='update_model_registry',
python_callable=update_model_metadata,
op_kwargs={
'training_date': '{{ ds }}',
'pipeline_run_id': "{{ task_instance.xcom_pull(task_ids='ml_training.trigger_vertex_pipeline')['name'] }}"
}
)
smoke_test >> update_registry
# =========================================================================
# DAG DEPENDENCIES
# =========================================================================
data_prep >> ml_training >> validation
20.2.4. Advanced Vertex AI Features
Caching and Lineage
Vertex AI automatically tracks metadata for every execution and artifact (via Vertex ML Metadata). If you run a pipeline twice with the same component code and inputs, matching steps register a “Cache Hit” and skip execution, reusing the previous outputs.
# Control caching behavior
# Disable caching for a specific component
@component(...)
def always_run_component(...):
pass
# In pipeline
op = always_run_component(...)
op.set_caching_options(False)
# Re-enable caching for the task (the default behavior)
op.set_caching_options(True)
# Caching can also be toggled for an entire run at submission time:
# aiplatform.PipelineJob(..., enable_caching=False)
Pipeline Templates and Versioning
# Version and publish pipeline templates to Artifact Registry.
# Vertex AI uses a Kubeflow Pipelines repository in Artifact Registry as its
# template registry; the repository below is assumed to already exist.
from kfp.registry import RegistryClient

def publish_pipeline_template(
    project: str,
    location: str,
    repo_name: str,
    template_file: str,
    version_tag: str
):
    """
    Publish a versioned pipeline template.
    Templates allow:
    - Version control of pipelines
    - Easy reuse by data scientists
    - Consistent production runs
    """
    client = RegistryClient(
        host=f"https://{location}-kfp.pkg.dev/{project}/{repo_name}"
    )
    package_name, version = client.upload_pipeline(
        file_name=template_file,
        tags=[version_tag, "latest"]
    )
    # The published template is now accessible via:
    # - The Vertex AI Pipelines console (templates view)
    # - aiplatform.PipelineJob(template_path=<registry URL>), shown below
    # - The Artifact Registry REST API
    return package_name, version
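Once published, a data scientist or a scheduler can run the template directly from Artifact Registry by passing its URL as the `template_path`. A minimal sketch with placeholder project, repository, and tag names:
# run_from_registry.py - submit a run from a published template (placeholder names)
from google.cloud import aiplatform

aiplatform.init(project="my-project", location="us-central1")

job = aiplatform.PipelineJob(
    display_name="churn-ct-from-template",
    # Artifact Registry template URL: https://<region>-kfp.pkg.dev/<project>/<repo>/<package>/<tag>
    template_path="https://us-central1-kfp.pkg.dev/my-project/ml-pipelines/churn-ct-pipeline/v1",
    pipeline_root="gs://my-project-vertex-pipelines",
    parameter_values={"sample_fraction": 1.0},
)
job.submit()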
Conditional Execution and Parallelism
# Advanced control flow in KFP
from kfp import dsl
@dsl.pipeline(name="advanced-control-flow")
def advanced_pipeline(model_type: str, run_parallel: bool):
# Conditional based on parameter
with dsl.Condition(model_type == "xgboost"):
xgb_train = train_xgboost(...)
with dsl.Condition(model_type == "tensorflow"):
tf_train = train_tensorflow(...)
# Parallel execution
with dsl.ParallelFor(items=["us", "eu", "asia"]) as region:
deploy_regional = deploy_model(region=region)
# Exit handler (always runs, even on failure)
with dsl.ExitHandler(cleanup_step()):
main_computation = heavy_training(...)
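Recent KFP 2.x releases also provide `dsl.If` (with `dsl.Elif` and `dsl.Else`) as the successor to `dsl.Condition`, which now emits a deprecation warning. A minimal sketch using a placeholder training component:
# dsl_if_sketch.py - newer conditional syntax in recent KFP 2.x releases
from kfp import dsl


@dsl.component(base_image="python:3.10-slim")
def train_xgboost_stub() -> str:
    return "trained-xgboost"  # placeholder for a real training step


@dsl.pipeline(name="if-branch-example")
def branching_pipeline(model_type: str = "xgboost"):
    with dsl.If(model_type == "xgboost"):
        train_xgboost_stub()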
20.2.5. Terraform Infrastructure for Vertex AI
# vertex_ai_infrastructure.tf
# Enable required APIs
resource "google_project_service" "vertex_ai" {
for_each = toset([
"aiplatform.googleapis.com",
"ml.googleapis.com",
"bigquery.googleapis.com",
"storage.googleapis.com",
"cloudfunctions.googleapis.com",
"cloudscheduler.googleapis.com"
])
service = each.value
disable_on_destroy = false
}
# Service account for pipelines
resource "google_service_account" "vertex_pipelines" {
account_id = "vertex-pipelines-sa"
display_name = "Vertex AI Pipelines Service Account"
}
# IAM Roles
resource "google_project_iam_member" "vertex_roles" {
for_each = toset([
"roles/aiplatform.user",
"roles/bigquery.dataViewer",
"roles/bigquery.jobUser",
"roles/storage.objectViewer",
"roles/storage.objectCreator"
])
project = var.project_id
role = each.value
member = "serviceAccount:${google_service_account.vertex_pipelines.email}"
}
# Pipeline root bucket
resource "google_storage_bucket" "pipeline_root" {
name = "${var.project_id}-vertex-pipelines"
location = var.region
uniform_bucket_level_access = true
lifecycle_rule {
condition {
age = 90 # Clean up old pipeline artifacts
}
action {
type = "Delete"
}
}
}
# Model registry bucket
resource "google_storage_bucket" "model_artifacts" {
name = "${var.project_id}-model-artifacts"
location = var.region
uniform_bucket_level_access = true
versioning {
enabled = true
}
lifecycle_rule {
condition {
num_newer_versions = 5 # Keep last 5 versions
}
action {
type = "Delete"
}
}
}
# VPC Network for pipeline jobs
resource "google_compute_network" "vertex_network" {
name = "vertex-ai-network"
auto_create_subnetworks = false
}
resource "google_compute_subnetwork" "vertex_subnet" {
name = "vertex-ai-subnet"
ip_cidr_range = "10.0.0.0/24"
region = var.region
network = google_compute_network.vertex_network.id
private_ip_google_access = true
}
# Cloud Scheduler for scheduled pipelines
resource "google_cloud_scheduler_job" "daily_training" {
name = "daily-training-trigger"
description = "Triggers daily model retraining"
schedule = "0 6 * * *"
time_zone = "UTC"
http_target {
    # Defined outside this snippet; a sketch of the function body follows below.
    uri         = google_cloudfunctions2_function.trigger_pipeline.service_config[0].uri
http_method = "POST"
body = base64encode(jsonencode({
pipeline_spec = "gs://${google_storage_bucket.pipeline_root.name}/templates/churn_ct_pipeline.json"
parameters = {
sample_fraction = 1.0
epochs = 50
}
}))
oidc_token {
service_account_email = google_service_account.vertex_pipelines.email
}
}
}
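The Cloud Scheduler job above points at a `google_cloudfunctions2_function.trigger_pipeline` resource that is not shown here. The following is a sketch of what that function's source might look like, assuming the JSON body produced by the scheduler and environment variables (`GCP_PROJECT_ID`, `GCP_REGION`, `PIPELINE_ROOT`, `PIPELINE_SA`) set on the function:
# main.py - hypothetical Cloud Function (2nd gen) that submits the pipeline
import os

import functions_framework
from google.cloud import aiplatform


@functions_framework.http
def trigger_pipeline(request):
    """Turn the scheduler's JSON payload into a Vertex AI PipelineJob."""
    payload = request.get_json(silent=True) or {}

    aiplatform.init(
        project=os.environ["GCP_PROJECT_ID"],
        location=os.environ.get("GCP_REGION", "us-central1"),
    )

    job = aiplatform.PipelineJob(
        display_name="scheduled-churn-training",
        template_path=payload["pipeline_spec"],
        pipeline_root=os.environ["PIPELINE_ROOT"],
        parameter_values=payload.get("parameters", {}),
        enable_caching=True,
    )
    # Fire-and-forget: the pipeline's own quality gates decide whether to deploy.
    job.submit(service_account=os.environ["PIPELINE_SA"])
    return {"pipeline_job": job.resource_name}, 200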
20.2.6. Comparison: Vertex AI Pipelines vs. Cloud Composer
| Feature | Vertex AI Pipelines | Cloud Composer (Airflow) |
|---|---|---|
| Engine | Argo (on K8s) - Serverless | Airflow (on GKE) - Managed Cluster |
| Billing | Pay-per-run | Always-on cluster cost |
| Data Passing | Artifact-based (GCS) | XComs (Small metadata) |
| ML Integration | Native (Models, Metrics) | Via operators |
| Caching | Built-in, automatic | Manual implementation |
| Visualization | ML-centric | Task-centric |
| Best For | Pure ML workflows | Data + ML orchestration |
20.2.7. Common Pitfalls
| Pitfall | Symptom | Solution |
|---|---|---|
| Large data in XComs | Airflow DB bloated | Pass GCS URIs instead of data (see the sketch below) |
| Wrong service account | Permission denied | Configure Workload Identity |
| Hardcoded regions | Pipeline breaks in new regions | Parameterize location |
| Missing GPU quota | Pipeline stuck pending | Request quota in advance |
| No caching strategy | Slow, expensive runs | Design for cacheability |
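For the XCom pitfall in particular, the fix is to pass references between Airflow tasks rather than data. Here is a minimal TaskFlow-style sketch with an illustrative bucket name (writing Parquet to `gs://` requires `gcsfs` on the workers):
# xcom_by_reference.py - push a GCS URI through XCom, never the DataFrame itself
from airflow.decorators import task


@task
def extract_features(ds: str) -> str:
    import pandas as pd

    df = pd.DataFrame({"feature_1": [1, 2, 3]})  # stand-in for a real extraction
    uri = f"gs://my-feature-bucket/features/dt={ds}/features.parquet"
    df.to_parquet(uri)
    return uri  # only this short string is stored in the Airflow metadata DB


@task
def train_on_features(features_uri: str) -> None:
    import pandas as pd

    df = pd.read_parquet(features_uri)
    print(f"Training on {len(df)} rows from {features_uri}")


# Inside a DAG definition: train_on_features(extract_features(ds="{{ ds }}"))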
Conclusion
GCP offers a powerful but bifurcated orchestration story:
- Vertex AI Pipelines: Best for pure ML workflows with artifact-centric design
- Cloud Composer: Best for complex data orchestration with ML as one component
For most production systems, the recommended pattern is:
- Composer manages the data lifecycle (ETL, data quality)
- Vertex AI handles the ML lifecycle (training, evaluation, deployment)
- A single trigger connects them
[End of Section 20.2]