Part 4: The 'QC' - Ensuring Quality
Scene: The Trust Question
María’s renewable energy coverage analysis pipeline is running beautifully. During her first UN policy presentation, a senior analyst asks: “Can we trust that the energy consumption estimates are accurate?”
María realizes: A successful pipeline run isn’t the same as trustworthy results. Quality Control must be explicit and actionable.
What We Will Learn
- Recognize why quality control is critical at different pipeline stages
- Build validation checks using pandas for your energy data
- Create systematic quality monitoring with Dagster asset checks
ℹ️ If you have fallen behind, you can git checkout part_3 before starting this section.
For the advanced section, you can then re-run dg dev to restart your Dagster webserver.
Data Architecture Evolution
In this section, we’ll enhance our pipeline by adding systematic quality validation at critical points.
flowchart TD
P[Population]
C[Energy Consumption]
R[Renewable Energy Coverage]
M[New Regions Mapping]
F[Energy Consumption Breakdown Fossil vs Renewable]
C --> F
R --> F
A[Energy Consumption Breakdown Fossil vs Renewable With Population]
F --> A
P --> A
MC[New Regions Energy Consumption Breakdown Fossil vs Renewable With Population]
A --> MC
M --> MC
PC[Per capita consumption]
MC -->|Quality Control Check| PC
A -->|Quality Control Check| PC
RE[Iberia vs North America Report]
PC -->|Quality Control Check| RE
classDef inputs stroke:#2196F3,stroke-width:3px
class P,C,R,M inputs
classDef outputs stroke:#3db35c,stroke-width:3px
class RE outputs
What We Mean by Quality Control
We have two complementary pillars:
Semantic QC (Already Covered in Part 1)
Using Pandera, we ensured structure, types, and basic constraints (a minimal schema sketch follows the list):
- Required columns exist with correct types
- Values within expected ranges (e.g., population > 0)
- Schema consistency across pipeline runs
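Part 1’s Pandera checks looked roughly like the sketch below; the schema in the course repository may differ, and the column names here are illustrative.
import pandas as pd
import pandera as pa

# Illustrative Part 1 style schema; column names are assumptions
population_schema = pa.DataFrameSchema(
    {
        "country": pa.Column(str),                     # required column with correct type
        "year": pa.Column(int, pa.Check.ge(1900)),     # value within an expected range
        "population": pa.Column(int, pa.Check.gt(0)),  # population must be positive
    },
    strict=False,  # tolerate extra columns; set True to enforce the exact schema
)

df = pd.DataFrame({"country": ["Spain"], "year": [2022], "population": [47_500_000]})
validated = population_schema.validate(df)  # raises SchemaError if any check fails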
Statistical QC (Today’s Focus)
Goes deeper into content relationships within our energy data (the consistency case is sketched after this list):
- Conservation laws: Do fossil + renewable percentages sum to 1.0?
- Realistic bounds: Are per-capita values within believable ranges?
- Consistency: Do calculations reconcile across transformations?
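The conservation and bounds checks are implemented later in this section; the consistency case is not, so here is a minimal pandas sketch of reconciling a derived table against its source (all frame and column names are illustrative):
import numpy as np
import pandas as pd

# Illustrative frames: a source table of totals and a derived breakdown table
totals = pd.DataFrame(
    {"country": ["Spain", "Portugal"], "total_energy_gwh": [250_000.0, 50_000.0]}
)
breakdown = pd.DataFrame(
    {
        "country": ["Spain", "Portugal"],
        "fossil_energy_gwh": [150_000.0, 20_000.0],
        "renewable_energy_gwh": [100_000.0, 30_000.0],
    }
)

# Consistency: the derived breakdown should reconcile with the original totals
recomputed = breakdown["fossil_energy_gwh"] + breakdown["renewable_energy_gwh"]
assert np.allclose(recomputed, totals["total_energy_gwh"]), "Breakdown does not reconcile with totals"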
QC at Multiple Levels: Input and Output
| Level | Purpose | Example Risk |
|---|---|---|
| Input (post-extract) | Build trust in sources early | Wrong data types slip through |
| Output (post-transform) | Certify calculations for consumption | Per-capita spikes from division errors |
The later a defect is detected, the more expensive it is to fix.
Basic Approach: Manual Pandas Assertions
María’s first instinct: sprinkle assert statements throughout her code.
Energy Conservation Check
def calculate_energy_breakdown(df):
    # ...transformation logic...

    # Manual assertion scattered in code
    tolerance = 1e-6
    total_pct = df['fossil_energy_pct'] + df['renewable_energy_pct']
    assert abs(total_pct - 1.0).max() < tolerance, "Energy percentages don't sum to 1!"
    return df
Per-Capita Threshold Check
def calculate_per_capita_metrics(df):
    # ...calculations...

    # More scattered assertions
    max_threshold = 0.1  # 0.1 GWh per person per year ≈ 100 MWh
    assert df['energy_consumption_per_capita'].max() < max_threshold, "Per-capita too high!"
    return df
Problems with This Approach:
- Mixed concerns: QC tangled with business logic
- Poor visibility: Generic error messages, no historical tracking
- All-or-nothing: Every assertion is blocking, no severity levels
- Hard to maintain: Changes require modifying transformation code
Hands-on
Let’s see the problems with scattered assertions in action. Try implementing a basic energy conservation check directly in your transformation and observe how it tangles QC concerns with business logic.
💡 Click to reveal solution
def calculate_energy_breakdown(df):
    # Business logic - calculate percentages
    df = df.assign(
        fossil_energy_pct=lambda x: x['fossil_energy_gwh'] / x['total_energy_gwh'],
        renewable_energy_pct=lambda x: x['renewable_energy_gwh'] / x['total_energy_gwh']
    )

    # QC logic mixed in - this is the problem!
    tolerance = 1e-6
    total_pct = df['fossil_energy_pct'] + df['renewable_energy_pct']
    assert abs(total_pct - 1.0).max() < tolerance, "Energy percentages don't sum to 1!"

    # More business logic continues...
    return df
Notice how the QC assertion is tangled with the business transformation logic, making both harder to understand and maintain.
Advanced Approach: Dagster Asset Checks
Dagster treats quality control as a first-class citizen through Asset Checks. Asset checks are tests that verify specific properties of your data assets and are directly integrated within the orchestration workflow.
Example: Per-Capita Plausibility Check
@dg.asset_check(asset=energy_breakdown_per_capita)
def per_capita_plausibility_check(energy_breakdown_per_capita):
    """Validate realistic per-capita energy consumption bounds"""
    max_threshold = 0.1  # 100 MWh per person per year
    violations = (energy_breakdown_per_capita["energy_consumption_per_capita"] > max_threshold).sum()
    is_valid = int(violations) == 0
    return dg.AssetCheckResult(
        passed=is_valid,
        description=f"Per-capita bounds: {violations} violations (max allowed: {max_threshold} GWh)",
    )
Hands-on
Implement an energy conservation check as part of your pipeline:
- This check should enforce that your estimated fossil vs renewable breakdown adds up to 100%, within a small tolerance.
- Observe how the metadata and descriptions provide much better visibility than simple assertions.
- Try triggering a validation failure and see the detailed error reporting in the Dagster UI.
💡 Click to reveal solution
@dg.asset_check(asset=energy_breakdown_with_new_regions)
def energy_conservation_check(energy_breakdown_with_new_regions):
    """Ensure fossil + renewable percentages sum to 1.0"""
    tolerance = 1e-6
    df_with_totals = energy_breakdown_with_new_regions.assign(
        total_pct=lambda x: x["fossil_energy_pct"] + x["renewable_energy_pct"]
    )
    invalid_count = int((abs(df_with_totals["total_pct"] - 1.0) > tolerance).sum())
    is_valid = invalid_count == 0
    total = len(energy_breakdown_with_new_regions)
    return dg.AssetCheckResult(
        passed=is_valid,
        description=f"Energy conservation: {invalid_count} invalid rows out of {total}",
    )
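The solution above only populates description. An AssetCheckResult can also carry a metadata mapping, which Dagster stores per execution so violation counts can be tracked over time. A possible variant of the same check (the metadata keys are illustrative):
@dg.asset_check(asset=energy_breakdown_with_new_regions)
def energy_conservation_check_with_metadata(energy_breakdown_with_new_regions):
    """Same conservation check, with structured metadata for historical tracking"""
    tolerance = 1e-6
    total_pct = (
        energy_breakdown_with_new_regions["fossil_energy_pct"]
        + energy_breakdown_with_new_regions["renewable_energy_pct"]
    )
    invalid_count = int(((total_pct - 1.0).abs() > tolerance).sum())
    total = len(energy_breakdown_with_new_regions)
    return dg.AssetCheckResult(
        passed=invalid_count == 0,
        description=f"Energy conservation: {invalid_count} invalid rows out of {total}",
        metadata={
            "invalid_rows": invalid_count,  # plotted per run in the Dagster UI
            "total_rows": total,
            "tolerance": tolerance,
        },
    )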
To trigger a validation failure for testing, modify your data to include unrealistic values:
# Add to your energy_breakdown asset to test failure
df.loc[0, 'fossil_energy_pct'] = 0.6
df.loc[0, 'renewable_energy_pct'] = 0.5 # Now sums to 1.1, not 1.0
Blocking vs Non-Blocking: Strategic Control
Key Insight: Not all quality issues should stop your pipeline.
Blocking Checks (Pipeline Stops)
For critical violations that make downstream analysis meaningless:
@dg.asset_check(asset=energy_breakdown, blocking=True)
def energy_conservation_check(energy_breakdown):
    """BLOCKS pipeline if energy percentages don't sum to 1"""
    ...
Non-Blocking Checks (Monitor & Alert)
For warnings and trend monitoring:
@dg.asset_check(asset=per_capita_energy_metrics, blocking=False)
def per_capita_plausibility_check(per_capita_energy_metrics):
    """MONITORS per-capita bounds but doesn't stop pipeline"""
    ...
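Independently of blocking, a check result can also declare a severity, so a failing non-blocking check surfaces as a warning rather than an error in the Dagster UI. A minimal sketch, assuming the same per-capita asset and column as earlier:
@dg.asset_check(asset=per_capita_energy_metrics, blocking=False)
def per_capita_trend_warning(per_capita_energy_metrics):
    """Failing results are reported with WARN severity instead of ERROR"""
    violations = int((per_capita_energy_metrics["energy_consumption_per_capita"] > 0.1).sum())
    return dg.AssetCheckResult(
        passed=violations == 0,
        severity=dg.AssetCheckSeverity.WARN,  # shows up as a warning, not an error
        description=f"{violations} per-capita values above the monitoring threshold",
    )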
When to Use Each?
Strategic Decision Framework:
Choose Blocking when:
- The violation makes downstream calculations mathematically incorrect
- Policy decisions would be fundamentally flawed with bad data
- Manual intervention is required to fix the root cause
Choose Non-Blocking when:
- Data can be corrected automatically (interpolation, defaults)
- The issue is for monitoring trends rather than stopping work
- You want to collect violation patterns over time
Examples from María’s Energy Pipeline:
- Energy conservation (blocking): If fossil + renewable ≠ 1.0, all per-capita calculations are wrong → Stop pipeline
- Per-capita outliers (non-blocking): High values might be real (oil-rich nations) → Monitor but continue
- Missing population (blocking): Can’t calculate per-capita without population → Stop pipeline (sketched after this list)
- Year-over-year spikes (non-blocking): Could indicate policy changes → Monitor trends
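For the missing-population case, a blocking check could look like the sketch below; the asset name energy_breakdown_with_population and the population column are placeholders based on the diagram, not names from the course repository.
@dg.asset_check(asset=energy_breakdown_with_population, blocking=True)
def population_completeness_check(energy_breakdown_with_population):
    """BLOCKS the pipeline when population is missing, since per-capita metrics would be undefined"""
    missing = int(energy_breakdown_with_population["population"].isna().sum())
    return dg.AssetCheckResult(
        passed=missing == 0,
        description=f"Population completeness: {missing} rows missing population",
    )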
Hands-on
- Take the energy conservation check and configure it as blocking.
- Take the per-capita plausibility check and configure it as non-blocking.
Run your pipeline with intentionally bad data to see how blocking stops execution while non-blocking allows continuation with warnings.
💡 Click to reveal solution
@dg.asset_check(asset=energy_breakdown_per_capita, blocking=False)
def per_capita_plausibility_check(energy_breakdown_per_capita):
    """Validate realistic per-capita energy consumption bounds"""
    max_threshold = 0.1  # 100 MWh per person per year
    violations = (energy_breakdown_per_capita["energy_consumption_per_capita"] > max_threshold).sum()
    is_valid = int(violations) == 0
    return dg.AssetCheckResult(
        passed=is_valid,
        description=f"Per-capita bounds: {violations} violations (max allowed: {max_threshold} GWh)",
    )

@dg.asset_check(asset=energy_breakdown_with_new_regions, blocking=True)
def energy_conservation_check(energy_breakdown_with_new_regions):
    """Ensure fossil + renewable percentages sum to 1.0"""
    tolerance = 1e-6
    df_with_totals = energy_breakdown_with_new_regions.assign(
        total_pct=lambda x: x["fossil_energy_pct"] + x["renewable_energy_pct"]
    )
    invalid_count = int((abs(df_with_totals["total_pct"] - 1.0) > tolerance).sum())
    is_valid = invalid_count == 0
    total = len(energy_breakdown_with_new_regions)
    return dg.AssetCheckResult(
        passed=is_valid,
        description=f"Energy conservation: {invalid_count} invalid rows out of {total}",
    )
Test with intentionally bad data:
# In your energy_breakdown asset, add invalid percentages
df.loc[0, 'fossil_energy_pct'] = 0.7
df.loc[0, 'renewable_energy_pct'] = 0.4 # Sums to 1.1 - will BLOCK pipeline
# In your per_capita_energy_metrics asset, add unrealistic values
df.loc[0, 'energy_consumption_per_capita_gwh'] = 0.5 # Will log warning but continue
Extras: Scaling QC with External Tools Within Your Python Workflow
As María’s renewable energy analysis grows in scope and organizational adoption, she encounters scenarios where Dagster’s built-in asset checks need to be supplemented:
Why External QC Tools?
- Multi-language teams: SQL analysts who can’t read Python validation logic
- Centralized governance: QC rules managed separately from pipeline code
- Performance at scale: Large datasets requiring specialized analytical engines
- Cross-pipeline consistency: Same validation rules across different data workflows
Can she create a mixed data system that supports multiple tools within the same Dagster pipeline?
Soda Core + DuckDB: Combined External Validation
A well-known quality control framework is soda-core. On its own, it offers the flexibility to choose the backend used for data extraction and processing.
For example, you can combine it with another powerful tool, DuckDB, which provides a highly efficient analytical engine.
Although this runs outside your native Python context, you can still orchestrate the workflow seamlessly with Dagster.
# energy_checks.yml - Simple, business-readable rules
checks for energy_per_capita:
- max(energy_consumption_per_capita) <= 0.1
- missing_count(renewable_energy_pct) = 0
- min(fossil_energy_pct + renewable_energy_pct) >= 0.99
import tempfile

import dagster as dg
import duckdb
from soda.scan import Scan


@dg.asset_check(asset=energy_breakdown_per_capita)
def external_energy_validation(energy_breakdown_per_capita):
    """Combined Soda Core + DuckDB validation"""
    # Export to parquet for external validation
    with tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) as temp_file:
        energy_breakdown_per_capita.to_parquet(temp_file.name)
        parquet_file_path = temp_file.name

    # Use DuckDB as analytical engine + Soda Core for rule management
    with duckdb.connect(":memory:") as con:
        con.sql(f"CREATE VIEW energy_per_capita AS SELECT * FROM read_parquet('{parquet_file_path}')")

        scan = Scan()
        scan.add_duckdb_connection(con)
        scan.set_data_source_name("duckdb")

        # Load the business-readable rules from the YAML file shown above
        with open("energy_checks.yml") as checks_file:
            my_checks = checks_file.read()
        scan.add_sodacl_yaml_str(my_checks)
        scan.execute()

    return dg.AssetCheckResult(
        passed=not scan.has_check_fails(),
        description="External Soda Core + DuckDB validation",
    )
Combined Value:
- DuckDB: High-performance analytical engine for large datasets
- Soda Core: Business-readable validation rules in YAML
- Separation of concerns: QC rules managed independently from pipeline code
- Cross-team collaboration: SQL analysts can modify rules without touching Python
When to Graduate to External Tools?
- Multi-team ownership of QC rules
- Need centralized governance & audit trails
- Higher volume datasets requiring specialized analytical engines
Hands-on
Create a simple YAML file with Soda Core rules for your energy data. Run the external validation and observe how the rules are separated from your Python pipeline code. Discuss when this approach would be valuable for your organization.
💡 Click to reveal solution
# Create energy_checks.yml file
checks for energy_per_capita:
- max(energy_consumption_per_capita_gwh) <= 0.1:
name: Per-capita energy consumption within realistic bounds
- missing_count(energy_consumption_per_capita_gwh) = 0:
name: No missing energy consumption values
- min(energy_consumption_per_capita_gwh) >= 0:
name: Energy consumption cannot be negative
- row_count between 50 and 10000:
name: Reasonable number of data points
# Add to your assets.py file
import dagster as dg
import duckdb
from soda.scan import Scan


@dg.asset_check(asset=energy_breakdown_per_capita)
def external_energy_validation(energy_breakdown_per_capita):
    """Run external Soda Core validation"""
    parquet_file_path = 'metrics.parquet'
    energy_breakdown_per_capita.to_parquet(parquet_file_path)

    with duckdb.connect(":memory:") as con:
        con.sql(
            f"""
            CREATE VIEW energy_per_capita AS
            SELECT * FROM read_parquet('{parquet_file_path}')
            """
        )

        scan = Scan()
        scan.add_duckdb_connection(con)
        scan.set_data_source_name("duckdb")

        # Load the rules from the energy_checks.yml file created above
        with open("energy_checks.yml") as checks_file:
            sodacl = checks_file.read()
        scan.add_sodacl_yaml_str(sodacl)
        scan.execute()

    # Translate the scan outcome into an asset check result
    passed = not scan.has_check_fails()
    return dg.AssetCheckResult(
        passed=passed,
        description="External Soda Core + DuckDB validation",
    )
Moment of Discovery
By implementing systematic quality control at strategic pipeline stages with both blocking and non-blocking checks, we’ve transformed from “hoping our data is correct” to “knowing our data meets business requirements” - enabling confident decision-making.
Next: Explore advanced but essential concepts in orchestration and Dagster.