Part 1: The 'E' - Extracting & Initial Cleaning
Scene: The journey begins by getting the raw data into a usable format
Before orchestrating anything, María needs to get her hands on the raw data: population, energy consumption, renewable share, and country taxonomy. Let’s see how this is typically done.
What We Will Learn
- Load raw CSV files into pandas DataFrames
- Standardize column names and handle missing values
- Convert data to appropriate types and handle conversion errors
- Use pandera schemas to define and enforce data expectations
- Transform ad-hoc scripts into trackable Dagster assets
ℹ️ If you have fallen behind, you can `git checkout part_0` before starting this section.
Data Architecture Evolution
We will retrieve the foundational data sources to feed our analysis pipeline, the “E” in ETL - Extract.
```mermaid
%%{
  init: {
    'themeVariables': {
      'lineColor': '#d3d3d3'
    }
  }
}%%
flowchart TD
    P[Population]
    C[Energy Consumption]
    R[Renewable Energy Coverage]
    M[New Regions Mapping]
    F[Energy Consumption Breakdown Fossil vs Renewable]
    C --> F
    R --> F
    A[Energy Consumption Breakdown Fossil vs Renewable With Population]
    F --> A
    P --> A
    MC[New Regions Energy Consumption Breakdown Fossil vs Renewable With Population]
    A --> MC
    M --> MC
    PC[Per capita consumption]
    MC --> PC
    A --> PC
    RE[Iberia vs North America Report]
    PC --> RE
    classDef inputs stroke:#2196F3,stroke-width:3px
    class P,C,R,M inputs
    classDef greyedOut stroke:#d3d3d3,stroke-width:2px,fill:#f9f9f9,color:#d3d3d3
    class F,A,PC,RE,MC greyedOut
```
Basic Approach (Plain Python & Pandas)
For this section we will continue the work started in `workshop-project/basic/renewable_coverage_analysis.py` during Part 0.
How to Load Data with Pandas
```python
import pandas as pd

# Load all datasets
population_df = pd.read_csv("/workspaces/orchestration-workshop-tutorial/data/population-with-un-projections.csv")
energy_df = pd.read_csv("/workspaces/orchestration-workshop-tutorial/data/primary-energy-cons.csv")
renewable_df = pd.read_csv("/workspaces/orchestration-workshop-tutorial/data/renewable-share-energy.csv")
taxonomy_df = pd.read_csv("/workspaces/orchestration-workshop-tutorial/data/regional-grouping.csv")
```
Quick Exploration
```python
print(population_df.head())
print(population_df.info())
print(population_df.describe())
```
Standardize the Column Names
```python
# Rename columns for consistency
population_df = population_df.rename(
    columns={
        "population__sex_all__age_all__variant_estimates": "population",
        "Entity": "entity",
        "Code": "entity_code",
        "Year": "year",
    }
)

# Select only the standardized columns
population_df = population_df[["entity", "entity_code", "year", "population"]]
```
Assign Types
```python
# Convert to appropriate types
population_df = population_df.astype({"year": int, "population": int, "entity_code": str, "entity": str})
```
Type Conversion Errors
Some type conversions require careful thought; check the pandas dtypes documentation to learn more.
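One dtype worth knowing here: a column with missing values cannot be cast to plain `int`. pandas ships a nullable `Int64` dtype that represents missing entries as `<NA>` while keeping integer semantics. A minimal sketch with synthetic data (this is an aside, not the workshop solution):

```python
import pandas as pd

# A column containing None is read as float64 (with NaN) and
# cannot be cast to plain int, but the nullable Int64 dtype can hold it.
df = pd.DataFrame({"population": [47_000_000, None, 8_000_000]})

df["population"] = df["population"].astype("Int64")
print(df["population"].dtype)         # Int64
print(df["population"].isna().sum())  # 1 missing value preserved
```

This keeps integers exact instead of silently degrading them to floats, at the cost of a dtype most downstream code must be aware of.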
After reading our data, we notice that the population column contains NULL values. How can we handle this?
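Before choosing a strategy, it helps to quantify the gap. A minimal sketch with synthetic rows shaped like the standardized population data:

```python
import pandas as pd

# Synthetic rows shaped like the standardized population data
population_df = pd.DataFrame(
    {
        "entity": ["Spain", "Portugal", "Andorra"],
        "entity_code": ["ESP", "PRT", "AND"],
        "year": [2020, 2020, 2020],
        "population": [47_300_000.0, 10_300_000.0, None],
    }
)

# How many values are missing per column?
print(population_df.isna().sum())

# Which rows are affected? Inspect before deciding to drop or impute.
print(population_df[population_df["population"].isna()])
```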
Solution
In the context of our exercise, the simplest solution is to clean up the missing data as part of the extraction process.
💡 Click to reveal solution
```python
population_df = (
    population_df.rename(
        columns={
            "population__sex_all__age_all__variant_estimates": "population",
            "Entity": "entity",
            "Code": "entity_code",
            "Year": "year",
        }
    )
    .dropna(subset=["population"])
    .astype({"year": int, "population": int, "entity_code": str, "entity": str})
)
```
Demonstrate Failure
If you change a population value to "unknown" or add a country not in the taxonomy, the pipeline will either:
- Throw a generic exception (e.g., `ValueError: could not convert string to float`)
- Silently produce incorrect results, with no clear error reporting.
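The first failure mode can be seen concretely with a couple of synthetic rows. The sketch below also shows `pd.to_numeric(..., errors="coerce")` as one way to surface bad rows instead of crashing (an aside, not the workshop solution):

```python
import pandas as pd

# Synthetic data with one corrupted value
df = pd.DataFrame({"entity": ["Spain", "Portugal"], "population": [47_000_000, "unknown"]})

# A plain cast blows up with a generic, hard-to-trace error
try:
    df.astype({"population": int})
except ValueError as exc:
    print(f"astype failed: {exc}")

# Coercing instead turns unparseable values into NaN, so bad rows can be inspected
df["population"] = pd.to_numeric(df["population"], errors="coerce")
print(df[df["population"].isna()])  # the offending row(s)
```

Either way, nothing tells you *which input file* or *which upstream step* produced the bad value; that traceability is what the advanced approach adds.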
Basic Approach Solution
💡 Click to reveal solution
workshop-project/basic/renewable_coverage_analysis.py
```python
import pandas as pd

# Available data files
csv_population = "/workspaces/orchestration-workshop-tutorial/data/population-with-un-projections.csv"
csv_renewable_share = "/workspaces/orchestration-workshop-tutorial/data/renewable-share-energy.csv"
csv_energy_consumption = "/workspaces/orchestration-workshop-tutorial/data/primary-energy-cons.csv"
csv_regional_grouping = "/workspaces/orchestration-workshop-tutorial/data/regional-grouping.csv"


def _prepare_population(df: pd.DataFrame) -> pd.DataFrame:
    """Standardize the population column names."""
    population = df.rename(
        columns={
            "population__sex_all__age_all__variant_estimates": "population",
            "Entity": "entity",
            "Code": "entity_code",
            "Year": "year",
        }
    )
    return population[["entity", "entity_code", "year", "population"]]


def _prepare_renewable_energy(df: pd.DataFrame) -> pd.DataFrame:
    """Standardize the renewable energy contributions column names."""
    renewable_coverage_df = df.rename(
        columns={
            "renewables__pct_equivalent_primary_energy": "renewable_energy_pct",
            "Entity": "entity",
            "Code": "entity_code",
            "Year": "year",
        }
    ).assign(renewable_energy_pct=lambda x: x["renewable_energy_pct"] / 100)
    return renewable_coverage_df[
        ["entity", "entity_code", "year", "renewable_energy_pct"]
    ]


def _prepare_energy_consumption(df: pd.DataFrame) -> pd.DataFrame:
    """Standardize the energy consumption column names and convert units to GWh."""
    energy_consumption_df = df.rename(
        columns={
            "primary_energy_consumption__twh": "energy_consumption_twh",
            "Entity": "entity",
            "Code": "entity_code",
            "Year": "year",
        }
    ).assign(energy_consumption_gwh=lambda x: x["energy_consumption_twh"] * 1000)
    return energy_consumption_df[
        ["entity", "entity_code", "year", "energy_consumption_gwh"]
    ]


def _get_df_from_csv(csv_file: str) -> pd.DataFrame:
    df = pd.read_csv(csv_file)
    print(f"Loaded {len(df)} rows from {csv_file}")
    return df


# Load the data
population_df = _get_df_from_csv(csv_population)
renewable_energy_df = _get_df_from_csv(csv_renewable_share)
energy_consumption = _get_df_from_csv(csv_energy_consumption)
regional_grouping = _get_df_from_csv(csv_regional_grouping)

# Prepare and standardize the data
population_df = _prepare_population(population_df)
renewable_energy_df = _prepare_renewable_energy(renewable_energy_df)
energy_consumption_df = _prepare_energy_consumption(energy_consumption)
```
Problem Highlight
This works for small datasets, but what if files are missing? What if the columns change? How do we ensure consistency and traceability for all our inputs?
Advanced Approach (Dagster Assets & Pandera for Input Validation)
As our data workflows grow, we need tools that help us manage complexity, ensure data quality, and provide visibility into each step. Two powerful tools for this are:
- Dagster: An open-source data orchestrator that lets you define, track, and monitor each step of your pipeline as reusable, observable “assets.”
- Pandera: A Python library for validating pandas DataFrames using expressive, schema-based contracts. It ensures your data meets expectations before it enters your pipeline.
With Dagster, you can treat each raw data source as a trackable asset, and with Pandera, you can enforce strict data contracts to catch issues early.
Setting Up Dagster Locally
To use Dagster for asset-based data extraction and validation, follow these steps to scaffold your project and start the development server:
0. Create a folder to maintain the Dagster definitions
This will be your source code for the orchestrated pipeline.
```bash
mkdir -p /workspaces/orchestration-workshop-tutorial/workshop-project/advanced
cd /workspaces/orchestration-workshop-tutorial/workshop-project/advanced
```
1. Scaffold a New Dagster Project
```bash
create-dagster project energy-analysis
cd energy-analysis
```
2. Scaffold Asset Definitions
```bash
dg scaffold defs dagster.asset assets.py
```
This will create an `assets.py` file where you can define your Dagster assets.
3. Start the Dagster Development Server
```bash
dg dev
```
This launches the Dagster UI locally, allowing you to run, monitor, and debug your asset-based pipelines.
Dagster for “Extract” as Assets
Instead of ad-hoc loading, define each raw data source as a Dagster asset. Let’s create our first asset in `workshop-project/advanced/energy-analysis/src/energy_analysis/defs/assets.py`:
```python
import dagster as dg
import pandas as pd


@dg.asset
def population_by_country():
    """Population by country from UN projections"""
    return pd.read_csv("/workspaces/orchestration-workshop-tutorial/data/population-with-un-projections.csv")
```
This makes each input visible, trackable, and versioned. If you click `Reload definitions` in the `Assets` tab in Dagster, you will be able to see your new asset and materialize it.
Pandera for Data Contracts
Define a data model for each dataset. To separate models from assets, we create a `models.py` under `workshop-project/advanced/energy-analysis/src/energy_analysis/defs/`:
models.py
```python
import pandera as pa


class PopulationDataModel(pa.DataFrameModel):
    entity: str = pa.Field(description="Country name")
    entity_code: str = pa.Field(description="Country code")
    year: int = pa.Field(description="Year of the population estimate")
    population: int = pa.Field(description="Population estimate", nullable=True)
```
Add validation to the asset using Dagster types:
assets.py
```python
import dagster as dg
import pandas as pd
from dagster_pandera import pandera_schema_to_dagster_type

from energy_analysis.defs.models import PopulationDataModel


@dg.asset(dagster_type=pandera_schema_to_dagster_type(PopulationDataModel.to_schema()))
def population_by_country():
    """Population by country from UN projections"""
    return pd.read_csv("/workspaces/orchestration-workshop-tutorial/data/population-with-un-projections.csv")
```
Why Pandera? It is a complete semantic data validation framework; at its core, it validates data schemas automatically, preventing bad data from entering your pipeline and providing clear error messages when validation fails.
Dagster Type Checks
Dagster validates materializations against the defined schema; if any check fails, you will see a `STEP_FAILURE` in your materialization output.
Solution
Clean up the data as part of your extraction asset.
💡 Click to reveal solution
assets.py
```python
@dg.asset(dagster_type=pandera_schema_to_dagster_type(PopulationDataModel.to_schema()))
def population_by_country():
    """Population by country from UN projections"""
    return (
        pd.read_csv("/workspaces/orchestration-workshop-tutorial/data/population-with-un-projections.csv")
        .rename(
            columns={
                "population__sex_all__age_all__variant_estimates": "population",
                "Entity": "entity",
                "Code": "entity_code",
                "Year": "year",
            }
        )
        .dropna(subset=["population"])
        .astype({"year": int, "population": int, "entity_code": str, "entity": str})
    )
```
Demonstrate: Pandera Prevents Bad Data
If, for example, you change the type of `population` from `int` to `float` in the model, materializing the asset will produce an actionable error. Dagster surfaces this error in its UI/logs, showing exactly which asset and which check failed.
```text
dagster._core.errors.DagsterTypeCheckDidNotPass: Type check failed for step output "result" - expected type "PopulationDataModel". Description: {
    "SCHEMA": {
        "WRONG_DATATYPE": [
            {
                "schema": "PopulationDataModel",
                "column": "population",
                "check": "dtype('float64')",
                "error": "expected series 'population' to have type float64, got int64"
            }
        ]
    }
}
```
Hands-on
Let’s define our raw data sources as Dagster assets, add Pandera schemas to ensure they meet our basic expectations for structure and type, and see how our tools catch the data error we introduced.
💡 Click to reveal solution
assets.py
```python
import dagster as dg
import pandas as pd
from dagster_pandera import pandera_schema_to_dagster_type

from energy_analysis.defs.models import (
    PopulationDataModel,
    EnergyConsumptionDataModel,
    RenewableCoverageDataModel,
    RegionalGroupingDataModel,
)


@dg.asset(dagster_type=pandera_schema_to_dagster_type(PopulationDataModel.to_schema()))
def population():
    """Population by country from UN projections"""
    return (
        pd.read_csv("/workspaces/orchestration-workshop-tutorial/data/population-with-un-projections.csv")
        .rename(
            columns={
                "population__sex_all__age_all__variant_estimates": "population",
                "Entity": "entity",
                "Code": "entity_code",
                "Year": "year",
            }
        )
        .dropna(subset=["population"])
        .astype({"year": int, "population": int, "entity_code": str, "entity": str})
    )


@dg.asset(
    dagster_type=pandera_schema_to_dagster_type(EnergyConsumptionDataModel.to_schema())
)
def energy_consumption():
    """Energy consumption by country from UN projections"""
    return (
        pd.read_csv("/workspaces/orchestration-workshop-tutorial/data/primary-energy-cons.csv")
        .rename(
            columns={
                "primary_energy_consumption__twh": "energy_consumption",
                "Entity": "entity",
                "Code": "entity_code",
                "Year": "year",
            }
        )
        .astype(
            {
                "year": int,
                "energy_consumption": float,
                "entity_code": str,
                "entity": str,
            }
        )
    )


@dg.asset(
    dagster_type=pandera_schema_to_dagster_type(RenewableCoverageDataModel.to_schema())
)
def renewable_coverage():
    """Renewable energy coverage by country from UN projections"""
    return (
        pd.read_csv("/workspaces/orchestration-workshop-tutorial/data/renewable-share-energy.csv")
        .rename(
            columns={
                "renewables__pct_equivalent_primary_energy": "renewable_energy_pct",
                "Entity": "entity",
                "Code": "entity_code",
                "Year": "year",
            }
        )
        .assign(renewable_energy_pct=lambda x: x["renewable_energy_pct"] / 100)
    )


@dg.asset(
    dagster_type=pandera_schema_to_dagster_type(RegionalGroupingDataModel.to_schema())
)
def regional_grouping():
    """Regional grouping taxonomy"""
    return pd.read_csv("/workspaces/orchestration-workshop-tutorial/data/regional-grouping.csv")
```
models.py
```python
import pandera as pa


class PopulationDataModel(pa.DataFrameModel):
    entity: str = pa.Field(description="Entity name")
    entity_code: str = pa.Field(description="Country code")
    year: int = pa.Field(description="Year of the population estimate")
    population: int = pa.Field(description="Population estimate")


class EnergyConsumptionDataModel(pa.DataFrameModel):
    entity: str = pa.Field(description="Entity name")
    entity_code: str = pa.Field(description="Country code")
    year: int = pa.Field(description="Year of the consumption")
    energy_consumption: float = pa.Field(description="Energy consumption in TWh")


class RenewableCoverageDataModel(pa.DataFrameModel):
    entity: str = pa.Field(description="Entity name")
    entity_code: str = pa.Field(description="Country code", nullable=True)
    year: int = pa.Field(description="Year of the estimate")
    renewable_energy_pct: float = pa.Field(description="Renewable energy coverage in %")


class RegionalGroupingDataModel(pa.DataFrameModel):
    region_entity_code: str = pa.Field(description="Region entity code")
    region_name: str = pa.Field(description="Region name")
    entity_code: str = pa.Field(description="Country code")
```
Moment of Discovery
By defining our raw data as Dagster assets and enforcing Pandera contracts, we’ve built a robust ‘intake system’ that ensures our foundation is solid and traceable, and can proactively identify data quality issues.
Next: Move on to transforming your raw inputs and create new energy data!