Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dev = [
"ruff>=0.15.4",
"tox-uv>=1.33.0",
"tox>=4.47.0",
"pytest>=8.0",
"provider-simenv[db]",
]

Expand Down
9 changes: 3 additions & 6 deletions src/provider_simenv/agents/farmer.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,12 @@ def _step_bra(self):
raises per-unit cost, which raises unit_price automatically.
"""
env = self.model.environment
farm_scale = env.get_shock_scale("farm_capacity_bra")
farm_capacity = 1.0 + farm_scale * (self.scenario.farm_capacity_bra - 1.0)
farm_capacity = env.get_effective_value("farm_capacity_bra")
self.quantity_available = self.base_yield * farm_capacity

if self.quantity_available > 0:
# fertilizer price factor raises effective fixed costs this step
fertilizer_scale = env.get_shock_scale("fertilizer_price_factor")
fertilizer_factor = 1.0 + fertilizer_scale * (self.scenario.fertilizer_price_factor - 1.0)
fertilizer_factor = env.get_effective_value("fertilizer_price_factor")
effective_costs = self.fixed_costs * fertilizer_factor
self.unit_price = (effective_costs / self.quantity_available) * (1.0 + self.margin)
else:
Expand Down Expand Up @@ -222,8 +220,7 @@ def _step_arg(self):
farm_capacity_arg allows ARG-specific shocks to be modelled independently.
Defaults to 1.0 = always unshocked.
"""
arg_scale = self.model.environment.get_shock_scale("farm_capacity_arg")
farm_capacity = 1.0 + arg_scale * (self.scenario.farm_capacity_arg - 1.0)
farm_capacity = self.model.environment.get_effective_value("farm_capacity_arg")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this call different to the other ones?
Above env.get_effective_value is called, here it's self.model.environment.get_effective_value. What am I missing?

self.quantity_available = self.base_yield * farm_capacity

if self.quantity_available > 0:
Expand Down
15 changes: 6 additions & 9 deletions src/provider_simenv/agents/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,13 @@ def step(self):
# Shared helper: receive from upstream list, convert, compute price
# ------------------------------------------------------------------

def _process(self, upstream_list, peer_list, capacity_factor: float = 1.0, shock_param: str = ""):
def _process(self, upstream_list, peer_list, param_name: str = ""):
"""
Pull an equal share of upstream output, apply conversion_ratio,
and compute unit_price accounting for yield loss.

capacity_factor: optional multiplier on output (used to apply oil_mill_capacity / feed_mill_capacity).
models indirect capaciy reduction from soja shortage.
param_name: scenario param whose effective value scales output.
Models indirect capacity reduction from soja shortage.

For every 1 unit of output, (1 / conversion_ratio) input units
were consumed, so the input cost per output unit is:
Expand All @@ -95,8 +95,7 @@ def _process(self, upstream_list, peer_list, capacity_factor: float = 1.0, shock
self.unit_price = 0.0
return

capacity_scale = self.model.environment.get_shock_scale(shock_param) if shock_param else 0.0
effective_factor =1.0 + capacity_scale * (capacity_factor - 1.0)
effective_factor = self.model.environment.get_effective_value(param_name) if param_name else 1.0

total_input = sum(a.quantity_available for a in active_upstream)

Expand Down Expand Up @@ -136,15 +135,13 @@ def _step_processor(self):
self._process(
upstream_list=combined_eu,
peer_list=self.model.processors,
capacity_factor=self.scenario.oil_mill_capacity,
shock_param="oil_mill_capacity",
param_name="oil_mill_capacity",
)

def _step_feed_manufacturer(self):
"""Receive soja meal from processors, compound to animal feed."""
self._process(
upstream_list=self.model.processors,
peer_list=self.model.feed_manufacturers,
capacity_factor=self.scenario.feed_mill_capacity,
shock_param="feed_mill_capacity",
param_name="feed_mill_capacity",
)
38 changes: 14 additions & 24 deletions src/provider_simenv/agents/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,12 @@ def step(self):
# cap at capacity, compute all-in unit_price (commodity + freight)
# ------------------------------------------------------------------

def _move(self, upstream, capacity_factor: float = 1.0, shock_param: str = ""):
def _move(self, upstream, param_name: str = ""):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

param_name is too vague imo.
Should be a more descriptive name

"""
Pull an equal share of upstream output, ca at own capacity,
and compute the all-in price passed to the next chain node.

capacity_factor: optional multiplier on self.capacity (used to
apply port_capacity_sa for SA land transport).
param_name: scenario param whose effective value scales this agent's capacity, via env.get_effective_value()
"""
margin = self.scenario.margin_transport

Expand All @@ -136,8 +135,7 @@ def _move(self, upstream, capacity_factor: float = 1.0, shock_param: str = ""):
volume_in = total_volume / n_self

env = self.model.environment
capacity_scale = env.get_shock_scale(shock_param) if shock_param else 0.0
effective_factor = 1.0 + capacity_scale * (capacity_factor - 1.0)
effective_factor = env.get_effective_value(param_name) if param_name else 1.0

# effective capacity after applying port capacity shock
effective_capacity = self.capacity * effective_factor
Expand All @@ -153,22 +151,21 @@ def _move(self, upstream, capacity_factor: float = 1.0, shock_param: str = ""):
# price = commodity price + freight fee per unit
# energy price factor raises transport operation costs
if self.quantity_available > 0:
energy_scale = env.get_shock_scale("energy_price_factor")
energy_factor = 1.0 + energy_scale * (self.scenario.energy_price_factor - 1.0)
energy_factor = env.get_effective_value("energy_price_factor")
effective_costs = self.fixed_costs * energy_factor
freight_fee = (effective_costs / self.quantity_available) * (1.0 + margin)
self.unit_price = upstream_price + freight_fee
else:
self.unit_price = 0.0


def _move_split(self, upstream_list, share: float, capacity_factor: float = 1.0, shock_param: str = "", exclude_arg=False, exclude_usa=False):
def _move_split(self, upstream_list, share: float, param_name: str = "", exclude_arg=False, exclude_usa=False):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

explicit excludes need to be removed in the future

"""
Like _move, but routes only share fraction of total upstream volume through this port.
Used to split wholesaler output between Santos and Paranagua.

:param share: fraction of total wholesaler output for this port (e.g. 0.7 for Santos, 0.3 for Paranagua).
:param capacity_factor: port specific capacity shock param (KG type 2)
:param param_name: scenario param whose effective value scales this agent's capacity.
"""
margin = self.scenario.margin_transport
active_upstream = upstream_list.filter(lambda a: a.active)
Expand All @@ -195,8 +192,7 @@ def _move_split(self, upstream_list, share: float, capacity_factor: float = 1.0,
volume_in = (routable_volume * share) / n_self

env = self.model.environment
capacity_scale = env.get_shock_scale(shock_param) if shock_param else 0.0
effective_factor = 1.0 + capacity_scale * (capacity_factor - 1.0)
effective_factor = env.get_effective_value(param_name) if param_name else 1.0
effective_capacity = self.capacity * effective_factor

self.quantity_available = min(volume_in, effective_capacity)
Expand All @@ -208,8 +204,7 @@ def _move_split(self, upstream_list, share: float, capacity_factor: float = 1.0,
upstream_price = (total_value / total_volume) if total_volume > 0 else 0.0

if self.quantity_available > 0:
energy_scale = env.get_shock_scale("energy_price_factor")
energy_factor = 1.0 + energy_scale * (self.scenario.energy_price_factor - 1.0)
energy_factor = env.get_effective_value("energy_price_factor")
effective_costs = self.fixed_costs * energy_factor
freight_fee = (effective_costs / self.quantity_available) * (1.0 + margin)
self.unit_price = upstream_price + freight_fee
Expand Down Expand Up @@ -249,8 +244,7 @@ def _step_sa_santos(self):
self._move_split(
self.model.wholesalers,
share=self.scenario.santos_share,
capacity_factor=self.scenario.port_capacity_santos,
shock_param="port_capacity_santos",
param_name="port_capacity_santos",
exclude_arg=True,
exclude_usa=True,
)
Expand All @@ -263,8 +257,7 @@ def _step_sa_paranagua(self):
self._move_split(
self.model.wholesalers,
share=1.0 - self.scenario.santos_share,
capacity_factor=self.scenario.port_capacity_paranagua,
shock_param="port_capacity_paranagua",
param_name="port_capacity_paranagua",
exclude_arg=True,
exclude_usa=True,
)
Expand Down Expand Up @@ -322,8 +315,7 @@ def _step_sea_arg(self):
upstream_price = total_value / total_arg

if self.quantity_available > 0:
energy_scale = self.model.environment.get_shock_scale("energy_price_factor")
energy_factor = 1.0 + energy_scale * (self.scenario.energy_price_factor - 1.0)
energy_factor = self.model.environment.get_effective_value("energy_price_factor")
effective_costs = self.fixed_costs * energy_factor
freight_fee = (effective_costs / self.quantity_available) * (1.0 + margin)
self.unit_price = upstream_price + freight_fee
Expand Down Expand Up @@ -372,8 +364,7 @@ def _step_sea_usa(self):
upstream_price = total_value / total_usa

if self.quantity_available > 0:
energy_scale = self.model.environment.get_shock_scale("energy_price_factor")
energy_factor = 1.0 + energy_scale * (self.scenario.energy_price_factor - 1.0)
energy_factor = self.model.environment.get_effective_value("energy_price_factor")
effective_costs = self.fixed_costs * energy_factor
freight_fee = (effective_costs / self.quantity_available) * (1.0 + margin)
self.unit_price = upstream_price + freight_fee
Expand All @@ -391,7 +382,7 @@ def _step_eu_rtm(self):
+ self.model.sea_lane_arg.filter(lambda a: a.active)
+ self.model.sea_lane_usa.filter(lambda a: a.active)
)
self._move(combined, capacity_factor=self.scenario.port_capacity_rotterdam, shock_param="port_capacity_rotterdam")
self._move(combined, param_name="port_capacity_rotterdam")


def _step_eu_ham(self):
Expand All @@ -402,7 +393,6 @@ def _step_eu_ham(self):
"""
self._move(
self.model.sea_lane_paranagua,
capacity_factor=self.scenario.port_capacity_hamburg,
shock_param="port_capacity_hamburg",
param_name="port_capacity_hamburg",
)

Binary file removed src/provider_simenv/data/output/price_curves.png
Binary file not shown.
Binary file removed src/provider_simenv/data/output/price_curves_2.png
Binary file not shown.
Binary file removed src/provider_simenv/data/output/volume_flow.png
Binary file not shown.
54 changes: 42 additions & 12 deletions src/provider_simenv/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@
total_soja_supply: sum of quantity_available across BRA + USA farmers
transport_utilisation: average utilisation of all transport agents
"""

from __future__ import annotations
from typing import TYPE_CHECKING
from Melodie import Environment

if TYPE_CHECKING:
from event_tracker import EventTracker


# maps scenario param name -> (onset_field, end_field) on SupplyChainScenario
_PARAM_TIMING_FIELDS: list[tuple[str, str, str]] = [
Expand Down Expand Up @@ -58,6 +62,10 @@ class SupplyChainEnvironment(Environment):
# step
current_step: int = 0

# conditional-event tracker
# set in Model.run()/run_stepwise() for PDL runs
_tracker: EventTracker | None = None

def setup(self):
"""
Initialise environment state form the scenario parameters.
Expand All @@ -75,25 +83,34 @@ def setup(self):
param: 0.0 for param, _, _ in _PARAM_TIMING_FIELDS
}


def update_shock_scales(self, period: int):
"""
update per-parameter shock activation scales for the given day.

Each parameter has its own onset and end day read from the scenario.
A parameter's scale is 1.0 (fully active) when onset <= period < end,
and 0.0 (inactive) otherwise. With shock_ramp_steps = 0 (PDL default)
the transition is instantaneous.
Two modes:
- Tracker mode (PDL with conditions): EventTracker evaluate conditions and durations at runtime.
- Static mode (no PDL, fallback): onset/end read from scenario fields
"""
for param, onset_field, end_field in _PARAM_TIMING_FIELDS:
onset = getattr(self.scenario, onset_field)
end = getattr(self.scenario, end_field)
value = getattr(self.scenario, param)
has_shock = value != 1.0
self.shock_scales[param] = (1.0 if has_shock and onset <= period < end else 0.0)
if self._tracker is not None:
self._tracker.step(period)
for param, _, _ in _PARAM_TIMING_FIELDS:
self.shock_scales[param] = self._tracker.get_shock_scale(param)
else:
for param, onset_field, end_field in _PARAM_TIMING_FIELDS:
onset = getattr(self.scenario, onset_field)
end = getattr(self.scenario, end_field)
value = getattr(self.scenario, param)
has_shock = value != 1.0
self.shock_scales[param] = (1.0 if has_shock and onset <= period < end else 0.0)

self.shock_scale = max(self.shock_scales.values(), default=0.0)

# drought severity: use racker value if available
bra_scale = self.shock_scales.get("farm_capacity_bra", 0.0)
bra_value = self.get_effective_value("farm_capacity_bra")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

explicit bra values should be made dynamic in the future

self.drought_severity = (
self.shock_scales["farm_capacity_bra"] * (1.0 - self.scenario.farm_capacity_bra)
bra_scale * (1.0 - bra_value)
)


Expand All @@ -103,6 +120,19 @@ def get_shock_scale(self, param: str) -> float:
"""
return self.shock_scales.get(param, 0.0)


def get_effective_value(self, param: str) -> float:
"""
Return the effective value for this step.

Tracker mode: aggregated from currently active events only.
Static mode: reads fixed value from the scenario.
"""
if self._tracker is not None:
return self._tracker.get_param_value(param)
return getattr(self.scenario, param, 1.0)


def step(self):
"""
Aggregate agent outputs into macro indicators
Expand Down
Loading