Part 2: The 'T' - Transforming & Enriching

Scene: It’s time to transform the extracted data into the desired format for analysis.

With our raw data sources properly validated and loaded, María needs to combine and transform this data to answer her policy questions. This involves merging datasets, calculating derived metrics, and creating regional aggregates for comparison.

Illustration of María, a data analyst at a UN-affiliated organization. (Image on this page generated with Google Gemini.)

What We Will Learn

  • Transform and combine multiple datasets for analysis with pandas
  • Create regional aggregates and derived metrics
  • Visualize data dependencies and pipeline lineage
  • Move from ad-hoc scripts to structured, maintainable workflows using Dagster assets

ℹ️ If you have fallen behind, you can run git checkout part_1 before starting this section.

For the advanced section, you can re-run dg dev to restart your Dagster webserver.


Data Architecture Evolution

We will transform and enrich our starting data to create meaningful insights, the “T” in ETL - Transform.


%%{
  init: {
    'themeVariables': {
      'lineColor': '#d3d3d3'
    }
  }
}%%

flowchart TD
    P[Population]
    C[Energy Consumption]
    R[Renewable Energy Coverage]
    M[New Regions Mapping]

    F[Energy Consumption Breakdown Fossil vs Renewable]
    C --> F
    R --> F

    A[Energy Consumption Breakdown Fossil vs Renewable With Population]

    F --> A
    P --> A

    MC[New Regions Energy Consumption Breakdown Fossil vs Renewable With Population]
    A --> MC
    M --> MC

    PC[Per capita consumption]

    MC --> PC
    A --> PC

    RE[Iberia vs North America Report]

    PC --> RE

    classDef greyedOut stroke:#d3d3d3,stroke-width:2px,fill:#f9f9f9,color:#d3d3d3
    class P,C,R,M,RE greyedOut

Basic Approach: Complex Pandas Transformations

In this section, we’ll demonstrate a traditional approach to data transformation using pandas. María needs to combine multiple datasets - energy consumption, renewable energy percentages, population data, and regional taxonomies - to create comprehensive analytics ready for policy analysis. We’ll walk through a series of pandas operations that merge, calculate, and aggregate this data, but we’ll also highlight the challenges that arise when managing complex transformation pipelines without proper orchestration tools.

Combine Energy Data Sources

# Merge energy consumption with renewable percentages
energy_breakdown = energy_consumption_df.merge(
    renewable_energy_df, how="left", on=["entity", "entity_code", "year"]
).assign(
    renewable_energy_pct=lambda x: x["renewable_energy_pct"].fillna(0),
    fossil_energy_pct=lambda x: 1 - x["renewable_energy_pct"],
    renewable_energy_consumption_gwh=lambda x: x["energy_consumption_gwh"] * x["renewable_energy_pct"],
    fossil_energy_consumption_gwh=lambda x: x["energy_consumption_gwh"] * x["fossil_energy_pct"],
)
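
As a quick sanity check on this merge (a minimal sketch, not part of María’s original script), we can verify that the renewable and fossil components add back up to the total consumption:

# The two derived components should reconstruct the original total
reconstructed = (
    energy_breakdown["renewable_energy_consumption_gwh"]
    + energy_breakdown["fossil_energy_consumption_gwh"]
)
assert (reconstructed - energy_breakdown["energy_consumption_gwh"]).abs().max() < 1e-6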

Add Population Data

# Merge with population data
energy_with_population = energy_breakdown.merge(
    population_df, how="left", on=["entity", "entity_code", "year"]
)
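
Because this is a left merge, any country/year combination missing from the population file ends up with a NaN population. A quick check along these lines (a sketch, not part of the workshop code) surfaces those gaps early:

# Count rows where the left merge found no matching population record
missing_population = energy_with_population["population"].isna().sum()
print(f"Rows without population data: {missing_population}")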

Create Regional Aggregates

# Manually define regions using taxonomy
entities_of_interest = energy_with_population.merge(
    regional_grouping, on="entity_code"
).assign(
    relative_contrib_in_region=lambda x: x["energy_consumption_gwh"]
    / x.groupby(["region_entity_code", "year"])["energy_consumption_gwh"].transform("sum")
)

regional_aggregates = (
    entities_of_interest.assign(
        renewable_energy_pct=lambda x: x["renewable_energy_pct"] * x["relative_contrib_in_region"],
        fossil_energy_pct=lambda x: x["fossil_energy_pct"] * x["relative_contrib_in_region"],
    )
    .groupby(["region_entity_code", "region_name", "year"], as_index=False)
    .agg({
        "population": "sum",
        "energy_consumption_gwh": "sum",
        "renewable_energy_consumption_gwh": "sum",
        "fossil_energy_consumption_gwh": "sum",
        "renewable_energy_pct": "sum",
        "fossil_energy_pct": "sum",
    })
    .rename(columns={"region_name": "entity", "region_entity_code": "entity_code"})
)
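
The weighting step is the subtle part: each country’s renewable_energy_pct is scaled by its share of the region’s total consumption before summing, which yields a consumption-weighted average rather than a simple mean. A minimal toy example with made-up numbers shows why this matters:

import pandas as pd

# Two hypothetical countries in one region: A consumes 9x more than B
toy = pd.DataFrame({
    "entity": ["A", "B"],
    "energy_consumption_gwh": [900.0, 100.0],
    "renewable_energy_pct": [0.10, 0.90],
})
weights = toy["energy_consumption_gwh"] / toy["energy_consumption_gwh"].sum()
weighted_pct = (toy["renewable_energy_pct"] * weights).sum()  # 0.18, dominated by A
simple_mean = toy["renewable_energy_pct"].mean()              # 0.50, misleading for the region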

Hands-on

Let’s create the per-capita metrics DataFrame ourselves.

💡 Click to reveal solution
# Calculate per-capita consumption
per_capita_consumption = pd.concat([regional_aggregates, energy_with_population]).assign(
    energy_consumption_per_capita_gwh=lambda x: x["energy_consumption_gwh"] / x["population"],
    renewable_energy_per_capita_gwh=lambda x: x["renewable_energy_consumption_gwh"] / x["population"],
    fossil_energy_per_capita_gwh=lambda x: x["fossil_energy_consumption_gwh"] / x["population"],
)
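
With the per-capita metrics in place, María can already preview the comparison she is after. The snippet below is a rough sketch; it assumes the regional grouping file names the two regions "Iberia" and "North America", as in the report from the architecture diagram:

# Peek at the most recent year for the two regions of interest (region names assumed)
latest_year = per_capita_consumption["year"].max()
preview = per_capita_consumption.query(
    "entity in ['Iberia', 'North America'] and year == @latest_year"
)[["entity", "year", "energy_consumption_per_capita_gwh", "renewable_energy_per_capita_gwh"]]
print(preview)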

Problem Highlight

While pandas is powerful, managing a sequence of complex transformations, tracking intermediate steps, and ensuring reproducibility quickly turns into a ‘spaghetti code’ challenge. How do we know whether a change upstream impacts a calculation far downstream?


Advanced Approach: Dagster Assets for Transformations

In this advanced approach, we’ll restructure our complex pandas transformations into a series of well-defined Dagster assets. This strategy transforms our monolithic transformation code into discrete, trackable components that can be monitored, validated, and reused independently. Each transformation step becomes an asset with clear inputs and outputs, making the entire pipeline more maintainable and debuggable.

Setting Up Transform Assets

Break down the complex transformations into discrete, trackable Dagster assets:

models.py

class EnergyBreakdownDataModel(EnergyConsumptionDataModel):
    renewable_energy_pct: float = pa.Field(description="Renewable energy coverage in %")
    fossil_energy_pct: float = pa.Field(description="Fossil energy coverage in %")
    renewable_energy_consumption: float = pa.Field(
        description="Renewable energy consumption in TWh"
    )
    fossil_energy_consumption: float = pa.Field(
        description="Fossil energy consumption in TWh"
    )

assets.py

@dg.asset(
    dagster_type=pandera_schema_to_dagster_type(EnergyBreakdownDataModel.to_schema())
)
def energy_breakdown(energy_consumption, renewable_coverage):
    """Combine energy consumption with renewable percentages to calculate fossil vs renewable breakdown"""
    return energy_consumption.merge(
        renewable_coverage, how="left", on=["entity", "entity_code", "year"]
    ).assign(
        renewable_energy_pct=lambda x: x["renewable_energy_pct"].fillna(0),
        fossil_energy_pct=lambda x: 1 - x["renewable_energy_pct"],
        renewable_energy_consumption=lambda x: x["energy_consumption"] * x["renewable_energy_pct"],
        fossil_energy_consumption=lambda x: x["energy_consumption"] * x["fossil_energy_pct"],
    )
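
Note that we never wire these dependencies up by hand: Dagster infers them from the function signature, so energy_breakdown depends on the energy_consumption and renewable_coverage assets simply because those names appear as parameters.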

Demonstrate Dagster’s Lineage & UI

The Dagster UI now clearly visualizes the dependencies between these transformed assets, offering a clear “map” of María’s data pipeline:

Our pipeline (screenshot of the asset graph in the Dagster UI)
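
You can also exercise this graph outside the UI. A minimal sketch, assuming the assets above are importable from energy_analysis.defs.assets and you are running inside the workshop environment where the data paths resolve, is to materialize a slice of the graph in-process, which is handy in tests:

import dagster as dg

from energy_analysis.defs.assets import (
    energy_consumption,
    renewable_coverage,
    energy_breakdown,
)

# Materialize the two sources and the breakdown in one in-process run;
# Dagster derives the execution order from the asset dependency graph.
result = dg.materialize([energy_consumption, renewable_coverage, energy_breakdown])
assert result.success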

Hands-on

Let’s convert our complex pandas transformations into structured Dagster assets and add asset checks to ensure the quality of our intermediate and final calculated metrics (a sketch of one possible check follows the solution below).

💡 Click to reveal solution

assets.py

import dagster as dg
import pandas as pd
from dagster_pandera import pandera_schema_to_dagster_type
from energy_analysis.defs.utils import get_dagster_type

from energy_analysis.defs.models import (
    PopulationDataModel,
    EnergyConsumptionDataModel,
    RenewableCoverageDataModel,
    RegionalGroupingDataModel,
    EnergyBreakdownDataModel,
    EnergyBreakdownWithPopulationDataModel,
    EnergyBreakdownPerCapitaDataModel,
)


@dg.asset(dagster_type=pandera_schema_to_dagster_type(PopulationDataModel.to_schema()))
def population():
    """Population by country from UN projections"""
    return (
        pd.read_csv("/workspaces/orchestration-workshop-tutorial/data/population-with-un-projections.csv")
        .rename(
            columns={
                "population__sex_all__age_all__variant_estimates": "population",
                "Entity": "entity",
                "Code": "entity_code",
                "Year": "year",
            }
        )
        .dropna(subset=["population"])
        .astype({"year": int, "population": int, "entity_code": str, "entity": str})
    )


@dg.asset(
    dagster_type=pandera_schema_to_dagster_type(EnergyConsumptionDataModel.to_schema())
)
def energy_consumption():
    """Energy consumption by country from UN projections"""
    return (
        pd.read_csv("/workspaces/orchestration-workshop-tutorial/data/primary-energy-cons.csv")
        .rename(
            columns={
                "primary_energy_consumption__twh": "energy_consumption",
                "Entity": "entity",
                "Code": "entity_code",
                "Year": "year",
            }
        )
        .astype(
            {
                "year": int,
                "energy_consumption": float,
                "entity_code": str,
                "entity": str,
            }
        )
    )


@dg.asset(
    dagster_type=pandera_schema_to_dagster_type(RenewableCoverageDataModel.to_schema())
)
def renewable_coverage():
    """Renewable energy coverage by country from UN projections"""
    return (
        pd.read_csv("/workspaces/orchestration-workshop-tutorial/data/renewable-share-energy.csv")
        .rename(
            columns={
                "renewables__pct_equivalent_primary_energy": "renewable_energy_pct",
                "Entity": "entity",
                "Code": "entity_code",
                "Year": "year",
            }
        )
        .assign(renewable_energy_pct=lambda x: x["renewable_energy_pct"] / 100)
    )


@dg.asset(
    dagster_type=pandera_schema_to_dagster_type(RegionalGroupingDataModel.to_schema())
)
def regional_grouping():
    """Regional grouping taxonomy"""
    return pd.read_csv("/workspaces/orchestration-workshop-tutorial/data/regional-grouping.csv")


@dg.asset(
    dagster_type=pandera_schema_to_dagster_type(EnergyBreakdownDataModel.to_schema())
)
def energy_breakdown(energy_consumption, renewable_coverage):
    """Combine energy consumption with renewable percentages to calculate fossil vs renewable breakdown"""
    return energy_consumption.merge(
        renewable_coverage, how="left", on=["entity", "entity_code", "year"]
    ).assign(
        renewable_energy_pct=lambda x: x["renewable_energy_pct"].fillna(0),
        fossil_energy_pct=lambda x: 1 - x["renewable_energy_pct"],
        renewable_energy_consumption=lambda x: x["energy_consumption"]
        * x["renewable_energy_pct"],
        fossil_energy_consumption=lambda x: x["energy_consumption"]
        * x["fossil_energy_pct"],
    )


@dg.asset(
    dagster_type=pandera_schema_to_dagster_type(
        EnergyBreakdownWithPopulationDataModel.to_schema()
    )
)
def energy_breakdown_with_population(energy_breakdown, population):
    """Combine energy breakdown with population data"""
    return energy_breakdown.merge(
        population, how="left", on=["entity", "entity_code", "year"]
    ).astype({"population": "Int64"})


@dg.asset(
    dagster_type=get_dagster_type(
        EnergyBreakdownWithPopulationDataModel, "energy_breakdown_with_new_regions"
    )
)
def energy_breakdown_with_new_regions(
    energy_breakdown_with_population, regional_grouping
):
    """Combine energy breakdown with new regional data"""
    entities_of_interest = energy_breakdown_with_population.merge(
        regional_grouping, on="entity_code"
    )

    return (
        entities_of_interest.groupby(
            [
                "region_entity_code",
                "region_name",
                "year",
            ],
            as_index=False,
        )
        .agg(
            {
                "population": "sum",
                "energy_consumption": "sum",
                "renewable_energy_consumption": "sum",
                "fossil_energy_consumption": "sum",
            }
        )
        .assign(
            renewable_energy_pct=lambda x: x["renewable_energy_consumption"]
            / x["energy_consumption"],
            fossil_energy_pct=lambda x: x["fossil_energy_consumption"]
            / x["energy_consumption"],
        )
        .rename(columns={"region_name": "entity", "region_entity_code": "entity_code"})
    )


@dg.asset(dagster_type=get_dagster_type(EnergyBreakdownPerCapitaDataModel))
def energy_breakdown_per_capita(
    energy_breakdown_with_population, energy_breakdown_with_new_regions
):
    """Compute per-capita energy consumption metrics"""
    all_breakdowns = pd.concat(
        [energy_breakdown_with_population, energy_breakdown_with_new_regions]
    )
    return all_breakdowns.assign(
        energy_consumption_per_capita=lambda x: x["energy_consumption"]
        / x["population"],
        renewable_energy_per_capita=lambda x: x["renewable_energy_consumption"]
        / x["population"],
        fossil_energy_per_capita=lambda x: x["fossil_energy_consumption"]
        / x["population"],
    )

models.py

import pandera as pa


class PopulationDataModel(pa.DataFrameModel):
    entity: str = pa.Field(description="Entity name")
    entity_code: str = pa.Field(description="Country code")
    year: int = pa.Field(description="Year of the population estimate")
    population: int = pa.Field(description="Population estimate")


class EnergyConsumptionDataModel(pa.DataFrameModel):
    entity: str = pa.Field(description="Entity name")
    entity_code: str = pa.Field(description="Country code")
    year: int = pa.Field(description="Year of the consumption")
    energy_consumption: float = pa.Field(description="Energy consumption in TWh")


class RenewableCoverageDataModel(pa.DataFrameModel):
    entity: str = pa.Field(description="Entity name")
    entity_code: str = pa.Field(description="Country code", nullable=True)
    year: int = pa.Field(description="Year of the estimate")
    renewable_energy_pct: float = pa.Field(description="Renewable energy coverage in %")


class RegionalGroupingDataModel(pa.DataFrameModel):
    region_entity_code: str = pa.Field(description="Region entity code")
    region_name: str = pa.Field(description="Region name")
    entity_code: str = pa.Field(description="Country code")


class EnergyBreakdownDataModel(EnergyConsumptionDataModel):
    renewable_energy_pct: float = pa.Field(description="Renewable energy coverage in %")
    fossil_energy_pct: float = pa.Field(description="Fossil energy coverage in %")
    renewable_energy_consumption: float = pa.Field(
        description="Renewable energy consumption in TWh"
    )
    fossil_energy_consumption: float = pa.Field(
        description="Fossil energy consumption in TWh"
    )


class EnergyBreakdownWithPopulationDataModel(EnergyBreakdownDataModel):
    population: int = pa.Field(description="Population", nullable=True)


class EnergyBreakdownPerCapitaDataModel(EnergyBreakdownDataModel):
    population: int = pa.Field(description="Population", nullable=True)
    energy_consumption_per_capita: float = pa.Field(
        description="Energy consumption per capita in TWh", nullable=True
    )
    renewable_energy_per_capita: float = pa.Field(
        description="Renewable energy consumption per capita in TWh", nullable=True
    )
    fossil_energy_per_capita: float = pa.Field(
        description="Fossil energy consumption per capita in TWh", nullable=True
    )

utils.py

import dagster as dg
import pandera as pa
from dagster_pandera import pandera_schema_to_dagster_type


def get_dagster_type(
    model: type[pa.DataFrameModel], asset_name: str | None = None
) -> dg.DagsterType:
    """Build a Dagster type from a pandera model, giving the schema a unique title per asset."""
    schema = model.to_schema()
    if asset_name:
        schema.title = f"{model.Config.name}_{asset_name}"
    else:
        schema.title = model.Config.name
    dagster_type = pandera_schema_to_dagster_type(schema)
    return dagster_type
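
The solution above focuses on the assets themselves; Dagster’s asset checks are a natural home for the quality rules the hands-on exercise asks for. Here is a minimal sketch of one such check, added to assets.py alongside the assets (the positivity rule is illustrative, not part of the workshop code):

@dg.asset_check(asset=energy_breakdown_per_capita)
def per_capita_values_are_positive(energy_breakdown_per_capita):
    """Flag negative per-capita metrics, which would point to a bad merge or division."""
    bad_rows = (energy_breakdown_per_capita["energy_consumption_per_capita"] < 0).sum()
    return dg.AssetCheckResult(
        passed=bool(bad_rows == 0),
        metadata={"negative_rows": int(bad_rows)},
    )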

Moment of Discovery

Dagster provides the ‘architect’s blueprint’ for our transformations, ensuring clarity and traceability, while the UI gives us complete visibility into our data lineage.

  1. Raw Data Assets → Energy Breakdown → Energy with Population → Regional Aggregates → Per-Capita Metrics
  2. Lineage View shows how changes propagate through the pipeline
  3. Asset Materialization History tracks when each step was last updated

Next: Move on to the final step - Loading and generating insights from our transformed data!
