Google Vertex AI for MLOps
The BigQuery ML Team That Outgrew BigQuery
Your team has been running a churn prediction model in BigQuery ML for two years. It started as a proof of concept - a data analyst wrote a CREATE MODEL statement, connected it to a Looker dashboard, and suddenly the business had a churn score on every customer. Leadership loved it. The model got promoted from experiment to product without anyone noticing the transition.
Now it is running in production, updated monthly by a cron job, and influencing the retention team's outreach strategy for 800,000 users. And now the problems are showing. The model cannot be versioned properly. There is no way to A/B test a new algorithm against the current one without a complicated BigQuery table swap. The only monitoring is a weekly email from a data analyst who spot-checks aggregate accuracy. There is no feature store - every training run recomputes the same transformations from raw tables at a cost of $300 in BigQuery slot hours. When the ML team wants to experiment with gradient boosting or a neural network, BigQuery ML cannot help them.
You have been asked to migrate this workflow to something more capable. The company is already on GCP. The data lives in BigQuery. The obvious destination is Vertex AI - but the surface area is enormous. There is Vertex AI Workbench, Vertex AI Pipelines, Vertex AI Training, Vertex AI Prediction, Vertex AI Feature Store, Vertex AI Experiments, Vertex AI Model Registry, Vertex AI Matching Engine, and more. The documentation covers each service individually. It does not tell you how to connect them for your specific use case.
This lesson answers that question. We will build a mental model of the entire Vertex AI ecosystem, connect the pieces into a production MLOps workflow, and implement the BigQuery-to-Vertex migration with real Python SDK code. By the end, you will know exactly which Vertex AI service to reach for at each stage of the ML lifecycle, and why.
Why This Exists - The Problem with GCP Before Vertex AI
Before Vertex AI, Google Cloud's ML offering was fragmented across a half-dozen separate products that did not talk to each other. There was Cloud ML Engine (later renamed AI Platform) for training. There was AI Platform Prediction for serving. There was AI Platform Pipelines (Kubeflow) for orchestration. There was Data Catalog for metadata. There was BigQuery for data, Cloud Storage for artifacts, and Cloud Monitoring for metrics. Each product had its own API, its own SDK, its own IAM model, and its own set of gotchas.
The result was that building a production ML pipeline on GCP required stitching together five or six products with glue code. Teams spent more time managing the integration than building ML. Google's enterprise customers - who were comparing GCP to AWS SageMaker's increasingly integrated story - gave direct feedback: this is too fragmented.
In May 2021, Google announced Vertex AI as a unified ML platform. The goal was to consolidate everything under one API surface, one SDK (google-cloud-aiplatform), one IAM model, and one console UI. Services that existed separately were absorbed and redesigned to integrate properly. New services - Feature Store, Experiments, Model Registry, Matching Engine - were added to fill gaps.
:::note Historical Context Vertex AI was announced at Google I/O in May 2021 and was generally available at launch. It replaced AI Platform (formerly Cloud ML Engine), which had launched in 2017. The Kubeflow Pipelines project, which Vertex AI Pipelines is based on, was open-sourced by Google in 2018. Vizier, the hyperparameter optimization service, was described in a Google Research paper in 2017 and later productized as Vertex AI Vizier. :::
The Vertex AI Ecosystem
Vertex AI is a platform of interconnected services. For MLOps, the ones that matter most are:
Vertex AI Workbench
Vertex AI Workbench is the managed notebook environment. It replaces the old AI Platform Notebooks. There are two modes:
Managed Notebooks are fully managed by Google - automatic updates, idle shutdown, scheduled execution. Best for data scientists who want zero ops overhead.
User-Managed Notebooks give you more control - you can customize the VM, install system packages, attach GPU accelerators. Best for ML engineers who need specific environments.
For MLOps work, you will mostly use Workbench for development and pipeline authoring, then promote tested code into Vertex AI Pipelines for production execution.
# Environment setup for Vertex AI development
# Run from Workbench or any GCP-authenticated environment.
# Install the SDKs first from a terminal (in a notebook cell, use %pip instead of pip):
#   pip install google-cloud-aiplatform kfp google-cloud-bigquery
import vertexai
from google.cloud import aiplatform
# Initialize with your project and region
PROJECT_ID = "my-ml-project"
REGION = "us-central1"
BUCKET = f"gs://{PROJECT_ID}-vertex-staging"
# vertexai.init and aiplatform.init set the same global SDK configuration,
# so a single call covers both namespaces
vertexai.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET)
print(f"Vertex AI SDK version: {aiplatform.__version__}")
Vertex AI Custom Training
Prebuilt Containers vs Custom Containers
Vertex AI Training runs your code inside a container. You have two choices:
Prebuilt containers are maintained by Google and include common ML frameworks (TensorFlow, PyTorch, scikit-learn, XGBoost). You provide only your training script. This is the fastest path to a working training job.
Custom containers let you define the entire Docker image. Use these when you have unusual dependencies, need specific library versions, or want to use a framework for which Google does not provide a prebuilt container.
# training/train.py - runs inside the Vertex AI container
import argparse
import os
import pandas as pd
import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
from google.cloud import bigquery, storage
import joblib
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--project", type=str, required=True)
parser.add_argument("--bq_dataset", type=str, required=True)
parser.add_argument("--bq_table", type=str, required=True)
parser.add_argument("--max_depth", type=int, default=6)
parser.add_argument("--n_estimators", type=int, default=100)
parser.add_argument("--learning_rate", type=float, default=0.1)
parser.add_argument("--model_dir", type=str,
default=os.environ.get("AIP_MODEL_DIR", "/tmp/model"))
return parser.parse_args()
def load_from_bigquery(project, dataset, table):
"""Load training data directly from BigQuery."""
client = bigquery.Client(project=project)
query = f"""
SELECT *
FROM `{project}.{dataset}.{table}`
WHERE split = 'train'
"""
df = client.query(query).to_dataframe()
print(f"Loaded {len(df):,} training rows from BigQuery")
return df
def train(args):
    # Load data
    df = load_from_bigquery(args.project, args.bq_dataset, args.bq_table)
    feature_cols = [c for c in df.columns
                    if c not in ["customer_id", "churned", "split"]]
    X = df[feature_cols]
    y = df["churned"]
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )
    # Train XGBoost model
    model = xgb.XGBClassifier(
        max_depth=args.max_depth,
        n_estimators=args.n_estimators,
        learning_rate=args.learning_rate,
        use_label_encoder=False,
        eval_metric="logloss",
        random_state=42,
    )
    model.fit(
        X_train, y_train,
        eval_set=[(X_val, y_val)],
        early_stopping_rounds=20,
        verbose=50,
    )
    # Evaluate
    val_auc = roc_auc_score(y_val, model.predict_proba(X_val)[:, 1])
    print(f"Validation AUC: {val_auc:.4f}")
    # Save model - Vertex AI reads from AIP_MODEL_DIR.
    # AIP_MODEL_DIR is a gs:// URI. Inside the training container, Cloud Storage
    # is mounted under /gcs/, so translate the URI to a local path before writing.
    model_dir = args.model_dir
    if model_dir.startswith("gs://"):
        model_dir = "/gcs/" + model_dir[len("gs://"):]
    os.makedirs(model_dir, exist_ok=True)
    model_path = os.path.join(model_dir, "model.joblib")
    joblib.dump(model, model_path)
    print(f"Model saved to {model_path}")
    return val_auc
if __name__ == "__main__":
args = parse_args()
train(args)
Launching a Custom Training Job
from google.cloud import aiplatform
def launch_custom_training_job(
project: str,
region: str,
display_name: str,
training_script: str,
staging_bucket: str,
bq_dataset: str,
bq_table: str,
machine_type: str = "n1-standard-8",
max_depth: int = 6,
n_estimators: int = 200,
learning_rate: float = 0.05,
):
aiplatform.init(project=project, location=region)
job = aiplatform.CustomTrainingJob(
display_name=display_name,
script_path=training_script,
container_uri="us-docker.pkg.dev/vertex-ai/training/xgboost-cpu.1-7:latest",
requirements=["xgboost==1.7.0", "scikit-learn", "joblib"],
model_serving_container_image_uri=(
"us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-7:latest"
),
)
model = job.run(
model_display_name=f"{display_name}-model",
args=[
f"--project={project}",
f"--bq_dataset={bq_dataset}",
f"--bq_table={bq_table}",
f"--max_depth={max_depth}",
f"--n_estimators={n_estimators}",
f"--learning_rate={learning_rate}",
],
replica_count=1,
machine_type=machine_type,
accelerator_type=None,
base_output_dir=f"{staging_bucket}/training-output/{display_name}",
sync=True, # Block until complete
)
print(f"Training complete. Model resource name: {model.resource_name}")
return model
model = launch_custom_training_job(
project="my-ml-project",
region="us-central1",
display_name="churn-xgboost-v1",
training_script="training/train.py",
staging_bucket="gs://my-ml-project-vertex-staging",
bq_dataset="ml_features",
bq_table="churn_training_data",
)
Vertex AI Vizier - Hyperparameter Tuning
Vertex AI Vizier is Google's hyperparameter optimization service. It uses Bayesian optimization by default, which typically needs far fewer trials than grid or random search when each trial is an expensive training run.
from google.cloud import aiplatform
from google.cloud.aiplatform import hyperparameter_tuning as hpt
def run_hyperparameter_tuning(
project: str,
region: str,
training_script: str,
staging_bucket: str,
bq_dataset: str,
bq_table: str,
):
aiplatform.init(project=project, location=region)
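# Worker pool that every trial runs; the tuning service appends the sampled
# hyperparameters to "args" as extra command-line flags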
worker_pool_specs = [
{
"machine_spec": {"machine_type": "n1-standard-8"},
"replica_count": 1,
"python_package_spec": {
"executor_image_uri": (
"us-docker.pkg.dev/vertex-ai/training/xgboost-cpu.1-7:latest"
),
"package_uris": [f"{staging_bucket}/trainer.tar.gz"],
"python_module": "trainer.train",
"args": [
f"--project={project}",
f"--bq_dataset={bq_dataset}",
f"--bq_table={bq_table}",
],
},
}
]
hp_job = aiplatform.HyperparameterTuningJob(
display_name="churn-hptuning",
custom_job=aiplatform.CustomJob(
display_name="churn-hptuning-trial",
worker_pool_specs=worker_pool_specs,
),
metric_spec={"val_auc": "maximize"},
parameter_spec={
"max_depth": hpt.IntegerParameterSpec(min=3, max=10, scale="linear"),
"n_estimators": hpt.IntegerParameterSpec(min=50, max=500, scale="log"),
"learning_rate": hpt.DoubleParameterSpec(min=0.01, max=0.3, scale="log"),
},
max_trial_count=40,
parallel_trial_count=5, # Run 5 trials concurrently
search_algorithm=None, # None = Bayesian optimization (default)
)
hp_job.run(sync=True)
# Get best trial
best_trial = max(
hp_job.trials,
key=lambda t: t.final_measurement.metrics[0].value
)
print(f"Best AUC: {best_trial.final_measurement.metrics[0].value:.4f}")
print(f"Best params: {best_trial.parameters}")
return best_trial
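The tuning job optimizes the metric named in metric_spec, but the service only sees values that each trial reports explicitly. The training script shown earlier prints the validation AUC, which is not enough on its own. A minimal sketch of the reporting step follows, assuming the cloudml-hypertune package is installed in the training image. The function name report_val_auc and the injectable reporter argument are illustrative and not part of the original script.

```python
def report_val_auc(val_auc: float, step: int = 1, reporter=None) -> None:
    """Report one trial's validation AUC to the hyperparameter tuning service.

    `reporter` is injectable so the function can be exercised locally;
    inside a Vertex AI trial, leave it as None.
    """
    if reporter is None:
        # Assumption: cloudml-hypertune is available in the training container.
        import hypertune
        reporter = hypertune.HyperTune()
    reporter.report_hyperparameter_tuning_metric(
        hyperparameter_metric_tag="val_auc",  # must match the key in metric_spec
        metric_value=float(val_auc),
        global_step=step,
    )
```

Calling report_val_auc(val_auc) at the end of train() would give each trial a final measurement for the job to compare.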
Vertex AI Feature Store
The Feature Store solves a critical MLOps problem: the same features are often computed multiple times, by multiple teams, in subtly different ways. One team uses a 30-day revenue window; another uses 28 days. The results are slightly different, the models are inconsistent, and nobody knows why.
Vertex AI Feature Store centralizes feature computation and serving. Features are computed once, stored in the Feature Store, and served from there - both for training (batch serving from BigQuery) and for online inference (low-latency key-value lookup).
Setting Up the Feature Store
from google.cloud import aiplatform
def create_feature_store(project: str, region: str):
    aiplatform.init(project=project, location=region)
    # Create the Feature Store
    fs = aiplatform.Featurestore.create(
        featurestore_id="churn_features",
        online_store_fixed_node_count=1,  # Bigtable nodes for online serving
        labels={"team": "ml-platform", "domain": "churn"},
        sync=True,
    )
    print(f"Feature Store created: {fs.resource_name}")
    # Create an entity type (like a table in the feature store)
    customer_et = fs.create_entity_type(
        entity_type_id="customer",
        description="Customer-level features for churn prediction",
        sync=True,
    )
    # Create individual features.
    # feature_configs maps each feature ID to a plain dict of settings.
    customer_et.batch_create_features(
        feature_configs={
            "days_since_last_login": {
                "value_type": "INT64",
                "description": "Days since customer last logged in",
            },
            "total_revenue_30d": {
                "value_type": "DOUBLE",
                "description": "Total revenue in last 30 days",
            },
            "support_tickets_90d": {
                "value_type": "INT64",
                "description": "Number of support tickets in 90 days",
            },
            "plan_tier": {
                "value_type": "STRING",
                "description": "Current subscription tier",
            },
            "account_age_days": {
                "value_type": "INT64",
                "description": "Days since account creation",
            },
        }
    )
    print("Features created successfully")
    return fs, customer_et
def ingest_features_from_bigquery(
fs: "aiplatform.Featurestore",
entity_type: "aiplatform.EntityType",
bq_source: str,
):
"""Ingest pre-computed features from BigQuery into the Feature Store."""
entity_type.ingest_from_bq(
feature_ids=[
"days_since_last_login",
"total_revenue_30d",
"support_tickets_90d",
"plan_tier",
"account_age_days",
],
feature_time="feature_timestamp", # Column in BQ table with feature time
bq_source_uri=bq_source,
entity_id_field="customer_id", # Column in BQ table with entity ID
sync=True,
)
print("Feature ingestion complete")
def batch_serve_features_for_training(
    project: str,
    region: str,
    output_bq_table: str,
):
    """Batch serve features for model training with point-in-time correctness.

    output_bq_table must be a full URI such as "bq://project.dataset.table".
    """
    aiplatform.init(project=project, location=region)
    fs = aiplatform.Featurestore("churn_features", project=project, location=region)
    # Read entities + their timestamps from BigQuery (training labels table)
    # This ensures point-in-time correctness: features are as-of the label time
    fs.batch_serve_to_bq(
        bq_destination_output_uri=output_bq_table,
        serving_feature_ids={"customer": [
            "days_since_last_login",
            "total_revenue_30d",
            "support_tickets_90d",
            "plan_tier",
            "account_age_days",
        ]},
        read_instances_uri=f"bq://{project}.ml_labels.churn_labels",
        # ^^^ BQ table with (customer, timestamp, churned) columns; the entity ID
        # column is named after the entity type
        pass_through_fields=["churned"],  # Copy the label into the output table
        sync=True,
    )
    print(f"Training features served to {output_bq_table}")
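Point-in-time correctness is easier to reason about with a toy example. The sketch below is plain Python, not the Feature Store API: it assumes a per-customer history of (timestamp, value) pairs and returns the value a training row is allowed to see at its label time. The function name and the sample data are illustrative.

```python
from bisect import bisect_right
from datetime import datetime


def point_in_time_lookup(history, as_of):
    """Return the latest feature value observed at or before `as_of`.

    `history` is a list of (timestamp, value) pairs sorted by timestamp.
    Returns None when no value had been recorded yet at `as_of`.
    """
    timestamps = [ts for ts, _ in history]
    idx = bisect_right(timestamps, as_of)
    return history[idx - 1][1] if idx else None


# Toy history for one customer: total_revenue_30d as ingested over time.
revenue_history = [
    (datetime(2024, 1, 1), 120.0),
    (datetime(2024, 2, 1), 80.0),
    (datetime(2024, 3, 1), 0.0),
]

# A churn label observed on 2024-02-15 must see the February value,
# not the March value that was ingested after the label time.
label_time = datetime(2024, 2, 15)
print(point_in_time_lookup(revenue_history, label_time))  # 80.0
```

Joining labels to the latest feature row instead would leak the March value into a February label, which is the kind of leakage the batch serving call is meant to prevent.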
def online_feature_lookup(project: str, region: str, customer_ids: list):
    """Low-latency online feature lookup for real-time inference."""
    aiplatform.init(project=project, location=region)
    # When the entity type is given by ID, the parent store is passed as featurestore_id
    customer_et = aiplatform.EntityType(
        entity_type_name="customer",
        featurestore_id="churn_features",
    )
    features = customer_et.read(
        entity_ids=customer_ids,
        feature_ids=[
            "days_since_last_login",
            "total_revenue_30d",
            "support_tickets_90d",
            "plan_tier",
            "account_age_days",
        ],
    )
    return features
Vertex AI Pipelines
Vertex AI Pipelines is the orchestration layer. It is built on Kubeflow Pipelines v2, which means pipelines are defined in Python using the KFP SDK, compiled to YAML, and submitted to Vertex AI for execution.
Vertex AI Pipelines vs Kubeflow Pipelines
| Dimension | Vertex AI Pipelines | Self-hosted Kubeflow |
|---|---|---|
| Infrastructure | Fully managed by Google | You manage Kubernetes |
| Scaling | Automatic | Manual cluster scaling |
| Cost model | Per-run fee plus the compute each step uses | Pay for cluster uptime |
| Integration | Deep Vertex AI integration | Generic Kubernetes |
| Portability | GCP-only | Cloud-agnostic |
For most teams on GCP, Vertex AI Pipelines is the right choice. The managed experience is significantly easier, and the integration with other Vertex AI services is seamless.
Building a Complete Pipeline
from kfp import dsl, compiler
from kfp.dsl import component, pipeline, Output, Input, Model, Dataset, Metrics
from google.cloud import aiplatform
# ── Component 1: Extract and validate features ──────────────────────────────
@component(
    base_image="python:3.10-slim",
    # google-cloud-aiplatform is required because the component imports it
    packages_to_install=[
        "google-cloud-aiplatform", "google-cloud-bigquery", "pandas", "pyarrow",
    ],
)
def extract_features(
    project: str,
    bq_source_table: str,
    feature_store_name: str,
    output_bq_table: str,
) -> None:
    """Batch serve features from Feature Store for this training run."""
    from google.cloud import aiplatform
    aiplatform.init(project=project, location="us-central1")
    fs = aiplatform.Featurestore(feature_store_name, project=project, location="us-central1")
    fs.batch_serve_to_bq(
        # The pipeline passes bare table IDs, so add the bq:// prefix to both URIs
        bq_destination_output_uri=f"bq://{output_bq_table}",
        serving_feature_ids={"customer": [
            "days_since_last_login", "total_revenue_30d",
            "support_tickets_90d", "plan_tier", "account_age_days",
        ]},
        read_instances_uri=f"bq://{bq_source_table}",
        # Carry the label column through so the training step can read it
        pass_through_fields=["churned"],
        sync=True,
    )
# ── Component 2: Train model ─────────────────────────────────────────────────
@component(
base_image="python:3.10-slim",
packages_to_install=[
"google-cloud-aiplatform", "google-cloud-bigquery",
"xgboost==1.7.0", "scikit-learn", "pandas", "pyarrow",
],
)
def train_model(
project: str,
region: str,
bq_features_table: str,
staging_bucket: str,
max_depth: int,
n_estimators: int,
learning_rate: float,
model: Output[Model],
metrics: Output[Metrics],
) -> None:
import pandas as pd
import xgboost as xgb
import joblib
import os
from google.cloud import bigquery
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
client = bigquery.Client(project=project)
df = client.query(f"SELECT * FROM `{bq_features_table}`").to_dataframe()
feature_cols = [c for c in df.columns
if c not in ["customer_id", "churned", "feature_timestamp"]]
X = df[feature_cols]
y = df["churned"]
X_train, X_val, y_train, y_val = train_test_split(
X, y, test_size=0.2, random_state=42
)
clf = xgb.XGBClassifier(
max_depth=max_depth,
n_estimators=n_estimators,
learning_rate=learning_rate,
use_label_encoder=False,
eval_metric="logloss",
)
clf.fit(X_train, y_train,
eval_set=[(X_val, y_val)],
early_stopping_rounds=20)
val_auc = roc_auc_score(y_val, clf.predict_proba(X_val)[:, 1])
print(f"Validation AUC: {val_auc:.4f}")
# Log metrics to KFP
metrics.log_metric("val_auc", val_auc)
metrics.log_metric("n_features", len(feature_cols))
# Save model artifact
os.makedirs(model.path, exist_ok=True)
joblib.dump(clf, os.path.join(model.path, "model.joblib"))
# ── Component 3: Upload to Model Registry ───────────────────────────────────
@component(
base_image="python:3.10-slim",
packages_to_install=["google-cloud-aiplatform"],
)
def register_model(
project: str,
region: str,
display_name: str,
model: Input[Model],
staging_bucket: str,
min_auc_threshold: float = 0.75,
metrics: Input[Metrics] = None,
) -> str:
from google.cloud import aiplatform
# Check quality gate
val_auc = metrics.metadata.get("val_auc", 0) if metrics else 0
if val_auc < min_auc_threshold:
raise ValueError(
f"Model AUC {val_auc:.4f} below threshold {min_auc_threshold}. "
"Not registering."
)
aiplatform.init(project=project, location=region, staging_bucket=staging_bucket)
registered_model = aiplatform.Model.upload(
display_name=display_name,
artifact_uri=model.uri,
serving_container_image_uri=(
"us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-7:latest"
),
labels={"pipeline": "churn-training", "team": "ml-platform"},
)
print(f"Model registered: {registered_model.resource_name}")
return registered_model.resource_name
# ── Pipeline definition ──────────────────────────────────────────────────────
@pipeline(
name="churn-prediction-training",
description="End-to-end churn model training pipeline",
pipeline_root="gs://my-ml-project-vertex-staging/pipelines",
)
def churn_training_pipeline(
project: str = "my-ml-project",
region: str = "us-central1",
bq_labels_table: str = "my-ml-project.ml_labels.churn_labels",
bq_features_output: str = "my-ml-project.ml_features.training_batch",
feature_store_name: str = "churn_features",
staging_bucket: str = "gs://my-ml-project-vertex-staging",
max_depth: int = 6,
n_estimators: int = 200,
learning_rate: float = 0.05,
min_auc_threshold: float = 0.75,
):
# Step 1: Extract features
extract_task = extract_features(
project=project,
bq_source_table=bq_labels_table,
feature_store_name=feature_store_name,
output_bq_table=bq_features_output,
)
# Step 2: Train model (depends on feature extraction)
train_task = train_model(
project=project,
region=region,
bq_features_table=bq_features_output,
staging_bucket=staging_bucket,
max_depth=max_depth,
n_estimators=n_estimators,
learning_rate=learning_rate,
).after(extract_task)
# Step 3: Register model (depends on training)
register_task = register_model(
project=project,
region=region,
display_name="churn-xgboost",
model=train_task.outputs["model"],
staging_bucket=staging_bucket,
min_auc_threshold=min_auc_threshold,
metrics=train_task.outputs["metrics"],
)
# Compile and submit the pipeline
def submit_pipeline(project: str, region: str):
from kfp import compiler
from google.cloud import aiplatform
# Compile to YAML
compiler.Compiler().compile(
pipeline_func=churn_training_pipeline,
package_path="churn_pipeline.yaml",
)
# Submit to Vertex AI
aiplatform.init(project=project, location=region)
pipeline_job = aiplatform.PipelineJob(
display_name="churn-training-pipeline",
template_path="churn_pipeline.yaml",
pipeline_root=f"gs://{project}-vertex-staging/pipelines",
parameter_values={
"project": project,
"region": region,
"max_depth": 6,
"n_estimators": 300,
"learning_rate": 0.05,
},
enable_caching=True, # Cache steps that haven't changed
)
pipeline_job.submit()
pipeline_job.wait()
print(f"Pipeline complete. Status: {pipeline_job.state}")
return pipeline_job
Vertex AI Model Registry and Endpoints
Model Registry
The Model Registry stores model versions with metadata, labels, and lineage. Unlike a plain GCS bucket, it tracks which pipeline produced which model, with what parameters, achieving what metrics.
from google.cloud import aiplatform
def manage_model_versions(project: str, region: str):
    aiplatform.init(project=project, location=region)
    # Model.list returns one entry per registered model, newest first
    models = aiplatform.Model.list(
        filter='display_name="churn-xgboost"',
        order_by="create_time desc",
    )
    for model in models:
        print(f"Version: {model.version_id}")
        print(f"  Create time: {model.create_time}")
        print(f"  Labels: {model.labels}")
        print(f"  Resource: {model.resource_name}")
    # Get the latest model
    latest = models[0]
    # Add a version alias for easy reference
    # (In production, you'd promote to "champion" after validation)
    latest.versioning_registry.add_version_aliases(
        new_aliases=["staging"], version=latest.version_id
    )
    # Record the validation status as labels, keeping any existing labels
    latest.update(labels={**latest.labels, "stage": "staging", "validated": "false"})
    return latest
Deploying to an Online Endpoint
def deploy_model_to_endpoint(
project: str,
region: str,
model_name: str,
endpoint_name: str,
machine_type: str = "n1-standard-4",
min_replica_count: int = 1,
max_replica_count: int = 5,
traffic_split: dict = None,
):
aiplatform.init(project=project, location=region)
# Create or get endpoint
endpoints = aiplatform.Endpoint.list(
filter=f'display_name="{endpoint_name}"'
)
if endpoints:
endpoint = endpoints[0]
print(f"Using existing endpoint: {endpoint.resource_name}")
else:
endpoint = aiplatform.Endpoint.create(
display_name=endpoint_name,
description="Churn prediction endpoint",
labels={"team": "ml-platform", "model": "churn"},
)
print(f"Created endpoint: {endpoint.resource_name}")
# Get the model to deploy
models = aiplatform.Model.list(
filter=f'display_name="{model_name}"',
order_by="create_time desc",
)
model = models[0]
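# Deploy with traffic split
# Traffic split: {deployed_model_id: percentage}, where the key "0" stands for
# the model being deployed in this call and the percentages sum to 100
# For a new endpoint, omit traffic_split to get 100% to new model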
endpoint.deploy(
model=model,
deployed_model_display_name=f"{model_name}-v{model.version_id}",
machine_type=machine_type,
min_replica_count=min_replica_count,
max_replica_count=max_replica_count,
accelerator_type=None,
traffic_split=traffic_split, # e.g., {"0": 20, "<existing_deployed_model_id>": 80}
sync=True,
)
print(f"Model deployed. Endpoint: {endpoint.resource_name}")
return endpoint
def predict_churn(endpoint_resource_name: str, customer_features: list):
    """Make predictions using the deployed endpoint."""
    aiplatform.init()
    endpoint = aiplatform.Endpoint(endpoint_resource_name)
    # The prebuilt XGBoost serving container expects each instance as a list of
    # numbers in the training column order, so plan_tier must already be encoded
    # the same way it was in the training data.
    feature_order = [
        "days_since_last_login",
        "total_revenue_30d",
        "support_tickets_90d",
        "plan_tier",
        "account_age_days",
    ]
    instances = [
        [feat[name] for name in feature_order]
        for feat in customer_features
    ]
    response = endpoint.predict(instances=instances)
    return response.predictions
Explainability with SHAP
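Vertex AI Prediction supports built-in feature attributions through Vertex Explainable AI. For tabular models the usual method is Sampled Shapley, a sampling approximation of the Shapley values that SHAP also estimates. You attach the explanation spec when you upload or deploy the model: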
from google.cloud.aiplatform_v1.types import explanation as explanation_v1
def deploy_with_explanations(
project: str,
region: str,
model_resource_name: str,
endpoint_resource_name: str,
):
from google.cloud import aiplatform_v1
client = aiplatform_v1.EndpointServiceClient(
client_options={"api_endpoint": f"{region}-aiplatform.googleapis.com"}
)
# Explanation metadata maps feature names to input indices
explanation_metadata = explanation_v1.ExplanationMetadata(
inputs={
"features": explanation_v1.ExplanationMetadata.InputMetadata(
index_feature_mapping=[
"days_since_last_login",
"total_revenue_30d",
"support_tickets_90d",
"plan_tier_encoded",
"account_age_days",
]
)
},
outputs={
"churn_probability": explanation_v1.ExplanationMetadata.OutputMetadata()
},
)
# SHAP-based explanation parameters
explanation_spec = explanation_v1.ExplanationSpec(
metadata=explanation_metadata,
parameters=explanation_v1.ExplanationParameters(
sampled_shapley_attribution=explanation_v1.SampledShapleyAttribution(
path_count=25 # Number of feature permutations sampled - higher = more accurate but slower
)
),
)
print("Explanation spec configured. Deploy model with this spec for SHAP explanations.")
return explanation_spec
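To make path_count concrete, here is a minimal pure-Python sketch of sampled Shapley attribution. It illustrates the general technique and is not Vertex AI's implementation. The toy_score function, the baseline of zeros, and the sample instance are hypothetical.

```python
import random


def sampled_shapley(predict, instance, baseline, path_count=25, seed=0):
    """Estimate per-feature attributions by averaging marginal contributions
    over `path_count` random feature orderings (paths)."""
    rng = random.Random(seed)
    n = len(instance)
    attributions = [0.0] * n
    for _ in range(path_count):
        order = list(range(n))
        rng.shuffle(order)
        current = list(baseline)
        prev = predict(current)
        for i in order:
            current[i] = instance[i]  # switch feature i from baseline to actual
            new = predict(current)
            attributions[i] += new - prev
            prev = new
    return [a / path_count for a in attributions]


def toy_score(x):
    # Hypothetical scoring function with one interaction term (x[0] * x[2])
    return 0.02 * x[0] - 0.001 * x[1] + 0.0005 * x[0] * x[2]


instance = [30.0, 50.0, 4.0]
baseline = [0.0, 0.0, 0.0]
attributions = sampled_shapley(toy_score, instance, baseline, path_count=25)
print(attributions)
```

Along every path the contributions telescope, so the attributions always sum to the difference between the prediction for the instance and the prediction for the baseline. More paths only reduce the variance of how that total is divided among interacting features.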
Vertex AI Experiments and TensorBoard
Vertex AI Experiments tracks training runs and integrates with TensorBoard for visualization.
from google.cloud import aiplatform
def track_experiment(
    project: str,
    region: str,
    experiment_name: str,
    run_name: str,
    params: dict,
    metrics: dict,
):
    aiplatform.init(project=project, location=region, experiment=experiment_name)
    with aiplatform.start_run(run=run_name) as run:
        # Log hyperparameters
        run.log_params(params)
        # Log summary metrics
        run.log_metrics(metrics)
        # Log time series metrics (for TensorBoard). This needs a Vertex AI
        # TensorBoard instance associated with the experiment. The values
        # below are placeholder curves, not real training output.
        for epoch in range(100):
            run.log_time_series_metrics(
                {
                    "train_loss": 0.5 * (0.97 ** epoch),
                    "val_auc": 0.65 + 0.15 * (1 - 0.97 ** epoch),
                },
                step=epoch,
            )
    print(f"Run '{run_name}' logged to experiment '{experiment_name}'")
def compare_experiments(project: str, region: str, experiment_name: str):
    """Get a DataFrame of all runs in an experiment for comparison."""
    aiplatform.init(project=project, location=region)
    experiment = aiplatform.Experiment(experiment_name)
    runs_df = experiment.get_data_frame()
    # Sort by validation AUC to find best run
    if "metric.val_auc" in runs_df.columns:
        runs_df = runs_df.sort_values("metric.val_auc", ascending=False)
        print(f"Best run: {runs_df.iloc[0]['run_name']}")
        print(f"Best AUC: {runs_df.iloc[0]['metric.val_auc']:.4f}")
    return runs_df
AutoML Tables
For teams that want a strong baseline fast, Vertex AI AutoML Tables trains a high-quality tabular model without writing training code. It runs an automated search over model architectures and hyperparameters.
from google.cloud import aiplatform
def train_automl_model(project: str, region: str, bq_source: str):
aiplatform.init(project=project, location=region)
# Create a TabularDataset pointing to BigQuery
dataset = aiplatform.TabularDataset.create(
display_name="churn-automl-dataset",
bq_source=bq_source,
)
# Launch AutoML training
job = aiplatform.AutoMLTabularTrainingJob(
display_name="churn-automl-training",
optimization_prediction_type="classification",
optimization_objective="maximize-au-roc",
column_transformations=[
{"numeric": {"column_name": "days_since_last_login"}},
{"numeric": {"column_name": "total_revenue_30d"}},
{"numeric": {"column_name": "support_tickets_90d"}},
{"categorical": {"column_name": "plan_tier"}},
{"numeric": {"column_name": "account_age_days"}},
],
)
model = job.run(
dataset=dataset,
target_column="churned",
training_fraction_split=0.8,
validation_fraction_split=0.1,
test_fraction_split=0.1,
budget_milli_node_hours=1000, # ~1 hour of compute
model_display_name="churn-automl-model",
disable_early_stopping=False,
sync=True,
)
print(f"AutoML training complete. Model: {model.resource_name}")
return model
:::tip When to Use AutoML vs Custom Training Use AutoML Tables when you need a strong baseline quickly, when you lack ML engineering time, or when the data science team is predominantly analysts. Use Custom Training when you need full control over the model architecture, when you are doing research, or when AutoML's best model isn't good enough. Many teams use AutoML as the benchmark that custom models must beat. :::
Production Engineering Notes
Pipeline Caching
Vertex AI Pipelines caches the output of each step. If you re-run a pipeline with the same inputs and same component code, cached steps are skipped. This is powerful for iterative development - change only the training component and it skips the data extraction step.
# Disable caching for specific steps when debugging
@component(...)
def my_component(...):
...
task = my_component(...)
task.set_caching_options(enable_caching=False)
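One way to picture the cache lookup is as a fingerprint over the component's name, code, and inputs. The sketch below is an illustration of that idea only; it is not the fingerprint Vertex AI actually computes, and cache_key is a hypothetical helper.

```python
import hashlib
import json


def cache_key(component_name: str, component_source: str, inputs: dict) -> str:
    """Illustrative fingerprint: same name + code + inputs gives the same key."""
    payload = json.dumps(
        {"name": component_name, "source": component_source, "inputs": inputs},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


source = "def extract_features(project, run_date): ..."
monday = cache_key(
    "extract_features", source, {"project": "my-ml-project", "run_date": "2024-03-04"}
)
monday_rerun = cache_key(
    "extract_features", source, {"run_date": "2024-03-04", "project": "my-ml-project"}
)
tuesday = cache_key(
    "extract_features", source, {"project": "my-ml-project", "run_date": "2024-03-05"}
)

print(monday == monday_rerun)  # True: identical inputs, so the step would be reused
print(monday == tuesday)       # False: a changed input forces the step to run again
```

The fingerprint says nothing about the contents of tables the component reads, so an input such as a run date is the only signal that the underlying data has changed.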
IAM and Service Accounts
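Vertex AI runs pipeline components under a service account. If you do not specify one, the run uses the Compute Engine default service account, whose permissions are rarely scoped to what the pipeline actually needs. For production, create a dedicated service account and grant it only the roles the pipeline uses: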
# Create service account
gcloud iam service-accounts create vertex-pipeline-sa \
--display-name="Vertex AI Pipeline Service Account"
# Grant required permissions
PROJECT_ID="my-ml-project"
SA="vertex-pipeline-sa@${PROJECT_ID}.iam.gserviceaccount.com"
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:${SA}" \
--role="roles/aiplatform.user"
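# The pipeline runs queries and writes feature tables, so read-only access is not enough
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:${SA}" \
--role="roles/bigquery.dataEditor"
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:${SA}" \
--role="roles/bigquery.jobUser"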
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:${SA}" \
--role="roles/storage.objectAdmin"
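Creating the service account does not make the pipeline use it; the account has to be named when the run is submitted. A minimal sketch, assuming pipeline_job is an aiplatform.PipelineJob like the one built earlier (its submit() method accepts a service_account argument). The helper names are illustrative.

```python
def service_account_email(sa_name: str, project_id: str) -> str:
    """Build the email address of a user-managed service account."""
    return f"{sa_name}@{project_id}.iam.gserviceaccount.com"


def submit_as(pipeline_job, sa_name: str, project_id: str) -> str:
    """Submit `pipeline_job` so that its steps run as the dedicated account.

    `pipeline_job` is expected to be an aiplatform.PipelineJob.
    """
    email = service_account_email(sa_name, project_id)
    pipeline_job.submit(service_account=email)
    return email
```

For example, submit_as(pipeline_job, "vertex-pipeline-sa", "my-ml-project") would run the job under the account created above. The identity that submits the job also needs permission to act as that service account, typically roles/iam.serviceAccountUser.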
Region Selection
Vertex AI is not available in all GCP regions. For production:
- us-central1 (Iowa) - most features, largest quota
- us-east4 (Virginia) - good for east-coast latency
- europe-west4 (Netherlands) - GDPR compliance
- asia-east1 (Taiwan) - Asia-Pacific
:::warning Data Residency
If your data is in BigQuery in us-central1, your Vertex AI pipeline should also run in us-central1. Cross-region egress for BigQuery batch exports is expensive. Always match your Vertex AI region to your BigQuery dataset location.
:::
Endpoint Traffic Splits for Canary Deployment
def canary_deploy_new_model(
endpoint: "aiplatform.Endpoint",
new_model: "aiplatform.Model",
canary_traffic_pct: int = 10,
):
"""Deploy new model to endpoint with canary traffic split."""
# Get existing deployed model IDs
existing_deployed = endpoint.list_models()
current_model_id = existing_deployed[0].id if existing_deployed else None
# Deploy new model with canary traffic
if current_model_id:
traffic_split = {
current_model_id: 100 - canary_traffic_pct,
"0": canary_traffic_pct, # "0" means the newly deployed model
}
else:
traffic_split = None # 100% to new model
endpoint.deploy(
model=new_model,
machine_type="n1-standard-4",
min_replica_count=1,
max_replica_count=5,
traffic_split=traffic_split,
sync=True,
)
print(f"Canary deployed: {canary_traffic_pct}% traffic to new model")
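The function above assumes a single model is already deployed. When an endpoint serves several models, the existing shares have to be scaled down so the split still sums to 100. A minimal sketch of that arithmetic follows; canary_traffic_split is a hypothetical helper, and the current split is assumed to be a dict of deployed model IDs to integer percentages, such as the endpoint's traffic_split property would return.

```python
def canary_traffic_split(current_split: dict, canary_pct: int) -> dict:
    """Scale existing shares down so the new model ("0") gets `canary_pct`.

    `current_split` maps deployed model IDs to integer percentages summing to 100.
    """
    if not 0 < canary_pct <= 100:
        raise ValueError("canary_pct must be in (0, 100]")
    if not current_split:
        return {"0": 100}  # nothing deployed yet: all traffic to the new model
    remaining = 100 - canary_pct
    scaled = {mid: pct * remaining // 100 for mid, pct in current_split.items()}
    # Integer division can leave a few points unassigned; give them to the
    # deployed model that currently carries the most traffic.
    leftover = remaining - sum(scaled.values())
    largest = max(current_split, key=current_split.get)
    scaled[largest] += leftover
    scaled["0"] = canary_pct  # "0" refers to the model being deployed
    return scaled


print(canary_traffic_split({"111": 100}, 10))          # {'111': 90, '0': 10}
print(canary_traffic_split({"a": 70, "b": 30}, 10))    # {'a': 63, 'b': 27, '0': 10}
```

Keeping the split computation separate from the deploy call makes it easy to unit test and to log the intended split before any traffic moves.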
Common Mistakes
:::danger Storing Credentials in Pipeline Components Never hardcode GCP credentials or service account keys inside pipeline component functions. Components run in isolated containers. Use Workload Identity or pass credentials via Secret Manager. Hardcoded credentials are a security vulnerability and will fail when the key rotates.
# WRONG - never do this
@component(...)
def my_component():
    import json
    creds = json.loads('{"type": "service_account", "private_key": "..."}')

# RIGHT - rely on the service account attached to the pipeline job
@component(...)
def my_component(project: str):
    from google.cloud import bigquery
    client = bigquery.Client(project=project)  # Uses the job's attached service account; no key file needed
:::
:::danger Feature Store Training-Serving Skew
The most insidious Feature Store mistake is using different feature computation logic for training and serving. Training reads from the offline store (BigQuery); serving reads from the online store (Bigtable). If you update the ingestion pipeline without reingesting to the online store, your training and serving features diverge.
Always ingest to both offline and online stores in the same pipeline step. Never update one without the other.
:::
:::warning Pipeline Component Size
Keep component Docker images small. Large images slow down pipeline startup because Vertex AI pulls the image fresh for each component execution. A common mistake is using a monolithic image with all dependencies when individual components need only a subset. Split heavy dependencies (PyTorch, TensorFlow) into separate components with appropriate base images.
:::
:::warning Caching Stale Features
Vertex AI Pipeline caching is based on component inputs and code. If your extract_features component reads from a BigQuery table that changes daily, but the component signature hasn't changed, the cache will serve stale data. Pass a run_date parameter to force cache invalidation:
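# Note: this expression is evaluated when the pipeline is compiled, so
# recompile before each run or expose run_date as a pipeline parameter.
extract_task = extract_features(
    project=project,
    run_date=datetime.today().strftime("%Y-%m-%d"),  # Breaks cache daily
    ...
)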
:::
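To see why an extra `run_date` input defeats the cache, it helps to model caching as a lookup keyed on a fingerprint of the component and its inputs. The sketch below is a toy model only: `toy_cache_key` is a hypothetical function, and the real fingerprint Vertex AI computes includes more than this. The project name and dates are placeholder values.

```python
import hashlib
import json


def toy_cache_key(component_name: str, inputs: dict) -> str:
    """Fingerprint a component invocation from its name and input values."""
    payload = json.dumps(
        {"component": component_name, "inputs": inputs},
        sort_keys=True,  # Stable ordering so equal inputs give equal keys
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


# Same inputs on two different days: identical key, so the cached
# (possibly stale) output would be reused.
same_a = toy_cache_key("extract_features", {"project": "my-project"})
same_b = toy_cache_key("extract_features", {"project": "my-project"})

# Adding run_date changes the key once per day, forcing a fresh execution.
day_1 = toy_cache_key("extract_features", {"project": "my-project", "run_date": "2024-01-01"})
day_2 = toy_cache_key("extract_features", {"project": "my-project", "run_date": "2024-01-02"})
```

The underlying table never appears in the key, which is exactly why a change in the data alone cannot invalidate the cache.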
Interview Q&A
Q1: What is the difference between Vertex AI Pipelines and Kubeflow Pipelines?
Answer: They are closely related. Vertex AI Pipelines is Google's managed execution backend for KFP v2 pipelines. You write pipelines using the same KFP SDK - the same @component and @pipeline decorators, the same compiler.Compiler().compile() call. The difference is where the pipeline runs. With self-hosted Kubeflow, you manage the Kubernetes cluster, the Kubeflow installation, and the scaling. With Vertex AI Pipelines, Google manages all of that - you just submit a compiled pipeline spec to the Vertex AI API. This means zero Kubernetes administration, automatic scaling, integration with other Vertex AI services, and per-step billing instead of cluster uptime billing. For teams on GCP who don't have a compelling reason for self-hosted Kubeflow, such as multi-cloud portability or custom cluster-level control, Vertex AI Pipelines is usually the better choice.
Q2: How does Vertex AI Feature Store handle point-in-time correctness for training data?
Answer: Point-in-time correctness means: when you look up features for a training example, you should only see features that were available at the time the label was generated, not future information. Vertex AI Feature Store handles this through the batch serving API. You provide a "read instances" table - typically your labels table - with (entity_id, timestamp) pairs. The Feature Store performs a point-in-time join: for each entity at each timestamp, it looks up the most recent feature values that existed at or before that timestamp. Features ingested after the label timestamp are excluded. This prevents label leakage and ensures your training distribution matches your serving distribution.
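The point-in-time join described in this answer can be illustrated in plain Python. This is a sketch of the lookup semantics only, not of how Feature Store implements it: `point_in_time_lookup` and `build_training_rows` are hypothetical names, and timestamps are assumed to be ISO date strings so that string comparison matches chronological order.

```python
from __future__ import annotations

from bisect import bisect_right


def point_in_time_lookup(history: list[tuple[str, float]], as_of: str) -> float | None:
    """Return the latest feature value written at or before `as_of`.

    `history` is a list of (timestamp, value) pairs sorted by timestamp.
    """
    timestamps = [ts for ts, _ in history]
    idx = bisect_right(timestamps, as_of)  # Count of entries with ts <= as_of
    return history[idx - 1][1] if idx else None


def build_training_rows(
    read_instances: list[tuple[str, str]],
    feature_history: dict[str, list[tuple[str, float]]],
) -> list[tuple[str, str, float | None]]:
    """Join each (entity_id, timestamp) label row to its point-in-time feature."""
    rows = []
    for entity_id, ts in read_instances:
        history = feature_history.get(entity_id, [])
        rows.append((entity_id, ts, point_in_time_lookup(history, ts)))
    return rows
```

A label dated between two ingestions receives the earlier value, and a label dated before the first ingestion receives no value at all, which is the behaviour that rules out leakage from the future.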
Q3: What is Vertex Vizier, and when should you use it instead of Optuna or Ray Tune?
Answer: Vertex Vizier is Google's managed hyperparameter optimization service, originally described in a 2017 Google Research paper. It supports Bayesian optimization, grid search, and random search. You should use Vizier when your training runs are expensive (hours, not minutes), when you want optimization to persist across pipeline runs and not restart from scratch, and when you want managed infrastructure without running an optimization server yourself. Optuna and Ray Tune are better choices for fast iteration during development - they run locally, have richer visualization, and have a larger community. The pragmatic pattern: use Optuna locally to narrow the search space, then use Vizier for the final large-scale tuning run that spans many expensive GPU jobs.
Q4: How do you implement a quality gate in a Vertex AI Pipeline that stops the pipeline if model accuracy is too low?
Answer: Raise an exception inside the component function. If a KFP component raises an exception, the pipeline step fails and all downstream steps are skipped. The clean pattern is to add a dedicated validate_model component that reads the metrics from the training step and raises ValueError if the metric is below threshold. You can also use KFP's dsl.Condition to conditionally run the registration and deployment steps based on the metric output, making the pipeline skip deployment while still completing successfully. The exception approach is simpler; the dsl.Condition approach gives you more flexibility (e.g., register the model but mark it as failed validation rather than crashing the whole pipeline).
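The exception-based gate might look like the following. This is a plain-Python sketch of the check itself: in a real pipeline the body would sit inside a function decorated with the KFP `@component` decorator. The `metrics` dict shape, the `auc` metric name, and the 0.80 threshold are assumptions for illustration.

```python
def validate_model(metrics: dict, metric_name: str = "auc", threshold: float = 0.80) -> float:
    """Raise if the chosen metric is missing or below the threshold.

    An uncaught exception fails the step, so downstream registration and
    deployment steps that depend on it do not run.
    """
    if metric_name not in metrics:
        raise KeyError(f"metric '{metric_name}' not reported by the training step")

    value = float(metrics[metric_name])
    if value < threshold:
        raise ValueError(
            f"{metric_name}={value:.4f} is below the required threshold {threshold:.4f}"
        )
    return value
```

Returning the validated value keeps the component usable as an input to later steps, for example to record the metric alongside the registered model.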
Q5: How does traffic splitting on Vertex AI Endpoints work, and what is the right process for promoting a canary to full production?
Answer: A Vertex AI Endpoint can host multiple deployed models simultaneously, with traffic split across them by percentage. The split is configured as a dict of {deployed_model_id: traffic_percentage}. To do a canary release, deploy the new model to the existing endpoint with, say, a 10/90 split, monitor error rates and latency for 24-48 hours, then gradually shift traffic - 10%, 25%, 50%, 100%. To complete the promotion, update the traffic split to 100% for the new model and 0% for the old, then undeploy the old model to stop paying for it. The key operational detail: you update the traffic split and undeploy the old model as separate API calls. Always verify the new model is serving correctly before calling undeploy() on the old one - once undeployed, it is gone from the endpoint.
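The staged promotion described in this answer can be sketched as a small control loop. Everything here is hypothetical scaffolding: `promote_canary`, `apply_split`, and `is_healthy` are illustrative names, with `apply_split` standing in for whatever call updates the endpoint's traffic split and `is_healthy` for your monitoring check after each soak period.

```python
from __future__ import annotations

from typing import Callable, Dict, Sequence


def promote_canary(
    old_id: str,
    new_id: str,
    apply_split: Callable[[Dict[str, int]], None],
    is_healthy: Callable[[int], bool],
    stages: Sequence[int] = (10, 25, 50, 100),
) -> bool:
    """Shift traffic to the new model stage by stage; roll back on failure.

    Returns True if the new model reached the final stage, False if the
    rollout was rolled back to the old model.
    """
    for pct in stages:
        apply_split({new_id: pct, old_id: 100 - pct})
        if not is_healthy(pct):
            # Roll back: send all traffic to the old model again.
            apply_split({new_id: 0, old_id: 100})
            return False
    return True
```

The loop deliberately never undeploys anything. Keeping the old model deployed at 0% until the new one is verified is what makes the rollback branch possible, and the undeploy call stays a separate, explicit step.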
Q6: Describe a scenario where using Vertex AI AutoML is the wrong choice.
Answer: AutoML is wrong when you need interpretability at a level AutoML cannot provide. AutoML trains an ensemble of models - the exact architecture is opaque. If you are in a regulated domain (credit scoring, insurance, medical) where regulators require you to explain every prediction in terms of a linear combination of features with auditable coefficients, AutoML cannot satisfy that requirement. AutoML is also wrong when you need to incorporate domain-specific inductive biases - custom loss functions, structured prediction, or architectures that require modifying the training loop. And it is wrong when the budget for iteration is tight: AutoML experiments are expensive (on the order of $1,000 for serious tuning). For quick baselines on non-regulated tabular problems with a team short on ML engineering time, AutoML is excellent.
