Transformer Developer Guide
This guide covers the full data transformation pipeline for creating transformers that convert between energy system model formats using CESM (Common Energy System Model) as the interchange format.
1. Architecture
CESM uses a hub-and-spoke architecture. Instead of N x N direct converters between every pair of tools, each tool only needs one import + one export transformer.
+------------------------------- CESM ----------------------------------+
| |
| CESM spec (LinkML) ----gen-python----> Python class |
| | | |
| v v |
| CESM sample (YAML) --linkml-runtime--> validation |
| | |
| v |
| CESM DataFrames (in-memory) |
| | | |
| writer reader |
| | | |
| v v |
| CESM DuckDB (on-disk) |
| |
+-----------------------------------------------------------------------+
| | | |
writer reader writer reader
| | | |
Model X format (e.g. Spine DB) Model Y format (e.g. SQLite)
| | | |
reader writer reader writer
| | | |
+------+----------+------+ +-------+----------+------+
| Transformer script | | Transformer script |
| - Config (YAML): | | - Config (YAML): |
| from-CESM / to-CESM | | from-CESM / to-CESM |
| - CESM core functions | | - CESM core functions |
| - Purpose-built funcs | | - Purpose-built funcs |
+-------------------------+ +-------------------------+
Data Flow
Each conversion pipeline has up to five components:
Source DB --> Reader --> Source DataFrames --> YAML Transformer --> Python Transformer --> CESM DataFrames --> DuckDB Writer
And the reverse for export:
CESM DuckDB --> DuckDB Reader --> CESM DataFrames --> YAML Transformer --> Python Transformer --> Target DataFrames --> Writer --> Target DB
CESM as Central Interchange Format
CESM (stored in DuckDB and defined by LinkML schema) serves as the canonical data representation:
-
Normalization: All timestamps are UTC, all entity relationships are explicit
-
Versioning: CESM schema versions enable forward compatibility
-
Validation: Central format enables schema validation
-
Storage: DuckDB provides efficient columnar storage with metadata preservation
Components Overview
| Component | Location | Purpose |
|---|---|---|
Reader |
|
Reads source format into DataFrames |
YAML Transformer |
|
Declarative entity/parameter mappings |
Python Transformer |
|
Complex logic YAML can’t handle |
Writer |
|
Writes DataFrames to target format |
Processing Script |
|
Orchestrates the full pipeline |
2. DataFrame Conventions
The pipeline uses pandas DataFrames as the universal intermediate representation with specific naming and structure conventions.
DataFrame Naming
| Pattern | Index type | Example |
|---|---|---|
|
Entity names |
|
|
DatetimeIndex |
|
|
String keys |
|
|
Integer position |
|
Entity DataFrames
Entity DataFrames store entities and their scalar parameter values.
-
Index: Entity names (single Index or MultiIndex for dimensional entities)
-
Columns: Parameter values (scalars)
-
Naming:
{class_name}(e.g.,unit,balance,unit_to_node)
Example: Single-dimensional entity
# DataFrame name: 'unit'
unit_df = pd.DataFrame({
'efficiency': [0.45, 0.55, 0.38],
'units_existing': [2, 1, 3],
'investment_method': ['not_allowed', 'no_limits', 'not_allowed']
}, index=pd.Index(['coal_plant', 'gas_turbine', 'wind_farm'], name='unit'))
Example: Multi-dimensional entity
# DataFrame name: 'unit_to_node'
unit_to_node_df = pd.DataFrame({
'capacity': [500.0, 200.0, 100.0],
'other_operational_cost': [25.0, 15.0, 0.0]
}, index=pd.MultiIndex.from_tuples([
('coal_plant.west', 'coal_plant', 'west'),
('gas_turbine.east', 'gas_turbine', 'east'),
('wind_farm.west', 'wind_farm', 'west')
], names=['name', 'source', 'sink']))
Time Series / Map / Array DataFrames
These DataFrames store indexed parameter values where the index is datetime, string keys, or array positions.
-
Index: datetime (for time series), string keys (for maps), or position (for arrays)
-
Columns: Entity names (critical for transformations!)
Example: Time series
# DataFrame name: 'unit.ts.availability'
availability_ts = pd.DataFrame({
'coal_plant': [1.0, 1.0, 0.9, 0.95, ...],
'gas_turbine': [1.0, 1.0, 1.0, 1.0, ...],
'wind_farm': [0.3, 0.45, 0.6, 0.2, ...]
}, index=pd.date_range('2023-01-01', periods=8760, freq='h', name='datetime'))
Example: String-indexed map (period data)
# DataFrame name: 'unit.str.virtual_unitsize'
period_capacity = pd.DataFrame({
'coal_plant': [500.0, 500.0, 400.0],
'gas_turbine': [200.0, 250.0, 300.0]
}, index=pd.Index(['y2025', 'y2030', 'y2035'], name='period'))
Example: Array
# DataFrame name: 'system.array.solve_order'
solve_order = pd.DataFrame({
'main_system': ['y2025', 'y2030', 'y2035']
})
MultiIndex Column Convention
For multi-dimensional entity classes (e.g., unit_to_node), time series/map/array DataFrames use MultiIndex columns:
-
Column tuples:
(entity_name, dimension1, dimension2, …) -
Level names:
['name', 'source', 'sink']etc.
# DataFrame name: 'unit_to_node.ts.profile_limit_upper'
profile_ts = pd.DataFrame({
('coal_plant.west', 'coal_plant', 'west'): [1.0, 0.95, 0.9, ...],
('gas_turbine.east', 'gas_turbine', 'east'): [1.0, 1.0, 1.0, ...]
}, index=pd.date_range('2023-01-01', periods=8760, freq='h', name='datetime'))
profile_ts.columns = pd.MultiIndex.from_tuples(
profile_ts.columns.tolist(),
names=['name', 'source', 'sink']
)
This convention enables dimension-based transformations (reordering, aggregation) in the YAML transformer.
3. YAML Transformer Syntax
The YAML transformer handles most data transformations declaratively. Each operation in the YAML file defines a transformation from source to target.
Basic Structure
operation-name:
- source_specification
- target_specification
- operations_list (optional)
Entity Operations
Simple entity copy:
# Copy all entities from 'unit' class to 'unit' class
unit-entities:
- unit
- unit
Entity copy with dimension specification:
# Copy unit_to_node entities, specifying dimension names
unit-to-node-entities:
- unit_to_node
- unit.outputNode:
order: [[0], [1], [2]]
- dimensions: [unit, node]
Conditional entity copy with if_parameter:
# Only copy nodes that have the 'has_balance' parameter set
balance-entities:
- node
- balance:
if_parameter: has_balance
Conditional entity copy with if_not_parameter:
# Copy nodes that have has_balance but NOT has_storage
balance-entities:
- node
- balance
- - if_parameter: has_balance
- if_not_parameter: has_storage
Multiple condition parameters:
# Copy if any of the listed parameters exist
profile-entities:
- unit_to_node
- profile:
order: [[0]]
if_parameter: [profile_limit_upper, profile_limit_lower]
Dimension Reordering
The order specification controls how source dimensions map to target dimensions.
Syntax: order: [[source_dims_for_target_0], [source_dims_for_target_1], …]
-
Index
0refers to the source entity name -
Index
1, 2, …refer to source dimension elements
Examples:
# Direct 1:1 mapping (entity_name, dim1, dim2) -> (name, source, sink)
unit_to_node-entities:
- unit.outputNode
- unit_to_node:
order: [[0], [1], [2]]
- dimensions: [source, sink]
# Swap dimensions: (name, dim1, dim2) -> (name, dim2, dim1)
node_to_unit-entities:
- unit.inputNode
- node_to_unit:
order: [[0], [2], [1]]
- dimensions: [source, sink]
# Extract only entity name (drop dimensions)
link efficiency:
- connection: efficiency
- link: efficiency
- order: [[0]]
# Combine dimensions into name
link-entities:
- connection.node.node
- link:
order: [[1], [2], [3]]
- dimensions: [node_A, node_B]
Parameter Transformations
Simple parameter copy:
unit_efficiency:
- unit: efficiency
- unit: efficiency
Parameter rename:
units_existing:
- unit: existing
- unit: units_existing
Parameter with value rename mapping:
unit_investment_method:
- unit: invest_method
- unit: investment_method
- rename:
not_allowed: not_allowed
invest_no_limit: no_limits
Data Type Conversions
Use list notation [parameter, [type]] for indexed parameters:
Time series (ts) - datetime indexed:
balance_inflow:
- node: [inflow, [str]] # Source: string-indexed map
- balance: [flow_profile, [ts]] # Target: datetime-indexed time series
- - if_parameter: has_balance
- if_not_parameter: has_storage
String-indexed map (str):
unit_to_node profile_limit_upper:
- unit_to_node: [profile_limit_upper, [ts]] # Source: time series
- profile: [profile, [str]] # Target: string-indexed map
- - order: [[0]]
Array:
solve_order:
- system: [solve_order, [array]]
- model: [solves, [array]]
Mathematical Operations
Multiply/Divide/Add/Subtract with constant:
storage_investment_cost:
- node: invest_cost
- storage: investment_cost
- - if_parameter: has_storage
- operation: multiply
with: 1000
storage fixed-cost:
- storage: fixed_cost
- node: fixed_cost
- operation:
- multiply:
with: 0.001
Algebra between multiple sources:
# Multiply values from two different sources
# Formula: source1 * source2 where match defines dimension alignment
test-multiply:
- node_to_unit: other_operational_cost
unit: efficiency
- unit.inputNode: test
- - algebra: "1*2"
match: [[2], [1]] # Match dim 2 of source 1 with dim 1 of source 2
- order: [[2], [1]]
Aggregation
When reducing dimensions, specify how to aggregate:
unit_to_node investment_cost:
- unit_to_node: investment_cost
- unit: invest_cost
- - order: [[1]]
aggregate: sum # Options: sum, average, max, min, first
Creating Parameters from Entities
Use value to create parameters based on entity existence:
Constant value:
has_balance:
- [balance, storage]
- node: has_balance
- value: 'yes'
Value from dimension:
# Extract dimension value as parameter
# value: [N] extracts the Nth dimension
link_node_A:
- connection.node.node
- link: node_A
- - order: [[0]]
- value: [2] # Extract dimension 2 as the parameter value
4. Python Transformer Functions
Use Python transformers for complex logic that cannot be expressed in YAML.
When to Use Python vs YAML
Use YAML for:
-
Direct entity/parameter copies
-
Simple renames and value mappings
-
Dimension reordering
-
Basic mathematical operations with constants
-
Conditional filtering
Use Python for:
-
Complex conditional logic spanning multiple DataFrames
-
Temporal data reconstruction (timelines, solve patterns)
-
Lookups across different entity classes
-
Business logic requiring iteration or state
Python Transformer Structure
"""
Module docstring explaining the transformations.
"""
import pandas as pd
from typing import Dict
from datetime import datetime
def transform_to_cesm(source: Dict[str, pd.DataFrame],
cesm: Dict[str, pd.DataFrame],
start_time: datetime) -> Dict[str, pd.DataFrame]:
"""
Main entry point called after the YAML transformer.
Args:
source: Dictionary of source DataFrames
cesm: Dictionary of CESM DataFrames (partially transformed by YAML)
start_time: Start datetime for the timeline
Returns:
Updated cesm dictionary with all Python transformations applied
"""
cesm = my_custom_transform(source, cesm)
cesm = another_transform(source, cesm, start_time)
return cesm
Available Helpers in transform_parameters.py
The transform_parameters.py module provides utilities:
from src.core.transform_parameters import (
transform_data, # Main YAML transformer function
load_config, # Load YAML configuration
is_timeseries, # Check if DataFrame is time series
get_entity_index, # Get entity index (handles pivoted data)
set_entity_index, # Set entity index
index_to_names, # Convert index to list of name lists
list_of_lists_to_index, # Convert name lists back to index
reorder_dimensions, # Reorder DataFrame dimensions
apply_rename, # Apply value rename mapping
)
Example: Time Data Transformation
def time_from_spine(flextool: Dict[str, pd.DataFrame],
cesm: Dict[str, pd.DataFrame],
start_time: datetime) -> Dict[str, pd.DataFrame]:
"""
Extract temporal data from FlexTool format and add to CESM.
Creates/updates:
- cesm['timeline']: DataFrame with DatetimeIndex
- cesm['solve_pattern']: DataFrame with start_time, duration
- cesm['period']: DataFrame with years_represented
"""
timestep_minutes = _get_timestep_minutes(flextool)
if 'timeline.str.timestep_duration' in flextool:
timestep_df = flextool['timeline.str.timestep_duration']
datetime_index = _convert_str_index_to_datetime(
timestep_df.index, start_time, timestep_minutes
)
cesm['timeline'] = pd.DataFrame(index=datetime_index)
cesm['timeline'].index.name = 'datetime'
return cesm
5. DuckDB Storage Format
DuckDB serves as the persistent storage for CESM data, with metadata to reconstruct exact DataFrame structures.
Column Encoding
MultiIndex columns are encoded using :: separator:
("region", "north", "heat") -> "region::north::heat"
The :: separator is used because entity names may contain dots (e.g., source.sink).
Metadata Table
Each DuckDB file contains a _dataframe_metadata table with:
| Column | Type | Description |
|---|---|---|
|
string |
Original DataFrame name (e.g., |
|
string |
SQL-safe table name (dots replaced with underscores) |
|
string |
|
|
int |
Number of index columns (stored as first N columns) |
|
bool |
Whether columns are MultiIndex |
|
JSON |
Array of level names if MultiIndex columns |
6. Readers and Writers
Available Readers
from_spine_db.py - Read Spine Toolbox databases:
from src.readers.from_spine_db import spine_to_dataframes
dfs = spine_to_dataframes(
db_url="sqlite:///path/to/database.sqlite",
scenario="base"
)
from_duckdb.py - Read DuckDB CESM storage:
from src.readers.from_duckdb import dataframes_from_duckdb
dfs = dataframes_from_duckdb("path/to/cesm.duckdb")
Available Writers
to_duckdb.py - Write to DuckDB CESM storage:
from src.writers.to_duckdb import dataframes_to_duckdb
dataframes_to_duckdb(
dataframes=dfs,
db_path="output/cesm.duckdb",
overwrite=True
)
to_spine_db.py - Write to Spine Toolbox databases:
from src.writers.to_spine_db import dataframes_to_spine
dataframes_to_spine(
dataframes=dfs,
db_url="sqlite:///path/to/output.sqlite",
import_datetime="2025-01-15_10-30",
purge_before_import=True
)
How Readers/Writers Preserve Structure
Entity DataFrames:
-
Index stored as first column(s)
-
MultiIndex levels preserved via metadata
-
Column names preserved as-is
Time Series/Map/Array DataFrames:
-
DatetimeIndex converted to/from ISO 8601 strings
-
MultiIndex columns encoded/decoded with
::separator -
Level names preserved in metadata
7. Creating a New Transformer
Directory Structure
Transformers are organized by source format, CESM version, and source version:
src/transformers/
└── {source_format}/
└── cesm_{cesm_version}/
└── {source_version}/
├── from_{source}.yaml # Import: source -> CESM
├── to_cesm.py # Import: complex transformations (optional)
├── to_{target}.yaml # Export: CESM -> target
└── from_cesm.py # Export: complex transformations (optional)
Example:
src/transformers/
└── irena_flextool/
└── cesm_v0.1.0/
└── v3.14.0/
├── from_flextool.yaml
├── to_cesm.py
├── to_flextool.yaml
└── from_cesm.py
Step-by-Step: Creating an Import Transformer
-
Create directory structure:
mkdir -p src/transformers/{source}/cesm_v0.1.0/{version} -
Analyze source data format:
-
Identify entity classes and their relationships
-
Map source parameters to CESM equivalents
-
Note indexed data (time series, maps, arrays)
-
-
Create a reader (
src/readers/from_{source}.py) if the source format is not already supported. The reader must returnDict[str, pd.DataFrame]following the naming conventions above. -
Create
from_{source}.yaml:# Entity mappings source-entity-entities: - source_class - cesm_class # Parameter mappings source_param: - source_class: source_param_name - cesm_class: cesm_param_name -
Create
to_cesm.py(if needed):def transform_to_cesm(source, cesm, start_time): # Complex transformations return cesm -
Create a processing script (
scripts/processing/{source}_to_cesm.py) that orchestrates the pipeline: parse CLI args, call reader, apply YAML transformer viacore.transform_parameters.transform_data(), apply Python transformer, call writer. Seeflextool_to_cesm.pyandgriddb_to_cesm.pyfor working examples. -
Test the transformer:
from src.readers.from_spine_db import spine_to_dataframes from src.core.transform_parameters import transform_data source_dfs = spine_to_dataframes(db_url, scenario) cesm_dfs = transform_data(source_dfs, 'from_source.yaml')
Step-by-Step: Creating an Export Transformer
-
Create
to_{target}.yaml:# Entity mappings (reverse of import) cesm-entity-entities: - cesm_class - target_class # Parameter mappings cesm_param: - cesm_class: cesm_param_name - target_class: target_param_name -
Create
from_cesm.py(if needed):def transform_from_cesm(cesm, target, timeline): # Complex reverse transformations return target -
Create a writer (
src/writers/to_{target}.py) if the target format is not already supported. -
Test the full round-trip:
# Import source_dfs = reader(source_path) cesm_dfs = transform_data(source_dfs, 'from_source.yaml') # Export target_dfs = transform_data(cesm_dfs, 'to_target.yaml') writer(target_dfs, target_path)
Checklist: Import Transformer
-
Create
from_{source}.yamlwith entity mappings -
Map all scalar parameters
-
Handle indexed data (time series, maps, arrays) with type annotations
-
Add dimension specifications for multi-dimensional entities
-
Implement conditional logic with
if_parameter/if_not_parameter -
Create
to_cesm.pyfor complex transformations (if needed) -
Test with sample data
-
Verify round-trip consistency
Checklist: Export Transformer
-
Create
to_{target}.yamlwith entity mappings -
Map all CESM parameters to target format
-
Handle data type conversions (
[ts]<→[str]) -
Implement dimension reordering with
order -
Add aggregation where dimensions collapse
-
Create
from_cesm.pyfor complex transformations (if needed) -
Test with sample CESM data
-
Verify output matches target format specification
8. Quick Reference
YAML Operation Types
| Pattern | Type | Description |
|---|---|---|
|
Entity copy |
Copy entities between classes |
|
Parameter transform |
Copy/transform parameter values |
|
Create parameter |
Create parameter with fixed value |