Client Initialization

The Python SDK provides both synchronous and asynchronous clients.

Synchronous Client

from openelectricity import OEClient

# Initialize with environment variables (recommended)
# Reads OPENELECTRICITY_API_KEY / OPENELECTRICITY_API_URL (see Best Practices).
client = OEClient()

# Or explicitly pass credentials
client = OEClient(
    api_key="your-api-key",
    base_url="https://api.openelectricity.org.au/v4"
)

# Use as context manager
# Ensures proper resource cleanup when the block exits.
with OEClient() as client:
    # Make API calls
    pass

Asynchronous Client

from openelectricity import AsyncOEClient
import asyncio

# Async variant of the client, for use inside an event loop.
async def main():
    # "async with" provides the same resource cleanup as the sync context manager.
    async with AsyncOEClient() as client:
        # Make async API calls
        pass

asyncio.run(main())

Network Data

Fetch network-level time series data for power, energy, emissions, and market value metrics.

get_network_data

def get_network_data(
    network_code: str,                    # "NEM" | "WEM" | "AU"
    metrics: list[DataMetric],           # List of metrics to fetch
    interval: str = "5m",                 # Time interval
    date_start: datetime | None = None,  # Start date
    date_end: datetime | None = None,    # End date
    primary_grouping: str | None = None, # Primary grouping
    secondary_grouping: str | None = None # Secondary grouping
) -> TimeSeriesResponse
Example:
from openelectricity.types import DataMetric
from datetime import datetime

# Hourly power and energy for the NEM, grouped by region then fuel technology.
response = client.get_network_data(
    network_code="NEM",
    metrics=[DataMetric.POWER, DataMetric.ENERGY],
    interval="1h",
    date_start=datetime(2024, 1, 1),
    date_end=datetime(2024, 1, 2),
    primary_grouping="network_region",
    secondary_grouping="fueltech"
)

# Access data
# The response nests: one timeseries per metric -> grouped results -> data points.
for timeseries in response.data:
    print(f"Metric: {timeseries.metric}")
    for result in timeseries.results:
        for data_point in result.data:
            print(f"  {data_point.timestamp}: {data_point.value}")

Market Data

Fetch market-related metrics including price, demand, and curtailment.

get_market

def get_market(
    network_code: str,                    # "NEM" | "WEM" | "AU"
    metrics: list[MarketMetric],         # List of market metrics
    interval: str = "5m",                 # Time interval
    date_start: datetime | None = None,  # Start date
    date_end: datetime | None = None,    # End date (omit for latest)
    network_region: str | None = None,   # Specific region filter
    primary_grouping: str | None = None  # Primary grouping
) -> TimeSeriesResponse

Available Market Metrics

Price and Demand:
  • MarketMetric.PRICE - Electricity spot price ($/MWh)
  • MarketMetric.DEMAND - Electricity demand (MW)
  • MarketMetric.DEMAND_ENERGY - Electricity demand energy (MWh)
Curtailment Power (MW):
  • MarketMetric.CURTAILMENT - Total curtailment across all renewables
  • MarketMetric.CURTAILMENT_SOLAR - Solar generation curtailed
  • MarketMetric.CURTAILMENT_WIND - Wind generation curtailed
Curtailment Energy (MWh):
  • MarketMetric.CURTAILMENT_ENERGY - Total curtailed energy
  • MarketMetric.CURTAILMENT_SOLAR_ENERGY - Solar energy curtailed
  • MarketMetric.CURTAILMENT_WIND_ENERGY - Wind energy curtailed

Examples

Fetch Real-time Curtailment (5-minute intervals):
from openelectricity.types import MarketMetric
from datetime import datetime, timedelta
import pandas as pd

# Get latest curtailment data (omit date_end for latest)
response = client.get_market(
    network_code="NEM",
    metrics=[
        MarketMetric.CURTAILMENT_SOLAR,
        MarketMetric.CURTAILMENT_WIND,
        MarketMetric.CURTAILMENT
    ],
    interval="5m",
    date_start=datetime.now() - timedelta(days=1),
    # date_end omitted to get latest data
    primary_grouping="network_region"
)

# Convert to DataFrame
# Flatten the nested response (metric -> result -> data points) into rows.
data = []
for timeseries in response.data:
    for result in timeseries.results:
        region = result.name.split("_")[-1]  # Extract region from name
        # NOTE(review): assumes the region code follows the final underscore
        # in result.name - verify against actual response naming.
        for data_point in result.data:
            data.append({
                "timestamp": data_point.timestamp,
                "region": region,
                "metric": timeseries.metric,
                "value": data_point.value,
                "unit": timeseries.unit
            })

df = pd.DataFrame(data)
Fetch Daily Curtailment Energy:
# Get daily curtailment energy totals (MWh)
response = client.get_market(
    network_code="NEM",
    metrics=[
        MarketMetric.CURTAILMENT_SOLAR_ENERGY,
        MarketMetric.CURTAILMENT_WIND_ENERGY,
        MarketMetric.CURTAILMENT_ENERGY
    ],
    interval="1d",
    date_start=datetime(2024, 1, 1),
    date_end=datetime(2024, 1, 31),
    primary_grouping="network_region"
)

# Process results
# Print one per-region total per metric, skipping missing data points.
for timeseries in response.data:
    print(f"\nMetric: {timeseries.metric} ({timeseries.unit})")
    for result in timeseries.results:
        region = result.name.split("_")[-1]
        # Test for None explicitly: a truthiness check ("if dp.value") would
        # also discard legitimate 0.0 readings, which is misleading in general.
        total = sum(dp.value for dp in result.data if dp.value is not None)
        print(f"  {region}: {total:,.0f} {timeseries.unit}")
Price and Curtailment Correlation:
# Fetch price and curtailment for correlation analysis
response = client.get_market(
    network_code="NEM",
    metrics=[
        MarketMetric.PRICE,
        MarketMetric.CURTAILMENT
    ],
    interval="5m",
    date_start=datetime.now() - timedelta(hours=24),
    primary_grouping="network_region"
)

# Calculate correlation by region
import pandas as pd

# Flatten the nested response into one row per data point.
data = []
for timeseries in response.data:
    for result in timeseries.results:
        region = result.name.split("_")[-1]  # assumes region code follows the final underscore
        for data_point in result.data:
            data.append({
                "timestamp": data_point.timestamp,
                "region": region,
                "metric": timeseries.metric,
                "value": data_point.value
            })

df = pd.DataFrame(data)
# Reshape to one row per (timestamp, region) with one column per metric.
pivot_df = df.pivot_table(
    index=["timestamp", "region"],
    columns="metric",
    values="value"
).reset_index()

# Calculate correlation
# NOTE(review): the column lookups below assume timeseries.metric stringifies
# to lowercase names like "price"/"curtailment" - confirm enum vs str in the
# actual response before relying on this.
for region in pivot_df["region"].unique():
    region_df = pivot_df[pivot_df["region"] == region]
    correlation = region_df["price"].corr(region_df["curtailment"])
    print(f"{region}: {correlation:.3f}")

Facility Data

get_facility_data

Fetch facility-specific time series data.
def get_facility_data(
    network_code: str,                      # "NEM" | "WEM" | "AU"
    facility_codes: str | list[str],        # Single or multiple facility codes
    metrics: list[DataMetric],             # List of metrics
    interval: str = "5m",                   # Time interval
    date_start: datetime | None = None,    # Start date
    date_end: datetime | None = None       # End date
) -> TimeSeriesResponse
Example:
# Hourly power and emissions for two named facilities on the NEM.
response = client.get_facility_data(
    network_code="NEM",
    facility_codes=["BAYSW1", "ERARING"],
    metrics=[DataMetric.POWER, DataMetric.EMISSIONS],
    interval="1h",
    date_start=datetime(2024, 1, 1),
    date_end=datetime(2024, 1, 2)
)

Facility Information

get_facilities

Get information about generation facilities and their units.
def get_facilities(
    status_id: list[str] | None = None,      # Filter by status
    fueltech_id: list[str] | None = None,    # Filter by fuel technology
    network_id: str | list[str] | None = None, # Filter by network
    network_region: str | None = None        # Filter by region
) -> FacilityResponse
Example:
from openelectricity.types import UnitStatusType, UnitFueltechType

# Get all operating solar and wind facilities in NSW
response = client.get_facilities(
    status_id=[UnitStatusType.OPERATING],
    fueltech_id=[
        UnitFueltechType.SOLAR_UTILITY,
        UnitFueltechType.WIND
    ],
    network_id=["NEM"],
    network_region="NSW1"
)

# Each facility carries one or more units, each with its own capacity.
for facility in response.data:
    print(f"{facility.code}: {facility.name}")
    for unit in facility.units:
        print(f"  {unit.code}: {unit.capacity_mw} MW")

Data Analysis

Converting to DataFrames

The SDK provides built-in support for converting responses to Pandas and Polars DataFrames.

Pandas Integration

import pandas as pd

# Get market data
response = client.get_market(
    network_code="NEM",
    metrics=[MarketMetric.PRICE, MarketMetric.DEMAND],
    interval="1h",
    date_start=datetime(2024, 1, 1),
    date_end=datetime(2024, 1, 2),
    primary_grouping="network_region"
)

# Convert to DataFrame
# Flatten the nested response into one row per data point.
data = []
for timeseries in response.data:
    for result in timeseries.results:
        for data_point in result.data:
            data.append({
                "timestamp": data_point.timestamp,
                "metric": timeseries.metric,
                "value": data_point.value,
                "unit": timeseries.unit
            })

df = pd.DataFrame(data)

# Analyze
# Summary statistics (count/mean/std/min/quartiles/max) per metric.
print(df.groupby("metric")["value"].describe())

Polars Integration

import polars as pl

# Same data structure as above
df = pl.DataFrame(data)

# Fast aggregations with Polars
# Note: DataFrame.groupby was renamed to group_by in Polars 0.19 and the old
# spelling has since been removed, so current releases require group_by.
result = (
    df.lazy()
    .group_by(["metric"])
    .agg([
        pl.col("value").mean().alias("avg"),
        pl.col("value").max().alias("max"),
        pl.col("value").min().alias("min")
    ])
    .collect()
)

Error Handling

The SDK provides comprehensive error handling with detailed error messages.
from openelectricity.exceptions import OpenElectricityError

try:
    response = client.get_market(
        network_code="NEM",
        metrics=[MarketMetric.PRICE],
        interval="invalid_interval"  # Invalid parameter
    )
except OpenElectricityError as e:
    # Catch the SDK's own error type rather than a bare Exception.
    print(f"API Error: {e}")
    if hasattr(e, 'response'):
        # Some errors carry the raw API response with additional detail.
        print(f"Details: {e.response}")

Best Practices

  1. Use environment variables for API credentials:
    export OPENELECTRICITY_API_KEY="your-api-key"
    export OPENELECTRICITY_API_URL="https://api.openelectricity.org.au/v4"
    
  2. Use context managers to ensure proper resource cleanup:
    with OEClient() as client:
        # API calls
        pass
    
  3. Omit date_end to get the latest available data:
    # Gets data from start_date to latest available
    response = client.get_market(
        network_code="NEM",
        metrics=[MarketMetric.CURTAILMENT],
        date_start=datetime.now() - timedelta(days=1)
        # date_end omitted
    )
    
  4. Choose appropriate intervals:
    • Use "5m" for real-time power monitoring
    • Use "1h" for hourly aggregations
    • Use "1d" for daily energy totals
    • Use energy metrics (MWh) for longer periods
  5. Handle large datasets efficiently:
    # Use generators for large datasets
    def process_large_dataset(client, start_date, end_date, chunk=timedelta(days=7)):
        """Yield get_market responses covering [start_date, end_date) in chunks.

        Fetching a long range as several smaller requests keeps each response
        small enough to process incrementally.

        Args:
            client: OEClient used to make the API calls.
            start_date: Start of the overall window.
            end_date: End of the overall window.
            chunk: Size of each request window (default: 7 days).

        Yields:
            One TimeSeriesResponse per chunk, in chronological order.
        """
        current = start_date
        while current < end_date:
            # Clamp the final chunk so we never request past end_date.
            chunk_end = min(current + chunk, end_date)

            # NOTE(review): consecutive chunks share a boundary timestamp -
            # confirm the API treats date_end as exclusive to avoid duplicates.
            response = client.get_market(
                network_code="NEM",
                metrics=[MarketMetric.PRICE],
                date_start=current,
                date_end=chunk_end
            )

            yield response
            current = chunk_end