
:::tip 🎮 Interactive Playground Visualize this concept: Try the Pipeline Orchestration demo on the EngineersOfAI Playground - no code required. :::

Dagster for Data Assets

The Day Nobody Knew What the Pipeline Produced​

An engineer ran an Airflow DAG. It succeeded. The green checkmark appeared. The team moved on.

But downstream, a data scientist opened their feature store and saw stale data. A model retraining job had kicked off against yesterday's features. An A/B test had been analyzing results computed from a broken join that nobody had noticed because the DAG said "success." The engineer who built the pipeline had moved to a different team six months ago. The documentation said "see the code." The code said nothing about which tables it wrote to, in what format, or what schema version they expected.

This is the fundamental problem with task-centric orchestration. A task DAG answers one question: did the computation run? It cannot answer the questions that actually matter in production: what data did it produce, is that data fresh, who depends on it, and is it correct? The DAG is a recipe, not an inventory. You can run a recipe successfully and still produce inedible food.

Dagster was built to invert this model entirely. Instead of defining tasks that happen to produce data as a side effect, you define data assets - the actual tables, files, and models that your pipeline produces - and Dagster figures out which computations to run to materialize them. The execution graph is derived from the asset dependency graph, not the other way around. When a pipeline runs, you're not asking "did task 7 succeed?" You're asking "is the user_features asset fresh, and does it satisfy its SLA?"

After migrating to Dagster, that team had a catalog. Every data asset had a known owner, a freshness SLA, a dependency graph visible in the UI, and a history of every materialization. When something broke, they could answer in 30 seconds which downstream assets were affected and which teams needed to be notified. The green checkmark meant something.


Why This Exists - The Problem With Task-Centric Orchestration​

Before Dagster, all major orchestrators shared the same mental model inherited from Apache Oozie and early Airflow: a pipeline is a directed acyclic graph of tasks. Each task is a unit of computation. Tasks have dependencies. The orchestrator runs tasks in dependency order and tracks which succeeded or failed.

This model works fine for simple pipelines. It breaks down at scale for several reasons:

There is no connection between tasks and their outputs. An Airflow task called load_user_events might write to prod.user_events, prod.event_counts, and three S3 paths. None of this is declared in the task definition. You discover it by reading the code, reading the docs (if they exist), or by tracing what broke when the task changed.

Re-runs are destructive by default. If you re-run a task, it overwrites whatever it wrote before. If another task already consumed that output, the consumer is now reading inconsistent data. Airflow has no built-in mechanism to coordinate this.

Freshness is invisible. When was prod.user_features last updated? Is it fresh enough for today's model run? Airflow tracks task execution time, but it doesn't track the state of what the task produced. You need a separate metadata store - and most teams don't have one.

Lineage is reconstructed manually. Knowing that trained_model depends on user_features which depends on raw_events requires reading three DAG definitions and tracing XCom messages. In Dagster, this lineage is explicit and first-class.

Dagster's founding insight, articulated in the Dagster Labs engineering blog in 2020, was that data pipelines are fundamentally about data, not about computation. The right abstraction is the asset - the data product - not the task that produces it.


Historical Context - From ETL Scripts to Asset Graphs​

The evolution of pipeline orchestration follows a clear arc toward higher-level abstractions.

1990s–2000s: Cron + Shell Scripts. ETL was shell scripts scheduled by cron. Dependencies were managed by file existence checks. Debugging meant SSH-ing into the server and reading log files.

2007–2010: Oozie and First-Generation DAG Orchestrators. Apache Oozie introduced the DAG model for Hadoop workflows. Tasks had explicit dependencies. The orchestrator tracked execution state. Still fundamentally task-centric, with no notion of what the task produced.

2014: Airflow. Maxime Beauchemin at Airbnb built Airflow to make pipeline definitions code rather than XML. The DAG-as-code model was a massive improvement in expressiveness and testability. Airflow dominated the market for nearly a decade. But it inherited the task-centric model from its predecessors.

2019: Prefect. Built as "Airflow for the modern Python era" - better dynamic DAGs, better local testing, better UI. Still task-centric at its core.

2019: Dagster. Nick Schrock (former Facebook infrastructure engineer) co-founded Elementl and built Dagster from the ground up with the asset model as its central concept. The key insight: every task should declare what it produces, and the orchestrator should use those declarations to build a data catalog, track freshness, and compute lineage automatically.

2021: dbt and the Asset Model Converge. dbt had independently arrived at a similar insight for the transformation layer: SQL models are assets with explicit dependencies. Dagster's dagster-dbt integration made the entire dbt project a set of first-class Dagster assets. This convergence validated the asset model as the right abstraction for modern data platforms.

2022–2024: The Asset Model Spreads. Airflow 2.4 introduced Dataset-triggered scheduling - a partial step toward asset awareness. Prefect introduced Artifacts. The industry was converging on the insight Dagster had built around from the start.


Core Concept - Software-Defined Assets​

A Software-Defined Asset (SDA) is a Python function decorated with @asset that declares:

  1. What it produces - the asset key (which becomes the asset's name in the catalog)
  2. What it depends on - other assets it needs as inputs (declared via function parameters)
  3. How to produce it - the computation logic in the function body
  4. Metadata about it - owner, description, freshness policy, tags

The @asset decorator is the fundamental building block. Here is the simplest possible asset:

from dagster import asset

@asset
def raw_events():
    """Fetch raw events from the source database."""
    import pandas as pd
    import sqlalchemy as sa

    engine = sa.create_engine("postgresql://prod-db/events")
    df = pd.read_sql("SELECT * FROM raw.events WHERE date = CURRENT_DATE", engine)
    return df

This looks like a plain Python function. The decorator transforms it into a Dagster asset named raw_events. Dagster will track every time this asset is materialized, store its metadata, and make it available in the asset catalog.

Now define a downstream asset that depends on raw_events:

@asset
def user_features(raw_events):
    """Compute per-user aggregate features from raw events."""
    return (
        raw_events
        .groupby("user_id")
        .agg(
            event_count=("event_id", "count"),
            session_count=("session_id", "nunique"),
            last_active=("timestamp", "max"),
            avg_session_duration=("duration_seconds", "mean"),
        )
        .reset_index()
    )

The function parameter raw_events is the dependency declaration. Dagster sees that user_features takes raw_events as input and automatically builds the dependency edge: to materialize user_features, Dagster must first materialize raw_events. You did not define a task graph - you defined what you wanted to compute, and Dagster derived the execution order.

@asset
def training_dataset(user_features):
    """Join user features with labels to build a training dataset."""
    import pandas as pd
    import sqlalchemy as sa

    engine = sa.create_engine("postgresql://prod-db/labels")
    labels = pd.read_sql("SELECT user_id, converted FROM labels.conversions", engine)
    return user_features.merge(labels, on="user_id", how="inner")

@asset
def trained_model(training_dataset):
    """Train an XGBoost model on the training dataset."""
    from sklearn.model_selection import train_test_split
    from xgboost import XGBClassifier
    import mlflow

    X = training_dataset.drop(columns=["user_id", "converted"])
    y = training_dataset["converted"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    with mlflow.start_run():
        model = XGBClassifier(n_estimators=200, max_depth=6, learning_rate=0.1)
        model.fit(X_train, y_train)
        score = model.score(X_test, y_test)
        mlflow.log_metric("accuracy", score)
        mlflow.xgboost.log_model(model, "model")

    return model

Four assets, four @asset decorators, zero explicit task graph definition. Dagster builds the entire execution plan from the dependency declarations.
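To make the idea concrete, here is a minimal standard-library sketch of how an execution order can be derived from function signatures alone. It is not Dagster's implementation: the four functions are simplified stand-ins for the assets above, and `build_graph` and `materialize_all` are hypothetical helper names introduced for illustration.

```python
import inspect
from graphlib import TopologicalSorter

# Simplified stand-ins for the four assets; bodies return toy values.
def raw_events():
    return [1, 2, 3]

def user_features(raw_events):
    return sum(raw_events)

def training_dataset(user_features):
    return {"features": user_features}

def trained_model(training_dataset):
    return f"model trained on {training_dataset['features']}"

def build_graph(functions):
    """Map each function name to the set of parameter names it depends on."""
    return {fn.__name__: set(inspect.signature(fn).parameters) for fn in functions}

def materialize_all(functions):
    """Run every function after its dependencies, passing results by name."""
    by_name = {fn.__name__: fn for fn in functions}
    # TopologicalSorter expects node -> predecessors, which is what build_graph returns.
    order = list(TopologicalSorter(build_graph(functions)).static_order())
    results = {}
    for name in order:
        fn = by_name[name]
        kwargs = {param: results[param] for param in inspect.signature(fn).parameters}
        results[name] = fn(**kwargs)
    return order, results

# Deliberately listed out of order: the order is derived, not declared.
order, results = materialize_all([trained_model, user_features, raw_events, training_dataset])
```

The functions are passed in a scrambled order on purpose; the dependency edges come entirely from the parameter names, which is the same convention the `@asset` examples rely on.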


The Asset Graph vs. The Task Graph​

The difference between Dagster's asset graph and a traditional task DAG is not just cosmetic. It is a fundamentally different answer to the question: what does this visualization represent?

The task DAG tells you: these computations ran, in this order. The asset graph tells you: these data products exist, these are their current states, and these are their dependencies. The asset graph is a living inventory of your data platform. The task DAG is a historical record of execution.

In the Dagster UI, each asset node shows:

  • Materialization status: fresh, stale, failed, never materialized
  • Last materialization time and the run that produced it
  • Metadata emitted during the last run (row count, schema, size, custom metrics)
  • Downstream assets affected by a re-materialization
  • Owner and freshness policy compliance

When raw_events is re-materialized, Dagster immediately marks user_features, training_dataset, and trained_model as stale. You don't need to know the dependency graph - Dagster knows it, and it tells you.
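The staleness propagation described here amounts to a graph traversal. The sketch below is a simplified illustration using only the standard library, not Dagster's internals; `UPSTREAM` and `downstream_of` are hypothetical names, and the graph mirrors the four-asset example.

```python
from collections import deque

# Asset -> upstream assets it reads, mirroring the four-asset example.
UPSTREAM = {
    "raw_events": [],
    "user_features": ["raw_events"],
    "training_dataset": ["user_features"],
    "trained_model": ["training_dataset"],
}

def downstream_of(asset, upstream):
    """Return every asset that transitively depends on `asset`, nearest first."""
    # Invert the edges so we can walk from an asset to its consumers.
    children = {name: [] for name in upstream}
    for name, deps in upstream.items():
        for dep in deps:
            children[dep].append(name)

    stale, seen, queue = [], {asset}, deque([asset])
    while queue:
        current = queue.popleft()
        for child in children[current]:
            if child not in seen:
                seen.add(child)
                stale.append(child)
                queue.append(child)
    return stale

stale_assets = downstream_of("raw_events", UPSTREAM)
```

Because the dependency graph is declared up front, the "what is affected?" question becomes a lookup rather than an investigation.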


Resources - Injectable Dependencies​

Production assets need to connect to databases, S3, MLflow, and other services. Hardcoding these connections inside asset functions makes the code impossible to test locally and impossible to switch between environments.

Dagster's Resources solve this. A resource is a Python object - a database connection, an S3 client, an MLflow tracking server - that is registered once under a key in Definitions and injected into assets at runtime.

import io

import boto3
import pandas as pd
import sqlalchemy as sa

class DatabaseResource:
    def __init__(self, connection_string: str):
        self.engine = sa.create_engine(connection_string)

    def query(self, sql: str) -> pd.DataFrame:
        return pd.read_sql(sql, self.engine)

    def execute(self, sql: str) -> None:
        # engine.begin() commits on success and rolls back on error
        with self.engine.begin() as conn:
            conn.execute(sa.text(sql))

class S3Resource:
    def __init__(self, bucket: str, prefix: str = ""):
        self.client = boto3.client("s3")
        self.bucket = bucket
        self.prefix = prefix

    def upload_dataframe(self, df: pd.DataFrame, key: str) -> None:
        # Serialize to Parquet in memory, then upload in a single PUT
        buffer = io.BytesIO()
        df.to_parquet(buffer, index=False)
        # Avoid a leading "/" in the object key when no prefix is set
        full_key = f"{self.prefix}/{key}" if self.prefix else key
        self.client.put_object(
            Bucket=self.bucket,
            Key=full_key,
            Body=buffer.getvalue(),
        )

Now use these resources in assets by listing their keys in required_resource_keys and reading them from context.resources:

from dagster import asset

@asset(required_resource_keys={"database", "s3"})
def raw_events(context):
    df = context.resources.database.query(
        "SELECT * FROM raw.events WHERE date = CURRENT_DATE"
    )
    context.resources.s3.upload_dataframe(df, "raw_events/today.parquet")
    context.log.info(f"Fetched {len(df):,} events")
    context.add_output_metadata({"row_count": len(df)})
    return df

Configure different resource implementations per environment:

from dagster import Definitions

# Development: use local SQLite + local filesystem.
# LocalFilesystemResource is not defined in this article; it stands for a
# class exposing the same upload_dataframe() method as S3Resource.
dev_resources = {
    "database": DatabaseResource(connection_string="sqlite:///dev.db"),
    "s3": LocalFilesystemResource(base_path="/tmp/dagster-dev"),
}

# Production: use PostgreSQL + real S3
prod_resources = {
    "database": DatabaseResource(
        connection_string="postgresql://prod-db.internal/datawarehouse"
    ),
    "s3": S3Resource(bucket="my-company-data", prefix="pipeline/v2"),
}

# Pass dev_resources instead when running locally
defs = Definitions(
    assets=[raw_events, user_features, training_dataset, trained_model],
    resources=prod_resources,
)

The asset code does not change between environments. Only the resource configuration changes. This is what makes Dagster assets genuinely testable without mocking infrastructure.
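The same principle can be shown without any infrastructure at all. The following is a plain-Python sketch of dependency injection, not Dagster's resource system: `InMemoryDatabase`, `RecordingStore`, and `load_raw_events` are hypothetical names, and the test doubles only mimic the shape of the resources above.

```python
class InMemoryDatabase:
    """Test double exposing a query() method like DatabaseResource."""

    def __init__(self, rows):
        self.rows = rows

    def query(self, sql):
        # Ignores the SQL and returns canned rows; enough for a unit test.
        return list(self.rows)

class RecordingStore:
    """Stand-in for the S3 resource that remembers what was uploaded."""

    def __init__(self):
        self.uploads = {}

    def upload_rows(self, rows, key):
        self.uploads[key] = rows

def load_raw_events(database, store):
    """Business logic that only talks to the injected objects."""
    rows = database.query("SELECT * FROM raw.events WHERE date = CURRENT_DATE")
    store.upload_rows(rows, "raw_events/today.json")
    return rows

dev_db = InMemoryDatabase([
    {"event_id": 1, "user_id": "u1"},
    {"event_id": 2, "user_id": "u2"},
])
dev_store = RecordingStore()
events = load_raw_events(dev_db, dev_store)
```

Swapping `dev_db` for an object backed by a real database changes nothing inside `load_raw_events`, which is the property the resource mapping gives you per environment.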


Asset Metadata - Making Assets Observable​

One of Dagster's most practical features is the ability to emit structured metadata during a materialization run. This metadata is stored in Dagster's event log and displayed in the UI next to each materialization.

from dagster import asset, Output, MetadataValue
import pandas as pd

@asset
def user_features(context, raw_events: pd.DataFrame) -> Output:
    features = (
        raw_events
        .groupby("user_id")
        .agg(
            event_count=("event_id", "count"),
            session_count=("session_id", "nunique"),
            last_active=("timestamp", "max"),
        )
        .reset_index()
    )

    # Emit metadata that appears in the Dagster UI
    return Output(
        value=features,
        metadata={
            "row_count": MetadataValue.int(len(features)),
            "columns": MetadataValue.json(list(features.columns)),
            "preview": MetadataValue.md(features.head(5).to_markdown()),
            "null_rate": MetadataValue.float(
                float(features.isnull().sum().sum() / features.size)
            ),
        },
    )

Every time user_features materializes, the UI shows the row count, column list, a preview of the first 5 rows, and the null rate. When the row count drops 40% compared to the previous materialization, you notice immediately. No separate monitoring tool required.


Jobs and Schedules - Materializing Asset Groups​

Assets are not automatically materialized - you define jobs that specify which assets to materialize, and schedules or sensors that trigger those jobs.

from dagster import define_asset_job, ScheduleDefinition, AssetSelection

# Materialize all assets in the feature engineering group
feature_engineering_job = define_asset_job(
    name="feature_engineering_job",
    selection=AssetSelection.groups("feature_engineering"),
)

# Materialize only the model training assets
model_training_job = define_asset_job(
    name="model_training_job",
    selection=AssetSelection.assets(training_dataset, trained_model),
)

# Run feature engineering daily at 2 AM
daily_features_schedule = ScheduleDefinition(
    job=feature_engineering_job,
    cron_schedule="0 2 * * *",
)

# Run model training weekly on Monday at 6 AM
weekly_training_schedule = ScheduleDefinition(
    job=model_training_job,
    cron_schedule="0 6 * * 1",
)

Sensors allow event-driven triggering - react to new files in S3, new rows in a database, or external API events:

from dagster import sensor, RunRequest, SensorEvaluationContext
import boto3

@sensor(job=feature_engineering_job, minimum_interval_seconds=300)
def new_events_sensor(context: SensorEvaluationContext):
    """Fire a run when new event data arrives in S3."""
    s3 = boto3.client("s3")
    # The cursor stores the newest LastModified timestamp seen so far
    cursor = context.cursor or "0"

    # Note: list_objects_v2 returns at most 1,000 keys per call
    response = s3.list_objects_v2(
        Bucket="my-company-data",
        Prefix="raw/events/",
    )

    new_objects = [
        obj for obj in response.get("Contents", [])
        if obj["LastModified"].timestamp() > float(cursor)
    ]

    if new_objects:
        latest_timestamp = max(obj["LastModified"].timestamp() for obj in new_objects)
        context.update_cursor(str(latest_timestamp))
        yield RunRequest(
            run_key=str(latest_timestamp),
            # Assumes raw_events declares a config schema with a "mode" field
            run_config={"ops": {"raw_events": {"config": {"mode": "incremental"}}}},
        )
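
The cursor logic in the sensor is the part most worth testing in isolation. Below is a minimal sketch that extracts it into a pure function, assuming object listings shaped like the `Contents` entries returned by S3; `evaluate_cursor` is a hypothetical helper, not a Dagster API.

```python
from datetime import datetime, timezone

def evaluate_cursor(objects, cursor):
    """Return (new_objects, new_cursor) for a listing and the previous cursor string."""
    last_seen = float(cursor or "0")
    new_objects = [o for o in objects if o["LastModified"].timestamp() > last_seen]
    if not new_objects:
        # Nothing new: keep the cursor unchanged so no run is requested.
        return [], cursor
    latest = max(o["LastModified"].timestamp() for o in new_objects)
    return new_objects, str(latest)

listing = [
    {"Key": "raw/events/a.parquet",
     "LastModified": datetime(2024, 1, 15, 1, 0, tzinfo=timezone.utc)},
    {"Key": "raw/events/b.parquet",
     "LastModified": datetime(2024, 1, 15, 2, 0, tzinfo=timezone.utc)},
]

# First evaluation sees both files; the second sees nothing new.
first_batch, cursor = evaluate_cursor(listing, None)
second_batch, cursor_after = evaluate_cursor(listing, cursor)
```

Keeping the comparison strictly greater-than means an unchanged listing never triggers a second run, which is what makes the sensor safe to evaluate every few minutes.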

Partitions - Processing Data by Date Slice​

Most production data pipelines process data in time-based slices. Yesterday's events should not be mixed with the day before yesterday's. Dagster's partition system makes this first-class.

from datetime import datetime

import pandas as pd
import sqlalchemy as sa
from dagster import asset, DailyPartitionsDefinition

daily_partitions = DailyPartitionsDefinition(start_date="2024-01-01")

@asset(partitions_def=daily_partitions)
def raw_events(context) -> pd.DataFrame:
    """Fetch events for a specific date partition."""
    partition_date = context.partition_key  # e.g. "2024-01-15"
    date = datetime.strptime(partition_date, "%Y-%m-%d").date()

    engine = sa.create_engine("postgresql://prod-db/events")
    # Bind the date as a parameter instead of formatting it into the SQL string
    df = pd.read_sql(
        sa.text("SELECT * FROM raw.events WHERE event_date = :event_date"),
        engine,
        params={"event_date": date},
    )
    context.log.info(f"Loaded {len(df):,} events for partition {partition_date}")
    return df

@asset(partitions_def=daily_partitions)
def user_features(context, raw_events: pd.DataFrame) -> pd.DataFrame:
    """Compute features for the same date partition as raw_events."""
    partition_date = context.partition_key

    features = raw_events.groupby("user_id").agg(
        event_count=("event_id", "count"),
        total_spend=("amount", "sum"),
    ).reset_index()
    features["feature_date"] = partition_date
    return features

With partitions defined, you can backfill historical data by selecting a date range in the Dagster UI and triggering parallel materialization across all selected partitions. Dagster tracks which partitions are materialized and which are missing or stale.

from dagster import define_asset_job, AssetSelection

# Job that targets the partitioned assets; the partition (or range) to run
# is chosen when the job or backfill is launched
backfill_job = define_asset_job(
    name="backfill_features",
    selection=AssetSelection.assets(raw_events, user_features),
    partitions_def=daily_partitions,
)

Partition-aware dependencies work correctly: when you materialize user_features for 2024-01-15, Dagster knows it needs raw_events for 2024-01-15 - not the latest raw_events partition.
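As a rough mental model, a daily partition set is just a list of date keys, and a backfill is the set of keys that have not been materialized yet. The sketch below illustrates this with the standard library only; the function names are hypothetical and the identity mapping in `upstream_partition` is a simplification of the same-day dependency described above.

```python
from datetime import date, timedelta

def daily_partition_keys(start, end):
    """List partition keys from start (inclusive) to end (exclusive)."""
    days = (end - start).days
    return [(start + timedelta(days=i)).isoformat() for i in range(days)]

def missing_partitions(all_keys, materialized):
    """Keys that still need a run, in chronological order."""
    return [key for key in all_keys if key not in materialized]

def upstream_partition(downstream_key):
    """Same-day mapping: a daily asset reads the upstream partition with the same key."""
    return downstream_key

keys = daily_partition_keys(date(2024, 1, 1), date(2024, 1, 5))
to_backfill = missing_partitions(keys, {"2024-01-01", "2024-01-03"})
```

Each missing key can be processed independently of the others, which is why backfills over a date range parallelize naturally.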


Dagster + dbt - First-Class Integration​

dbt has independently converged on the asset model: SQL models are named, versioned, tested data products with explicit dependencies. Dagster's dagster-dbt integration makes every dbt model a first-class Dagster asset with zero manual wrapping code.

from pathlib import Path

import pandas as pd
import sqlalchemy as sa
from dagster import AssetExecutionContext, Definitions, asset
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets

# Point Dagster at your dbt project
dbt_project = DbtProject(
    project_dir=Path("/opt/dagster/app/dbt_project"),
    packaged_project_dir=Path("/opt/dagster/app/dbt_project"),
)

@dbt_assets(manifest=dbt_project.manifest_path)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    """All dbt models as Dagster assets."""
    yield from dbt.cli(["build"], context=context).stream()

# Define upstream Python assets that feed into dbt
@asset
def raw_user_events():
    """Python asset that writes to the raw schema that dbt reads from."""
    # Assumption: placeholder warehouse connection; the original snippet
    # used `engine` without defining it
    engine = sa.create_engine("postgresql://prod-db.internal/datawarehouse")
    df = pd.read_csv("s3://my-bucket/events/today.csv")  # s3:// paths need s3fs
    df.to_sql("user_events", engine, schema="raw", if_exists="replace", index=False)
    return df

defs = Definitions(
    assets=[raw_user_events, my_dbt_assets],
    resources={
        "dbt": DbtCliResource(project_dir=dbt_project),
    },
)

With this setup, the Dagster UI shows a unified asset graph where your Python ingestion assets flow into your dbt transformation models, provided the asset key of raw_user_events matches the key Dagster derives for the corresponding dbt source. Lineage then spans the Python-SQL boundary: if raw_user_events is re-materialized, all downstream dbt models are marked stale.


Testing Dagster Assets - No Infrastructure Required​

Testing Airflow DAGs requires standing up a database, a scheduler, and often mocking half the Airflow internals. Testing Dagster assets requires calling a function.

# tests/test_feature_engineering.py
import pandas as pd
from dagster import materialize, ResourceDefinition
from my_pipeline.assets import raw_events, user_features, training_dataset

def make_test_events():
    return pd.DataFrame({
        "event_id": range(100),
        "user_id": [f"user_{i % 10}" for i in range(100)],
        "session_id": [f"session_{i // 5}" for i in range(100)],
        "event_date": ["2024-01-15"] * 100,
        "amount": [10.0 + i * 0.5 for i in range(100)],
        "timestamp": pd.date_range("2024-01-15", periods=100, freq="1min"),
        "duration_seconds": [30 + i for i in range(100)],
    })

def test_user_features_shape():
    """user_features should have one row per user."""
    # Pure Python assets with no resource dependencies can be called directly
    features = user_features(make_test_events())
    assert len(features) == 10  # 10 unique users
    assert "event_count" in features.columns
    assert features["event_count"].sum() == 100

def test_no_null_user_ids():
    """user_features must never have null user_ids."""
    features = user_features(make_test_events())
    assert features["user_id"].notna().all()

def test_feature_counts_are_positive():
    """All aggregate counts must be positive."""
    features = user_features(make_test_events())
    assert (features["event_count"] > 0).all()

def test_full_pipeline_with_materialize():
    """Full pipeline integration test using Dagster's materialize() helper."""
    result = materialize(
        assets=[raw_events, user_features, training_dataset],
        resources={
            # make_synthetic_database() is an assumed test helper returning an
            # object with the same query() interface as DatabaseResource
            "database": ResourceDefinition.hardcoded_resource(
                make_synthetic_database()
            ),
            # raw_events also requires "s3"; a mock is enough for this test
            "s3": ResourceDefinition.mock_resource(),
        },
    )
    assert result.success

The materialize() function runs the full asset graph in-process, with substitutable resources. No Dagster daemon required. No Airflow webserver required. Just Python.


Dagster vs. Airflow vs. Prefect - Decision Matrix​

| Dimension | Airflow | Prefect | Dagster |
| --- | --- | --- | --- |
| Core abstraction | Task DAG | Flow + Task | Software-Defined Asset |
| Data lineage | None (manual) | Artifacts (partial) | First-class, automatic |
| Freshness tracking | None | None | Built-in freshness policies |
| Testing | Hard (mock everything) | Moderate (flow.run()) | Easy (materialize()) |
| Local development | Complex setup | Simple | Simple |
| Partitions | Via XCom + templates | Via parameters | First-class, partition-aware deps |
| dbt integration | dbt-airflow (task-level) | prefect-dbt (task-level) | dagster-dbt (asset-level) |
| UI | Basic DAG view | Good flow view | Best-in-class asset catalog |
| Sensor ecosystem | Very large | Moderate | Growing |
| Operational maturity | Very high (10+ years) | High | High |
| Managed hosting | Astronomer | Prefect Cloud | Dagster Cloud |
| Best for | Large teams, complex sensors | Python-first rapid dev | Asset-heavy, dbt shops |

:::tip When to Choose Dagster Choose Dagster when your team cares about data quality and lineage as much as execution. If you run dbt and want to orchestrate it alongside Python ingestion and model training with a unified lineage view, Dagster is the strongest choice available in 2025. :::

:::warning When Dagster May Not Be Right Dagster's asset model adds cognitive overhead for very simple pipelines. If you are running a handful of shell scripts on a schedule, Dagster's concepts (assets, resources, partitions, jobs, sensors) are more complexity than the problem warrants. Prefect or even a simple cron job may be more appropriate. :::


Mermaid Diagram - Asset Graph vs. Task DAG​


Production Engineering Notes​

Start with asset groups. Organize assets into logical groups using the group_name parameter on @asset. This lets you materialize a coherent subset of your pipeline, define group-level schedules, and navigate large asset catalogs more easily.

@asset(group_name="feature_engineering", owners=["[email protected]"])
def user_features(raw_events):
    ...

@asset(group_name="model_training", owners=["[email protected]"])
def trained_model(training_dataset):
    ...

Use FreshnessPolicy to declare SLAs. This makes the freshness tracking in the UI meaningful:

from dagster import asset, FreshnessPolicy

@asset(
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=60 * 25)  # 25 hours
)
def user_features(raw_events):
    ...

Emit row count metadata on every materialization. This single practice catches most data quality issues before they become incidents. Row count drops of 20%+ relative to the previous day are almost always bugs.
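A check of this kind needs very little logic. The sketch below shows one way to compare the current row count against the previous materialization, assuming you can read the previous count from stored metadata; `row_count_drop` and `is_suspicious` are hypothetical helpers, and the 20% default simply mirrors the rule of thumb above.

```python
def row_count_drop(previous, current):
    """Fractional drop relative to the previous count; 0.0 if there is no drop or no baseline."""
    if previous <= 0 or current >= previous:
        return 0.0
    return (previous - current) / previous

def is_suspicious(previous, current, threshold=0.20):
    """Flag a materialization whose row count fell by at least `threshold`."""
    return row_count_drop(previous, current) >= threshold

# A drop from 1,000 to 600 rows is flagged; a drop to 950 is not.
big_drop = is_suspicious(1000, 600)
small_drop = is_suspicious(1000, 950)
```

Growth is deliberately ignored here; if sudden increases also indicate bugs in your data (for example, a duplicated join), a symmetric check is a small extension.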

Use io_managers for large assets. By default, Dagster pickles asset values to the local filesystem between steps, which does not work across machines and is a poor fit for large DataFrames or models. Configure a Parquet or S3 I/O manager to serialize outputs to shared storage instead.

from dagster import Definitions
from dagster_aws.s3 import S3PickleIOManager, S3Resource

defs = Definitions(
    assets=[raw_events, user_features, trained_model],
    resources={
        "io_manager": S3PickleIOManager(
            s3_resource=S3Resource(),
            s3_bucket="my-company-dagster-io",
            s3_prefix="pipeline-outputs",
        ),
    },
)
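
Conceptually, an I/O manager is a pair of operations: persist an asset's output, and load it back for a downstream asset. The sketch below is a deliberately simplified, standard-library illustration of that contract; `PickleFileStore` is a hypothetical class, and its method signatures are not those of Dagster's `IOManager`, which receives context objects rather than asset names.

```python
import pickle
import tempfile
from pathlib import Path

class PickleFileStore:
    """Simplified I/O layer: one pickle file per asset name."""

    def __init__(self, base_dir):
        self.base_dir = Path(base_dir)
        self.base_dir.mkdir(parents=True, exist_ok=True)

    def _path(self, asset_name):
        return self.base_dir / f"{asset_name}.pkl"

    def handle_output(self, asset_name, value):
        """Persist the value produced by an asset."""
        with self._path(asset_name).open("wb") as f:
            pickle.dump(value, f)

    def load_input(self, asset_name):
        """Load a previously stored value for a downstream asset."""
        with self._path(asset_name).open("rb") as f:
            return pickle.load(f)

store = PickleFileStore(tempfile.mkdtemp())
store.handle_output("user_features", [{"user_id": "u1", "event_count": 3}])
loaded = store.load_input("user_features")
```

Because the asset functions never see this layer, swapping local pickle files for S3 or Parquet is a configuration change rather than a code change.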

Common Mistakes​

:::danger Naming Assets by Their Computation, Not Their Output Naming an asset compute_user_features instead of user_features defeats the asset model. Assets are named after what they produce. The computation is the implementation detail. The data product is the name. :::

:::danger Ignoring Resource Injection for Database Connections Hardcoding database connection strings inside asset functions means you cannot test locally without a production database and cannot switch environments without changing code. Always inject connections via resources. :::

:::warning Materializing the Entire Asset Graph on Every Run A single AssetSelection.all() job that materializes everything on a schedule is an anti-pattern. Different assets have different freshness requirements. Define separate jobs with appropriate schedules: hourly for critical raw ingestion, daily for feature engineering, weekly for model retraining. :::

:::warning Not Emitting Metadata Assets that emit no metadata become black boxes in the UI. At minimum, every asset should emit row count and materialization timestamp. This transforms the Dagster UI from a pretty graph into a genuine data observability tool. :::


Interview Questions and Answers​

Q1: What is a Software-Defined Asset in Dagster, and how does it differ from an Airflow task?

An Airflow task defines a unit of computation - what code to run. It does not declare what data it produces or consumes. A Dagster Software-Defined Asset declares a specific data product that exists in your system: which table, file, or model it represents. The @asset decorator captures the name, ownership, freshness policy, and dependency graph of that data product. The key difference: an Airflow task DAG shows you whether computations ran; a Dagster asset graph shows you whether your data products are fresh and correct.

Q2: How does Dagster infer execution order if you never define a task graph?

Dagster infers execution order from Python function signatures. When user_features(raw_events) is decorated with @asset, Dagster sees that user_features takes raw_events as a parameter and automatically creates a dependency edge: to materialize user_features, Dagster must first materialize raw_events. The execution plan is derived from the asset dependency graph, which is itself derived from function parameters.

Q3: How do Dagster resources enable environment-independent asset code?

Resources encapsulate infrastructure dependencies - database connections, S3 clients, MLflow tracking URIs. Assets declare which resources they need, but they do not construct those resources themselves. The Dagster Definitions object maps resource keys to concrete implementations for each deployment environment. Development uses SQLite and local filesystem; production uses PostgreSQL and S3. The asset code is identical in both environments.

Q4: What is partition-aware dependency resolution, and why does it matter for daily pipelines?

When assets are defined with a DailyPartitionsDefinition, each materialization is associated with a specific date partition. Partition-aware dependency resolution means that when you materialize user_features for 2024-01-15, Dagster automatically requests the raw_events asset for 2024-01-15 - not the latest available partition. Without this, you risk computing features for January 15th using events from January 16th, introducing silent time leakage bugs into your feature engineering.

Q5: How does the dagster-dbt integration work, and what lineage does it provide?

@dbt_assets converts the entire dbt project manifest into a set of Dagster assets. Each dbt model becomes an asset node in the Dagster asset graph with the same key as the dbt model name. Dependencies between dbt models are reflected as asset dependencies in Dagster. The result is a unified lineage graph that spans Python ingestion assets, dbt transformation models, and downstream ML assets - all in the same Dagster UI with consistent freshness tracking and ownership metadata.

Q6: What does materialize() do in Dagster tests, and why is it better than Airflow's testing approach?

materialize() runs a specified set of assets in-process, with injectable test resources, and returns an ExecuteInProcessResult you can assert on (for example, result.success). It does not require a running Dagster daemon, a database, or any external infrastructure. You pass in mock or test-double resources and call the function. Airflow tests require setting up a test database, mocking the XCom backend, patching operators, and often running a full DAG with dag.test(). Dagster's approach makes pipeline testing as simple as testing any other Python function.

Q7: When would you NOT use Dagster?

Three scenarios: (1) Your team already has deep Airflow expertise and a large library of custom Airflow operators - migrating means rebuilding that operator library. (2) Your pipelines are simple enough that the asset model adds more cognitive overhead than value - three cron-scheduled scripts don't need a full asset catalog. (3) You need Airflow's mature sensor ecosystem - Airflow has 1000+ built-in operators and sensors; Dagster's library is growing but narrower.


Advanced Dagster - Asset Checks and Auto-Materialization​

Asset Checks - Inline Data Quality Assertions​

Dagster 1.5+ introduced asset checks: data quality assertions that are attached directly to an asset and run after the asset is materialized. Unlike external validation tools that are bolted on as a separate pipeline step, asset checks are first-class in the Dagster asset graph. They appear alongside the asset in the UI, have their own pass/fail history, and can block downstream materialization when they fail.

from dagster import asset, asset_check, AssetCheckResult, AssetCheckSeverity
import pandas as pd

@asset
def user_features(raw_events: pd.DataFrame) -> pd.DataFrame:
    return (
        raw_events
        .groupby("user_id")
        .agg(event_count=("event_id", "count"), total_spend=("amount", "sum"))
        .reset_index()
    )

@asset_check(asset=user_features, blocking=True)
def user_features_no_nulls(user_features: pd.DataFrame) -> AssetCheckResult:
    """Fail downstream materialization if user_features has any null user_ids."""
    # Cast NumPy scalars to plain Python types for passed= and metadata
    null_count = int(user_features["user_id"].isna().sum())
    return AssetCheckResult(
        passed=null_count == 0,
        metadata={"null_count": null_count},
        description=f"{null_count} null user_ids found" if null_count else "No nulls",
    )

@asset_check(asset=user_features, blocking=True)
def user_features_row_count(user_features: pd.DataFrame) -> AssetCheckResult:
    """Verify the feature count is within expected range."""
    row_count = len(user_features)
    passed = 100 <= row_count <= 10_000_000
    return AssetCheckResult(
        passed=passed,
        metadata={"row_count": row_count},
        description=f"Row count {row_count} {'within' if passed else 'outside'} expected range",
    )

@asset_check(asset=user_features, blocking=False)  # warning only
def user_features_spend_distribution(user_features: pd.DataFrame) -> AssetCheckResult:
    """Warn if average spend per user seems anomalous."""
    avg_spend = float(user_features["total_spend"].mean())
    passed = 1.0 <= avg_spend <= 10_000.0
    return AssetCheckResult(
        passed=passed,
        severity=AssetCheckSeverity.WARN,
        metadata={"avg_spend": round(avg_spend, 2)},
    )

When user_features_no_nulls or user_features_row_count fails (both are blocking=True), Dagster marks the user_features asset as failed and prevents training_dataset and trained_model from being materialized in the same run. The failure is visible in the UI alongside the asset, not buried in a log file.
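The difference between blocking and non-blocking checks can be reduced to a small gating function. This is a plain-Python sketch of the behavior described above, not Dagster's implementation; `evaluate_checks` and the `CHECKS` list are hypothetical, and rows are represented as dictionaries instead of a DataFrame.

```python
def evaluate_checks(rows, checks):
    """Run every check; return (downstream_allowed, failed_check_names)."""
    failed = []
    downstream_allowed = True
    for name, predicate, blocking in checks:
        if predicate(rows):
            continue
        failed.append(name)
        if blocking:
            # Only a failed blocking check stops downstream work.
            downstream_allowed = False
    return downstream_allowed, failed

# (name, predicate, blocking)
CHECKS = [
    ("no_null_user_ids",
     lambda rows: all(r["user_id"] is not None for r in rows),
     True),
    ("plausible_avg_spend",
     lambda rows: 1.0 <= sum(r["total_spend"] for r in rows) / len(rows) <= 10_000.0,
     False),
]

good_rows = [{"user_id": "u1", "total_spend": 50.0}, {"user_id": "u2", "total_spend": 20.0}]
odd_rows = [{"user_id": "u1", "total_spend": 0.1}, {"user_id": "u2", "total_spend": 0.2}]
bad_rows = [{"user_id": None, "total_spend": 50.0}]

ok_result = evaluate_checks(good_rows, CHECKS)
warn_result = evaluate_checks(odd_rows, CHECKS)
block_result = evaluate_checks(bad_rows, CHECKS)
```

All checks run even after a blocking failure, so a single evaluation reports every problem instead of only the first one.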

Auto-Materialization - Declarative Asset Freshness​

Instead of scheduling jobs to materialize assets, you can declare freshness policies on assets and let Dagster automatically materialize them when they become stale.

import pandas as pd
from dagster import asset, AutoMaterializePolicy, FreshnessPolicy

@asset(
    auto_materialize_policy=AutoMaterializePolicy.eager(),
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=60 * 24),  # max 24 hours stale
)
def user_features(raw_events: pd.DataFrame) -> pd.DataFrame:
    """
    With eager auto-materialization, Dagster will automatically
    materialize user_features whenever raw_events is materialized.
    No job or schedule needed.
    """
    return (
        raw_events
        .groupby("user_id")
        .agg(event_count=("event_id", "count"))
        .reset_index()
    )

AutoMaterializePolicy.eager() tells Dagster: whenever an upstream asset is materialized, automatically queue a materialization of this asset too. Combined with a FreshnessPolicy, Dagster will also alert you via the UI when the asset is stale relative to its declared SLA - even if no explicit error occurred.
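As a rough illustration of what "stale relative to its declared SLA" means, the sketch below compares an asset's last materialization time against a maximum lag. It is a simplification in plain Python: Dagster's own evaluation also considers how recent the upstream data is, and `is_stale` is a hypothetical helper, not a Dagster function.

```python
from datetime import datetime, timedelta, timezone


def is_stale(last_materialized: datetime, now: datetime, maximum_lag_minutes: int) -> bool:
    """Return True when the asset is older than the allowed lag."""
    return now - last_materialized > timedelta(minutes=maximum_lag_minutes)


MAX_LAG = 60 * 24  # same 24-hour budget as the FreshnessPolicy above

now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
materialized_12h_ago = datetime(2024, 1, 2, 0, 0, tzinfo=timezone.utc)
materialized_30h_ago = datetime(2024, 1, 1, 6, 0, tzinfo=timezone.utc)

print(is_stale(materialized_12h_ago, now, MAX_LAG))
print(is_stale(materialized_30h_ago, now, MAX_LAG))
```

The point of declaring the lag on the asset is that this comparison happens continuously, so staleness is reported even when every run has succeeded.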

Branching and Conditional Logic​

Some pipelines require branching: run path A for high-value users, path B for low-value users, then merge the results. In Dagster you model this as a multi-asset that emits one output per branch, with ordinary Python filtering inside the function:

import pandas as pd
from dagster import AssetOut, asset, multi_asset

# compute_features, train_model, and serialize_model are project-specific
# helpers assumed to be defined elsewhere.

@multi_asset(outs={"high_value_features": AssetOut(), "low_value_features": AssetOut()})
def segment_features(raw_events: pd.DataFrame):
    """Split user features into high-value and low-value segments."""
    features = compute_features(raw_events)
    high_value = features[features["total_spend"] >= 100]
    low_value = features[features["total_spend"] < 100]
    # Returned in the same order as the keys in `outs`.
    return high_value, low_value

@asset
def high_value_model(high_value_features: pd.DataFrame):
    """Train a specialized model for high-value users."""
    return train_model(high_value_features, model_type="gradient_boost")

@asset
def low_value_model(low_value_features: pd.DataFrame):
    """Train a lightweight model for low-value users."""
    return train_model(low_value_features, model_type="logistic_regression")

@asset
def production_model_config(high_value_model, low_value_model):
    """Combine both models into a routing configuration."""
    return {
        "high_value": serialize_model(high_value_model),
        "low_value": serialize_model(low_value_model),
        "routing_threshold": 100.0,
    }

The asset graph captures the fan-out (one computation producing two assets) and the fan-in (two assets converging into one) naturally. Dagster materializes the two model branches in parallel when the executor allows it.
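To show how a consumer might use the routing configuration produced by production_model_config, here is a minimal sketch in plain Python. The `pick_model_key` helper and the placeholder strings standing in for serialized models are assumptions made for illustration; the source does not say how the configuration is consumed at inference time.

```python
def pick_model_key(total_spend: float, config: dict) -> str:
    """Choose which model entry to use, mirroring the >= 100 split used for training."""
    if total_spend >= config["routing_threshold"]:
        return "high_value"
    return "low_value"


# Placeholder values stand in for the serialized models.
config = {
    "high_value": "<serialized gradient_boost model>",
    "low_value": "<serialized logistic_regression model>",
    "routing_threshold": 100.0,
}

for spend in (250.0, 100.0, 99.99):
    print(spend, pick_model_key(spend, config))
```

Keeping the threshold inside the configuration means the serving side applies the same boundary that segment_features used, rather than hard-coding a second copy of it.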


Deployment and Infrastructure​

Dagster on Kubernetes​

For production deployments, Dagster runs on Kubernetes via the official Helm chart. The deployment consists of:

  • dagster-webserver: the UI and GraphQL API
  • dagster-daemon: the scheduler, sensor executor, and auto-materialization engine
  • User code deployments: Docker images containing your asset definitions, deployed separately from the Dagster infrastructure

This separation is a key architectural advantage: upgrading the Dagster version (webserver + daemon) does not require redeploying your pipeline code. Rolling out new pipeline code does not require restarting the Dagster scheduler.

# dagster-values.yaml (Helm values for production deployment)
# Key names vary between chart versions; verify them against the chart's own
# values.yaml (recent charts use `dagster-user-deployments` for user code).
dagsterWebserver:
  replicaCount: 2
  resources:
    limits:
      memory: "2Gi"
      cpu: "1000m"

dagsterDaemon:
  replicaCount: 1  # Exactly one daemon instance
  resources:
    limits:
      memory: "2Gi"

postgresql:
  enabled: false  # Use external managed PostgreSQL
  externalDatabase:
    host: "postgres.internal"
    port: 5432
    database: "dagster"
    secretName: "dagster-postgres-secret"

userDeployments:
  enabled: true
  deployments:
    - name: "feature-pipeline"
      image:
        repository: "my-registry/feature-pipeline"
        tag: "v1.2.3"
        pullPolicy: Always
      port: 3030

Dagster Cloud - Managed Hosting​

Dagster Cloud (since rebranded as Dagster+) provides a fully managed deployment:

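  • Hybrid architecture: Dagster Cloud hosts the control plane (webserver and daemon), while your code runs in your own infrastructure, such as a Kubernetes cluster or ECS, so your data never leaves your environment. A Serverless option, in which Dagster also runs your code, exists for teams that do not want to manage compute.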
  • Branch deployments: each pull request gets its own isolated Dagster environment for testing. Validate asset changes before merging to main.
  • Insights: cross-run analytics, asset freshness trends, cost attribution per asset group.

For teams without dedicated platform engineering, Dagster Cloud eliminates the operational overhead of self-hosting while preserving full control over where pipeline code executes.


Summary - Why Dagster's Model Wins for Data-Heavy Teams​

The central insight of Dagster is that the relevant question in a data platform is not "did this computation succeed?" but "is this data product correct and fresh?" Task-centric orchestrators answer the first question. Dagster answers the second.

For teams running ML platforms where data quality directly impacts model performance, this distinction matters. Stale features cause model degradation. Schema drift breaks inference pipelines. Incorrect aggregations produce wrong training labels. Dagster makes these failures visible and preventable in a way that task-centric orchestrators cannot - because task-centric orchestrators don't model data products at all.

The practical cost of Dagster's model is a steeper learning curve for teams coming from Airflow and a smaller operator ecosystem. The practical benefit is a data platform where every asset has a known owner, a declared freshness SLA, a complete materialization history, and inline data quality checks. That combination makes incidents shorter and prevents many incidents entirely.


Reference - Dagster Concepts Cheat Sheet​

| Concept | What it is | When to use |
| --- | --- | --- |
| @asset | Decorated function that produces a named data product | Every pipeline computation that produces persistent output |
| @asset_check | Inline data quality assertion attached to an asset | After any asset that feeds downstream models or reports |
| DailyPartitionsDefinition | Splits an asset into per-date slices | Any asset that processes time-series data |
| @resource | Injectable infrastructure dependency (DB, S3, etc.) | Any asset that connects to external systems |
| define_asset_job | Groups assets into a materializable unit | Scheduling related assets together |
| ScheduleDefinition | Cron-triggered job | Regular batch materialization |
| @sensor | Event-driven job trigger | React to S3 events, API changes, external state |
| FreshnessPolicy | Declares maximum allowed staleness | SLA enforcement on critical assets |
| AutoMaterializePolicy | Declarative freshness: auto-materialize on upstream change | Eliminating explicit schedule definitions |
| @dbt_assets | Converts a dbt project to Dagster assets | dbt-heavy transformation stacks |
| materialize() | In-process asset execution for testing | Unit and integration tests |
| Definitions | Top-level configuration object | Registering all assets, jobs, and resources |
© 2026 EngineersOfAI. All rights reserved.