Source code for osier.models.dispatch

import pandas as pd
import numpy as np
import pyomo.environ as pe
import pyomo.opt as po
from pyomo.environ import ConcreteModel
from unyt import unyt_array, hr, MW, kW, GW
import itertools as it
from osier import Technology
from osier.technology import _validate_quantity, _validate_unit
from osier.utils import synchronize_units
import warnings
import logging
import time


_freq_opts = {'D': 'day',
              'H': 'hour',
              'S': 'second',
              'T': 'minute'}
LARGE_NUMBER = 1e20
MEDIUM_NUMBER = 1e10
BLACKOUT_COST = 10 * (1 / (kW * hr))  # M$/kWh


curtailment_tech = Technology(technology_name='Curtailment',
                              technology_type='removal',
                              dispatchable=True,
                              capacity=MEDIUM_NUMBER)
reliability_tech = Technology(technology_name='LoadLoss',
                              technology_type='matching',
                              dispatchable=True,
                              fuel_cost=BLACKOUT_COST,
                              capacity=MEDIUM_NUMBER)


[docs] class DispatchModel(): """ The :class:`DispatchModel` class creates and solves a basic dispatch model from the perspective of a "grid operator." The model uses `pyomo <https://pyomo.readthedocs.io/en/stable/index.html>`_ to create and solve a linear programming model. The mathematical formulation for this problem is: Minimize .. math:: \\text{C}_{\\text{total}} = \\sum_t^T \\sum_u^U \\left[c^{\\text{fuel}}_{u,t} + c^{\\text{om,var}}_{u,t}\\right]x_{u,t} Such that, 1. The generation meets demand within a user-specified tolerance (undersupply and oversupply) .. math:: \\sum_u^Ux_{u,t} &\\geq \\left(1-\\text{undersupply}\\right)\\text{D}_t \\ \\forall \\ t \\in T \\sum_u^Ux_{u,t} &\\leq \\left(1+\\text{oversupply}\\right)\\text{D}_t \\ \\forall \\ t \\in T 2. A technology's generation (:math:`x_u`) does not exceed its capacity to generate at any time, :math:`t`. .. math:: x_{u,t} \\leq \\textbf{CAP}_{u}\\Delta t \\ \\forall \\ u,t \\in U,T 3. Technologies may not exceed their ramp up rate, .. math:: \\frac{x_{r,t} - x_{r,t-1}}{\\Delta t} = \\Delta P_{r,t} \\leq (\\text{ramp up})\\textbf{CAP}_u\\Delta t \\ \\forall \\ r,t \\in R \\subset U, T or ramp down rate, .. math:: \\frac{x_{r,t} - x_{r,t-1}}{\\Delta t} = \\Delta P_{r,t} \\leq -(\\text{ramp down})\\textbf{CAP}_u\\Delta t \\ \\forall \\ r,t \\in R \\subset U, T . Parameters ---------- technology_list : list of :class:`osier.Technology` The list of :class:`Technology` objects to dispatch -- i.e. decide how much energy each technology should produce. net_demand : list, :class:`numpy.ndarray`, :class:`unyt.array.unyt_array`, :class:`pandas.DataFrame` The remaining energy demand to be fulfilled by the technologies in :attr:`technology_list`. The `values` of an object passed as `net_demand` are used to create a supply constraint. See :attr:`oversupply` and :attr:`undersupply`. If a :class:`pandas.DataFrame` is passed, :mod:`osier` will try inferring a `time_delta` from the dataframe index. Otherwise, the :attr:`time_delta` must be passed or the default is used. time_delta : str, :class:`unyt.unyt_quantity`, float, int Specifies the amount of time between two time slices. The default is one hour. Can be overridden by specifying a unit with the value. For example: >>> time_delta = "5 minutes" >>> from unyt import days >>> time_delta = 30*days would both work. power_units : str, :class:`unyt.unit_object` Specifies the units for the power demand. The default is :attr:`MW`. Can be overridden by specifying a unit with the value. solver : str Indicates which solver to use. May require separate installation. Accepts: ['cplex', 'cbc', 'appsi_highs']. Other solvers will be added in the future. lower_bound : float The minimum amount of energy each technology can produce per time period. Default is 0.0. oversupply : float The amount of allowed oversupply as a percentage of demand. Default is 0.0 (no oversupply allowed). undersupply : float The amount of allowed undersupply as a percentage of demand. Default is 0.0 (no undersupply allowed). verbosity : Optional, int Sets the logging level for the simulation. Accepts `logging.LEVEL` or integer where LEVEL is {10:DEBUG, 20:INFO, 30:WARNING, 40:ERROR, 50:CRITICAL}. curtailment : boolean Indicates if the model should enable a curtailment option. allow_blackout : boolean If True, a "reliability" technology is added to the model that will fulfill the mismatch in supply and demand. This reliability technology has a variable cost of 1e4 $/MWh. The value must be higher than the variable cost of any other technology to prevent a pathological preference for blackouts. Default is False. Attributes ---------- model : :class:`pyomo.environ.ConcreteModel` The :mod:`pyomo` model class that converts python code into a set of linear equations. objective : float The result of the model's objective function. Only instantiated after :meth:`DispatchModel.solve()` is called. n_timesteps : int The number of timesteps in the model. upper_bound : float The upper bound for all decision variables. Chosen to be equal to the maximum capacity of all technologies in :attr:`tech_set`. storage_upper_bound : float The upper bound for storage decision variables. penalty : float The penalty applied to the objective function to eliminate simultaneous charging and discharging. Users may need to tune this parameter. Default is 1e-10. model_initialized : bool Indicates whether :attr:`DispatchModel.model` has been populated with equations yet. This is set to ``True`` after :meth:`DispatchModel._write_model_equations()` has been called. indices : list of tuples The list of tuples representing the product of the :attr:`tech_set` and :attr:`time_set` attributes. tech_set : list of str A list of the unique technology names in the simulation. capacity_dict : dict A dictionary of {name : capacity} pairs. efficiency_dict : dict A dictionary of {name : efficiency} pairs. time_set : iterable The result of ``range(self.n_timesteps)``. cost_params : list The set of cost parameters for each technology. Corresponds to a list of values. Size is equal to the product of the number of timesteps and the number of technologies in the model. ramp_up_params : list The set of ramp_up parameters. Only initialized if there is a ramping technology. ramp_down_params : list The set of ramp_down parameters. Only initialized if there is a ramping technology. ramping_techs : list The subset of :attr:`tech_set` containing ramping technologies. Notes ----- 1. Technically, :attr:`solver` will accept any solver that :class:`pyomo` can use. We only list two solvers because those are the only solvers in the :mod:`osier` test suite. 2. The default value for :attr:`time_delta` in :attr:`__init__` is `None`. This is replaced by a setter method in :class:`Dispatch`. 3. This formulation uses a :attr:`penalty` parameter to prevent unphysical behavior because it preserves the problem's linearity. Some formulations may use a binary variable to prevent simultaneous charging and discharing. However, this changes the problem to a mixed-integer linear program which requires a more sophisticated solver such as ``gurobi``. 4. :meth:`_write_model_equations` may be called before :meth:`solve` if users wish to add their own constraints or parameters to the problem. """ def __init__(self, technology_list, net_demand, time_delta=None, solver='cbc', lower_bound=0.0, oversupply=0.0, undersupply=0.0, verbosity=50, penalty=1e-10, power_units=MW, curtailment=True, allow_blackout=False, **kwargs): self.net_demand = net_demand self.time_delta = time_delta self.lower_bound = lower_bound self.oversupply = oversupply self.undersupply = undersupply self.penalty = penalty self.model = ConcreteModel() self.solver = solver self.results = None self.objective = None self.model_initialized = False self.verbosity = verbosity if self.verbosity > 10: self.verbose = False else: self.verbose = True self.curtailment = curtailment self.allow_blackout = allow_blackout if isinstance(net_demand, unyt_array): self.power_units = net_demand.units else: self.power_units = power_units if ((self.curtailment) and (self.allow_blackout)): sync_list = technology_list + [curtailment_tech, reliability_tech] elif self.curtailment: sync_list = technology_list + [curtailment_tech] elif self.allow_blackout: sync_list = technology_list + [reliability_tech] else: sync_list = technology_list self.technology_list = synchronize_units( sync_list, unit_power=self.power_units, unit_time=self.time_delta.units) logging.basicConfig(level=verbosity, format='%(message)s') @property def time_delta(self): return self._time_delta @time_delta.setter def time_delta(self, value): if value: valid_quantity = _validate_quantity(value, dimension='time') self._time_delta = valid_quantity else: if isinstance(self.net_demand, pd.DataFrame): try: freq_list = list(self.net_demand.index.inferred_freq) freq_key = freq_list[-1] try: value = float(freq_list[0]) except ValueError: warnings.warn((f"Could not convert value " f"{freq_list[0]} to float. " "Setting to 1.0."), UserWarning) value = 1.0 self._time_delta = _validate_quantity( f"{value} {_freq_opts[freq_key]}", dimension='time') except KeyError: warnings.warn( (f"Could not infer time delta with freq {freq_key} " "from pandas dataframe. Setting delta to 1 hour."), UserWarning) self._time_delta = 1 * hr else: self._time_delta = 1 * hr @property def power_units(self): return self._power_units @power_units.setter def power_units(self, value): if value: valid_quantity = _validate_unit(value, dimension='power') self._power_units = valid_quantity else: warnings.warn(f"Could not infer power units. Unit set to MW.") self._power_units = MW @property def n_timesteps(self): return len(self.net_demand) @property def tech_set(self): tech_names = [t.technology_name for t in self.technology_list] return tech_names @property def capacity_dict(self): capacity_set = unyt_array([t.capacity for t in self.technology_list]) return dict(zip(self.tech_set, capacity_set)) @property def efficiency_dict(self): capacity_set = [t.efficiency for t in self.technology_list] return dict(zip(self.tech_set, capacity_set)) @property def time_set(self): return range(self.n_timesteps) @property def indices(self): return list(it.product(self.tech_set, self.time_set)) @property def cost_params(self): v_costs = np.array([ (t.variable_cost_ts(self.n_timesteps)) for t in self.technology_list ]).flatten() return dict(zip(self.indices, v_costs)) @property def storage_techs(self): return [t.technology_name for t in self.technology_list if hasattr(t, "storage_capacity")] @property def storage_upper_bound(self): caps = unyt_array([t.storage_capacity for t in self.technology_list if hasattr(t, "storage_capacity")]) return caps.max().to_value() @property def max_storage_params(self): storage_dict = {t.technology_name: t.storage_capacity.to_value() for t in self.technology_list if hasattr(t, "storage_capacity")} return storage_dict @property def initial_storage_params(self): storage_dict = {t.technology_name: t.initial_storage.to_value() for t in self.technology_list if hasattr(t, "initial_storage")} return storage_dict @property def ramping_techs(self): return [t.technology_name for t in self.technology_list if hasattr(t, 'ramp_up')] @property def ramp_up_params(self): rates_dict = { t.technology_name: ( t.ramp_up * self.time_delta).to_value() for t in self.technology_list if hasattr(t, 'ramp_up')} return rates_dict @property def ramp_down_params(self): rates_dict = { t.technology_name: ( t.ramp_down * self.time_delta).to_value() for t in self.technology_list if hasattr(t, 'ramp_up')} return rates_dict @property def upper_bound(self): caps = unyt_array([t.capacity for t in self.technology_list]) return caps.max().to_value() def _create_model_indices(self): self.model.Generators = pe.Set(initialize=self.tech_set, ordered=True) self.model.Time = pe.Set(initialize=self.time_set, ordered=True) if len(self.ramping_techs) > 0: self.model.RampingTechs = pe.Set(initialize=self.ramping_techs, ordered=True, within=self.model.Generators) if len(self.storage_techs) > 0: self.model.StorageTech = pe.Set(initialize=self.storage_techs, ordered=True, within=self.model.Generators) def _create_demand_param(self): self.model.Demand = pe.Param(self.model.Time, initialize=dict( zip(self.model.Time, np.array(self.net_demand)))) def _create_cost_param(self): self.model.VariableCost = pe.Param( self.model.Generators, self.model.Time, initialize=self.cost_params) def _create_ramping_params(self): self.model.ramp_up = pe.Param( self.model.RampingTechs, initialize=self.ramp_up_params) self.model.ramp_down = pe.Param( self.model.RampingTechs, initialize=self.ramp_down_params) def _create_max_storage_params(self): self.model.storage_capacity = pe.Param( self.model.StorageTech, initialize=self.max_storage_params ) def _create_init_storage_params(self): self.model.initial_storage = pe.Param( self.model.StorageTech, initialize=self.initial_storage_params ) def _create_model_variables(self): self.model.x = pe.Var(self.model.Generators, self.model.Time, domain=pe.NonNegativeReals, bounds=(self.lower_bound, self.upper_bound)) if len(self.storage_techs) > 0: self.model.storage_level = pe.Var( self.model.StorageTech, self.model.Time, domain=pe.NonNegativeReals, bounds=( self.lower_bound, self.storage_upper_bound)) self.model.charge = pe.Var(self.model.StorageTech, self.model.Time, domain=pe.NonNegativeReals, bounds=(self.lower_bound, self.upper_bound)) def _objective_function(self): expr = sum(self.model.VariableCost[g, t] * self.model.x[g, t] for g in self.model.Generators for t in self.model.Time) if len(self.storage_techs) > 0: expr += sum(self.model.x[s, t] + self.model.charge[s, t] for s in self.model.StorageTech for t in self.model.Time) * self.penalty self.model.objective = pe.Objective(sense=pe.minimize, expr=expr) def _supply_constraints(self): self.model.oversupply = pe.ConstraintList() self.model.undersupply = pe.ConstraintList() for t in self.model.Time: generation = sum(self.model.x[g, t] for g in self.model.Generators if g != 'Curtailment') if self.curtailment: generation -= self.model.x['Curtailment', t] if len(self.storage_techs) > 0: generation -= sum(self.model.charge[s, t] for s in self.model.StorageTech) over_demand = self.model.Demand[t] * (1 + self.oversupply) under_demand = self.model.Demand[t] * (1 - self.undersupply) self.model.oversupply.add(generation <= over_demand) self.model.undersupply.add(generation >= under_demand) def _generation_constraint(self): self.model.gen_limit = pe.ConstraintList() for g in self.model.Generators: unit_capacity = ( self.capacity_dict[g] * self.time_delta).to_value() for t in self.model.Time: unit_generation = self.model.x[g, t] self.model.gen_limit.add(unit_generation <= unit_capacity) def _ramping_constraints(self): self.model.ramp_up_limit = pe.ConstraintList() self.model.ramp_down_limit = pe.ConstraintList() for r in self.model.RampingTechs: ramp_up = self.model.ramp_up[r] ramp_down = self.model.ramp_down[r] for t in self.model.Time: if t != self.model.Time.first(): t_prev = self.model.Time.prev(t) previous_gen = self.model.x[r, t_prev] current_gen = self.model.x[r, t] delta_power = (current_gen - previous_gen) / \ self.time_delta.to_value() self.model.ramp_up_limit.add(delta_power <= ramp_up) self.model.ramp_down_limit.add(delta_power >= -ramp_down) def _storage_constraints(self): self.model.discharge_limit = pe.ConstraintList() self.model.charge_limit = pe.ConstraintList() self.model.charge_rate_limit = pe.ConstraintList() self.model.storage_limit = pe.ConstraintList() self.model.set_storage = pe.ConstraintList() for s in self.model.StorageTech: efficiency = self.efficiency_dict[s] storage_cap = self.model.storage_capacity[s] unit_capacity = ( self.capacity_dict[s] * self.time_delta).to_value() initial_storage = self.model.initial_storage[s] for t in self.model.Time: self.model.charge_rate_limit.add( self.model.charge[s, t] <= unit_capacity) if t == self.model.Time.first(): self.model.set_storage.add(self.model.storage_level[s, t] == initial_storage) self.model.discharge_limit.add( self.model.x[s, t] <= initial_storage) self.model.charge_limit.add(self.model.charge[s, t] <= storage_cap - initial_storage) else: t_prev = self.model.Time.prev(t) previous_storage = self.model.storage_level[s, t_prev] current_discharge = self.model.x[s, t] current_charge = self.model.charge[s, t] self.model.set_storage.add( self.model.storage_level[s, t] == previous_storage + np.sqrt(efficiency) * current_charge - np.sqrt(efficiency) * current_discharge ) self.model.charge_limit.add( np.sqrt(efficiency) * current_charge <= storage_cap - previous_storage) self.model.discharge_limit.add( current_discharge <= previous_storage) self.model.storage_limit.add( self.model.storage_level[s, t] <= storage_cap) def _write_model_equations(self): self._create_model_indices() self._create_demand_param() self._create_cost_param() self._create_model_variables() self._objective_function() self._supply_constraints() self._generation_constraint() if len(self.ramping_techs) > 0: self._create_ramping_params() self._ramping_constraints() if len(self.storage_techs) > 0: self._create_init_storage_params() self._create_max_storage_params() self._storage_constraints() self.model_initialized = True def _format_results(self): df = pd.DataFrame(index=self.model.Time) for g in self.model.Generators: if g == 'Curtailment': df[g] = [-1 * self.model.x[g, t].value for t in self.model.Time] else: df[g] = [self.model.x[g, t].value for t in self.model.Time] if len(self.storage_techs) > 0: for s in self.model.StorageTech: df[f"{s}_charge"] = [-1 * self.model.charge[s, t].value for t in self.model.Time] df[f"{s}_level"] = [self.model.storage_level[s, t].value for t in self.model.Time] df["Cost"] = np.array(sum(self.model.VariableCost[g, t] * self.model.x[g, t].value for g in self.model.Generators) for t in self.model.Time) return df
[docs] def solve(self, solver=None): """ Executes the model solve. Model equations are written at the time the method is called. Parameters ---------- solver : str Indicates which solver to use. If no solver is specified, the default :attr:`DispatchModel.solver` attribute is used. Default is ['cplex']. """ if not self.model_initialized: self._write_model_equations() if solver: optimizer = po.SolverFactory(solver) else: optimizer = po.SolverFactory(self.solver) try: optimizer.solve(self.model, tee=self.verbose) self.objective = self.model.objective() except (ValueError, RuntimeError): if self.verbosity <= 30: warnings.warn( f"Infeasible or no solution. Objective set to {LARGE_NUMBER}") self.objective = LARGE_NUMBER try: self.results = self._format_results() except TypeError: self.results = None