diff --git a/pyproject.toml b/pyproject.toml index d01a326..4c2f4fd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,6 +14,7 @@ dev = [ "ruff>=0.15.4", "tox-uv>=1.33.0", "tox>=4.47.0", + "pytest>=8.0", "provider-simenv[db]", ] diff --git a/src/provider_simenv/agents/farmer.py b/src/provider_simenv/agents/farmer.py index 3a384b0..63c8196 100644 --- a/src/provider_simenv/agents/farmer.py +++ b/src/provider_simenv/agents/farmer.py @@ -168,14 +168,12 @@ def _step_bra(self): raises per-unit cost, which raises unit_price automatically. """ env = self.model.environment - farm_scale = env.get_shock_scale("farm_capacity_bra") - farm_capacity = 1.0 + farm_scale * (self.scenario.farm_capacity_bra - 1.0) + farm_capacity = env.get_effective_value("farm_capacity_bra") self.quantity_available = self.base_yield * farm_capacity if self.quantity_available > 0: # fertilizer price factor raises effective fixed costs this step - fertilizer_scale = env.get_shock_scale("fertilizer_price_factor") - fertilizer_factor = 1.0 + fertilizer_scale * (self.scenario.fertilizer_price_factor - 1.0) + fertilizer_factor = env.get_effective_value("fertilizer_price_factor") effective_costs = self.fixed_costs * fertilizer_factor self.unit_price = (effective_costs / self.quantity_available) * (1.0 + self.margin) else: @@ -222,8 +220,7 @@ def _step_arg(self): farm_capacity_arg allows ARG-specific shocks to be modelled independently. Defaults to 1.0 = always unshocked. """ - arg_scale = self.model.environment.get_shock_scale("farm_capacity_arg") - farm_capacity = 1.0 + arg_scale * (self.scenario.farm_capacity_arg - 1.0) + farm_capacity = self.model.environment.get_effective_value("farm_capacity_arg") self.quantity_available = self.base_yield * farm_capacity if self.quantity_available > 0: diff --git a/src/provider_simenv/agents/process.py b/src/provider_simenv/agents/process.py index 583d285..6cfe7d1 100644 --- a/src/provider_simenv/agents/process.py +++ b/src/provider_simenv/agents/process.py @@ -71,13 +71,13 @@ def step(self): # Shared helper: receive from upstream list, convert, compute price # ------------------------------------------------------------------ - def _process(self, upstream_list, peer_list, capacity_factor: float = 1.0, shock_param: str = ""): + def _process(self, upstream_list, peer_list, param_name: str = ""): """ Pull an equal share of upstream output, apply conversion_ratio, and compute unit_price accounting for yield loss. - capacity_factor: optional multiplier on output (used to apply oil_mill_capacity / feed_mill_capacity). - models indirect capaciy reduction from soja shortage. + param_name: scenario param whose effective value scales output. + Models indirect capacity reduction from soja shortage. For every 1 unit of output, (1 / conversion_ratio) input units were consumed, so the input cost per output unit is: @@ -95,8 +95,7 @@ def _process(self, upstream_list, peer_list, capacity_factor: float = 1.0, shock self.unit_price = 0.0 return - capacity_scale = self.model.environment.get_shock_scale(shock_param) if shock_param else 0.0 - effective_factor =1.0 + capacity_scale * (capacity_factor - 1.0) + effective_factor = self.model.environment.get_effective_value(param_name) if param_name else 1.0 total_input = sum(a.quantity_available for a in active_upstream) @@ -136,8 +135,7 @@ def _step_processor(self): self._process( upstream_list=combined_eu, peer_list=self.model.processors, - capacity_factor=self.scenario.oil_mill_capacity, - shock_param="oil_mill_capacity", + param_name="oil_mill_capacity", ) def _step_feed_manufacturer(self): @@ -145,6 +143,5 @@ def _step_feed_manufacturer(self): self._process( upstream_list=self.model.processors, peer_list=self.model.feed_manufacturers, - capacity_factor=self.scenario.feed_mill_capacity, - shock_param="feed_mill_capacity", + param_name="feed_mill_capacity", ) diff --git a/src/provider_simenv/agents/transport.py b/src/provider_simenv/agents/transport.py index adcd0dc..af3363b 100644 --- a/src/provider_simenv/agents/transport.py +++ b/src/provider_simenv/agents/transport.py @@ -109,13 +109,12 @@ def step(self): # cap at capacity, compute all-in unit_price (commodity + freight) # ------------------------------------------------------------------ - def _move(self, upstream, capacity_factor: float = 1.0, shock_param: str = ""): + def _move(self, upstream, param_name: str = ""): """ Pull an equal share of upstream output, ca at own capacity, and compute the all-in price passed to the next chain node. - capacity_factor: optional multiplier on self.capacity (used to - apply port_capacity_sa for SA land transport). + param_name: scenario param whose effective value scales this agent's capacity, via env.get_effective_value() """ margin = self.scenario.margin_transport @@ -136,8 +135,7 @@ def _move(self, upstream, capacity_factor: float = 1.0, shock_param: str = ""): volume_in = total_volume / n_self env = self.model.environment - capacity_scale = env.get_shock_scale(shock_param) if shock_param else 0.0 - effective_factor = 1.0 + capacity_scale * (capacity_factor - 1.0) + effective_factor = env.get_effective_value(param_name) if param_name else 1.0 # effective capacity after applying port capacity shock effective_capacity = self.capacity * effective_factor @@ -153,8 +151,7 @@ def _move(self, upstream, capacity_factor: float = 1.0, shock_param: str = ""): # price = commodity price + freight fee per unit # energy price factor raises transport operation costs if self.quantity_available > 0: - energy_scale = env.get_shock_scale("energy_price_factor") - energy_factor = 1.0 + energy_scale * (self.scenario.energy_price_factor - 1.0) + energy_factor = env.get_effective_value("energy_price_factor") effective_costs = self.fixed_costs * energy_factor freight_fee = (effective_costs / self.quantity_available) * (1.0 + margin) self.unit_price = upstream_price + freight_fee @@ -162,13 +159,13 @@ def _move(self, upstream, capacity_factor: float = 1.0, shock_param: str = ""): self.unit_price = 0.0 - def _move_split(self, upstream_list, share: float, capacity_factor: float = 1.0, shock_param: str = "", exclude_arg=False, exclude_usa=False): + def _move_split(self, upstream_list, share: float, param_name: str = "", exclude_arg=False, exclude_usa=False): """ Like _move, but routes only share fraction of total upstream volume through this port. Used to split wholesaler output between Santos and Paranagua. :param share: fraction of total wholesaler output for this port (e.g. 0.7 for Santos, 0.3 for Paranagua). - :param capacity_factor: port specific capacity shock param (KG type 2) + :param param_name: scenario param whose effective value scales this agent's capacity. """ margin = self.scenario.margin_transport active_upstream = upstream_list.filter(lambda a: a.active) @@ -195,8 +192,7 @@ def _move_split(self, upstream_list, share: float, capacity_factor: float = 1.0, volume_in = (routable_volume * share) / n_self env = self.model.environment - capacity_scale = env.get_shock_scale(shock_param) if shock_param else 0.0 - effective_factor = 1.0 + capacity_scale * (capacity_factor - 1.0) + effective_factor = env.get_effective_value(param_name) if param_name else 1.0 effective_capacity = self.capacity * effective_factor self.quantity_available = min(volume_in, effective_capacity) @@ -208,8 +204,7 @@ def _move_split(self, upstream_list, share: float, capacity_factor: float = 1.0, upstream_price = (total_value / total_volume) if total_volume > 0 else 0.0 if self.quantity_available > 0: - energy_scale = env.get_shock_scale("energy_price_factor") - energy_factor = 1.0 + energy_scale * (self.scenario.energy_price_factor - 1.0) + energy_factor = env.get_effective_value("energy_price_factor") effective_costs = self.fixed_costs * energy_factor freight_fee = (effective_costs / self.quantity_available) * (1.0 + margin) self.unit_price = upstream_price + freight_fee @@ -249,8 +244,7 @@ def _step_sa_santos(self): self._move_split( self.model.wholesalers, share=self.scenario.santos_share, - capacity_factor=self.scenario.port_capacity_santos, - shock_param="port_capacity_santos", + param_name="port_capacity_santos", exclude_arg=True, exclude_usa=True, ) @@ -263,8 +257,7 @@ def _step_sa_paranagua(self): self._move_split( self.model.wholesalers, share=1.0 - self.scenario.santos_share, - capacity_factor=self.scenario.port_capacity_paranagua, - shock_param="port_capacity_paranagua", + param_name="port_capacity_paranagua", exclude_arg=True, exclude_usa=True, ) @@ -322,8 +315,7 @@ def _step_sea_arg(self): upstream_price = total_value / total_arg if self.quantity_available > 0: - energy_scale = self.model.environment.get_shock_scale("energy_price_factor") - energy_factor = 1.0 + energy_scale * (self.scenario.energy_price_factor - 1.0) + energy_factor = self.model.environment.get_effective_value("energy_price_factor") effective_costs = self.fixed_costs * energy_factor freight_fee = (effective_costs / self.quantity_available) * (1.0 + margin) self.unit_price = upstream_price + freight_fee @@ -372,8 +364,7 @@ def _step_sea_usa(self): upstream_price = total_value / total_usa if self.quantity_available > 0: - energy_scale = self.model.environment.get_shock_scale("energy_price_factor") - energy_factor = 1.0 + energy_scale * (self.scenario.energy_price_factor - 1.0) + energy_factor = self.model.environment.get_effective_value("energy_price_factor") effective_costs = self.fixed_costs * energy_factor freight_fee = (effective_costs / self.quantity_available) * (1.0 + margin) self.unit_price = upstream_price + freight_fee @@ -391,7 +382,7 @@ def _step_eu_rtm(self): + self.model.sea_lane_arg.filter(lambda a: a.active) + self.model.sea_lane_usa.filter(lambda a: a.active) ) - self._move(combined, capacity_factor=self.scenario.port_capacity_rotterdam, shock_param="port_capacity_rotterdam") + self._move(combined, param_name="port_capacity_rotterdam") def _step_eu_ham(self): @@ -402,7 +393,6 @@ def _step_eu_ham(self): """ self._move( self.model.sea_lane_paranagua, - capacity_factor=self.scenario.port_capacity_hamburg, - shock_param="port_capacity_hamburg", + param_name="port_capacity_hamburg", ) diff --git a/src/provider_simenv/data/output/price_curves.png b/src/provider_simenv/data/output/price_curves.png deleted file mode 100644 index 3f64727..0000000 Binary files a/src/provider_simenv/data/output/price_curves.png and /dev/null differ diff --git a/src/provider_simenv/data/output/price_curves_2.png b/src/provider_simenv/data/output/price_curves_2.png deleted file mode 100644 index 9acff33..0000000 Binary files a/src/provider_simenv/data/output/price_curves_2.png and /dev/null differ diff --git a/src/provider_simenv/data/output/volume_flow.png b/src/provider_simenv/data/output/volume_flow.png deleted file mode 100644 index 5200640..0000000 Binary files a/src/provider_simenv/data/output/volume_flow.png and /dev/null differ diff --git a/src/provider_simenv/environment.py b/src/provider_simenv/environment.py index c994bce..ae3211e 100644 --- a/src/provider_simenv/environment.py +++ b/src/provider_simenv/environment.py @@ -12,9 +12,13 @@ total_soja_supply: sum of quantity_available across BRA + USA farmers transport_utilisation: average utilisation of all transport agents """ - +from __future__ import annotations +from typing import TYPE_CHECKING from Melodie import Environment +if TYPE_CHECKING: + from event_tracker import EventTracker + # maps scenario param name -> (onset_field, end_field) on SupplyChainScenario _PARAM_TIMING_FIELDS: list[tuple[str, str, str]] = [ @@ -58,6 +62,10 @@ class SupplyChainEnvironment(Environment): # step current_step: int = 0 + # conditional-event tracker + # set in Model.run()/run_stepwise() for PDL runs + _tracker: EventTracker | None = None + def setup(self): """ Initialise environment state form the scenario parameters. @@ -75,25 +83,34 @@ def setup(self): param: 0.0 for param, _, _ in _PARAM_TIMING_FIELDS } + def update_shock_scales(self, period: int): """ update per-parameter shock activation scales for the given day. - Each parameter has its own onset and end day read from the scenario. - A parameter's scale is 1.0 (fully active) when onset <= period < end, - and 0.0 (inactive) otherwise. With shock_ramp_steps = 0 (PDL default) - the transition is instantaneous. + Two modes: + - Tracker mode (PDL with conditions): EventTracker evaluate conditions and durations at runtime. + - Static mode (no PDL, fallback): onset/end read from scenario fields """ - for param, onset_field, end_field in _PARAM_TIMING_FIELDS: - onset = getattr(self.scenario, onset_field) - end = getattr(self.scenario, end_field) - value = getattr(self.scenario, param) - has_shock = value != 1.0 - self.shock_scales[param] = (1.0 if has_shock and onset <= period < end else 0.0) + if self._tracker is not None: + self._tracker.step(period) + for param, _, _ in _PARAM_TIMING_FIELDS: + self.shock_scales[param] = self._tracker.get_shock_scale(param) + else: + for param, onset_field, end_field in _PARAM_TIMING_FIELDS: + onset = getattr(self.scenario, onset_field) + end = getattr(self.scenario, end_field) + value = getattr(self.scenario, param) + has_shock = value != 1.0 + self.shock_scales[param] = (1.0 if has_shock and onset <= period < end else 0.0) self.shock_scale = max(self.shock_scales.values(), default=0.0) + + # drought severity: use racker value if available + bra_scale = self.shock_scales.get("farm_capacity_bra", 0.0) + bra_value = self.get_effective_value("farm_capacity_bra") self.drought_severity = ( - self.shock_scales["farm_capacity_bra"] * (1.0 - self.scenario.farm_capacity_bra) + bra_scale * (1.0 - bra_value) ) @@ -103,6 +120,19 @@ def get_shock_scale(self, param: str) -> float: """ return self.shock_scales.get(param, 0.0) + + def get_effective_value(self, param: str) -> float: + """ + Return the effective value for this step. + + Tracker mode: aggregated from currently active events only. + Static mode: reads fixed value from the scenario. + """ + if self._tracker is not None: + return self._tracker.get_param_value(param) + return getattr(self.scenario, param, 1.0) + + def step(self): """ Aggregate agent outputs into macro indicators diff --git a/src/provider_simenv/event_tracker.py b/src/provider_simenv/event_tracker.py new file mode 100644 index 0000000..06d854a --- /dev/null +++ b/src/provider_simenv/event_tracker.py @@ -0,0 +1,256 @@ +""" +Event state tracker for conditional PDL events. + +Cascade timing system precomputes static onset/end windows per param. +The event tracker does dynamic condition evaluation: events only activate when +their cascade day is reached and the pdl condition is satisfied.("brazil_drought.active") +""" + +from __future__ import annotations +from dataclasses import dataclass + +@dataclass(frozen=True) +class EventDef: + """ + Single PDL event definition, pre-mapped to simenv params. + Events without a simenv param (param: None) are also tracked. + Other events may depend on them. + """ + id: str + param: str | None # simenv param name, None if unmapped + value: float | None # converted impact value + duration: int # duration (days) + condition: str # condition string, "" = unconditional + impact_field: str # supply || price + + +@dataclass(frozen=True) +class TimelineEntry: + """ + One cascade timeline trigger point. + """ + at_day: int + event_id: str + + +@dataclass +class ActiveEvent: + """ + Runtime state for a currently active event. + """ + activated_at: int + event_def: EventDef + + +# --- Aggregation constants --- +_CAPACITY_PARAMS = { + "farm_capacity_bra", "farm_capacity_arg", + "port_capacity_santos", "port_capacity_paranagua", + "port_capacity_rotterdam", "port_capacity_hamburg", + "oil_mill_capacity", "feed_mill_capacity", +} +_PRICE_PARAMS = {"energy_price_factor", "fertilizer_price_factor"} + + +# --- Event tracker --- +class EventTracker: + """ + Tracks PDL event activation state and derives per-parameter shock values. + + Rules: + 1. Event becomes eligible when its cascade timeline is reached. + 2. Eligible event activates on the first step where its condition is satisfied. + Unconditional events activate immediately. + 3. Active event expires after its duration elapses. Events with duration = 0 never expire (permanent). + 4. Once expired, an event does not reactivate. + """ + + def __init__(self, events: list[dict], timeline: list[dict]) -> None: + # event definition indexed by id + self._events: dict[str, EventDef] = {} + for e in events: + self._events[e["id"]] = EventDef( + id=e["id"], + param=e.get("param"), + value=e.get("value"), + duration=e.get("duration", 0), + condition=e.get("condition", ""), + impact_field=e.get("impact_field", "supply"), + ) + + # timeline sorted by day + self._timeline: list[TimelineEntry] = sorted( + [TimelineEntry(at_day=t["at_day"], event_id=t["event_id"]) for t in timeline], + key=lambda e: e.at_day, + ) + + self._eligible: set[str] = set() # cascade day reached + self._active: dict[str, ActiveEvent] = {} # currently firing + self._expired: set[str] = set() # fired and finished + + self._current_day: int = -1 + + # derived shock state (re-calc every step) + self._shock_scales: dict[str, float] = {} + self._param_values: dict[str, float] = {} + + + def step(self, day: int) -> None: + """ + Advance tracker to the given simulation day. + + Order: + mark eligible -> expire -> activate -> recompute scales + """ + self._current_day = day + + # mark newly eligible events + for entry in self._timeline: + if entry.at_day <= day: + eid = entry.event_id + if eid not in self._eligible and eid not in self._expired: + self._eligible.add(eid) + + # expire active events, which have past their duration + newly_expired: list[str] = [] + for eid, active in self._active.items(): + dur = active.event_def.duration + if dur > 0 and day >= active.activated_at + dur: + newly_expired.append(eid) + for eid in newly_expired: + del self._active[eid] + self._expired.add(eid) + + # activate eligible events, which have their condition met + # two passes (unconditional first, then conditional) keep activation order + # deterministic regardless of set-iteration order. + # NOTE: under the current condition language this doesn't enable same-day dependency resolution + # '.active' returns 0 (false) on an event's own activation day, so a conditional whose dependency + # activates the same day always fires one day later. + pending = [] + for eid in list(self._eligible): + if eid in self._active or eid in self._expired: + continue + edef = self._events.get(eid) + if edef is None: + continue + if not edef.condition: + self._active[eid] = ActiveEvent( + activated_at=day, event_def=edef, + ) + else: + pending.append(eid) + + for eid in pending: + if eid in self._active or eid in self._expired: + continue + edef = self._events.get(eid) + if edef is None: + continue + if self._evaluate_condition(edef.condition, day): + self._active[eid] = ActiveEvent( + activated_at=day, event_def=edef, + ) + + self._recompute_scales() + + + def get_shock_scale(self, param: str) -> float: + """ + 1.0 if param has any active shock, 0.0 otherwise. + """ + return self._shock_scales.get(param, 0.0) + + + def get_param_value(self, param: str) -> float: + """ + Aggregated shock value from active events targeting this param. + Returns 1.0 if no active events targeting this param. + """ + return self._param_values.get(param, 1.0) + + + def is_event_active(self, event_id: str) -> int: + """ + Days the event has been continuously active. 0 if inactive. + """ + active = self._active.get(event_id) + if active is None: + return 0 + return self._current_day - active.activated_at + + + def get_active_event_ids(self) -> set[str]: + """ + Set of currently active event IDs. + """ + return set(self._active.keys()) + + + # --- Condition Evaluator --- + def _evaluate_condition(self, condition: str, day: int) -> bool: + """ + Evaluate a PDL condition string against current tracker state. + """ + condition = condition.strip() + if not condition: + return True + + # OR + if " OR " in condition: + return any( + self._evaluate_condition(p, day) for p in condition.split(" OR ") + ) + + # AND + if " AND " in condition: + return all( + self._evaluate_condition(p, day) for p in condition.split(" AND ") + ) + + # "event_id.duration > Xd" + if ".duration" in condition and ">" in condition: + event_id = condition.split(".duration")[0].strip() + threshold_str = condition.split(">")[1].strip().rstrip("d").strip() + try: + threshold = int(threshold_str) + except ValueError: + return False + return self.is_event_active(event_id) > threshold + + # "event_id.active" + if condition.endswith(".active"): + event_id = condition[:-7] # len(".active") == 7 + return self.is_event_active(event_id) + + # Unknown format + return False + + + # --- shock scale computation --- + def _recompute_scales(self) -> None: + """ + Derive per param shock scales and values from active events. + + Multiple active events on the same param: + capacity params -> min(values) + price params -> max(values) + """ + candidates: dict[str, list[float]] = {} + + for active in self._active.values(): + edef = active.event_def + if edef.param is not None and edef.value is not None: + candidates.setdefault(edef.param, []).append(edef.value) + + self._shock_scales.clear() + self._param_values.clear() + + for param, values in candidates.items(): + self._shock_scales[param] = 1.0 + if param in _CAPACITY_PARAMS: + self._param_values[param] = min(values) + elif param in _PRICE_PARAMS: + self._param_values[param] = max(values) + else: + self._param_values[param] = values[0] diff --git a/src/provider_simenv/main.py b/src/provider_simenv/main.py index fe18a11..522028b 100644 --- a/src/provider_simenv/main.py +++ b/src/provider_simenv/main.py @@ -94,6 +94,9 @@ def csv_to_sqlite(output_dir: str, db_name: str = "provider-simenv.sqlite") -> N help=( "Optional PostgreSQL SQLAlchemy connection string for tick writes, " "e.g. postgresql+psycopg2://user:pass@host:5432/dbname" + ), + ) + parser.add_argument( "--cascade", type=str, default=None, @@ -102,6 +105,7 @@ def csv_to_sqlite(output_dir: str, db_name: str = "provider-simenv.sqlite") -> N "PDL cascade id to use for timing. Defaults to the first cascade in the PDL file." ), ) + args = parser.parse_args() if args.postgres_url: @@ -174,6 +178,14 @@ def csv_to_sqlite(output_dir: str, db_name: str = "provider-simenv.sqlite") -> N df.to_csv(csv_path, index=False) print(f"[pdl_loader] CSV updated (baseline + 1 PDL scenario). \n") + # Build event registry for conditional runtime evaluation + event_registry = loader.to_event_registry(args.cascade) + n_total = len(event_registry["events"]) + n_mapped = sum(1 for e in event_registry["events"] if e["param"] is not None) + n_conditional = sum(1 for e in event_registry["events"] if e["condition"]) + print(f"[event_tracker] Registry: {n_total} events " + f"({n_mapped} mapped, {n_conditional} conditional)") + config = Config( project_name= "provider-simenv", @@ -187,8 +199,16 @@ def csv_to_sqlite(output_dir: str, db_name: str = "provider-simenv.sqlite") -> N scenario_cls=SupplyChainScenario, model_cls=SupplyChainModel, ) + + # attach event registry to model class so setup can inject the tracker into the env + if args.pdl: + SupplyChainModel._event_registry = event_registry + simulator.run() + if hasattr(SupplyChainModel, "_event_registry"): + del SupplyChainModel._event_registry + # post-process: merge CSVs -> SQLite for visualize_sql.py print("\n[main] Converting CSVs to SQLite...") csv_to_sqlite(output_folder) diff --git a/src/provider_simenv/model.py b/src/provider_simenv/model.py index 28a84f1..56220e9 100644 --- a/src/provider_simenv/model.py +++ b/src/provider_simenv/model.py @@ -28,6 +28,7 @@ """ import fontTools.misc.arrayTools from Melodie import Model +from .event_tracker import EventTracker from .agents import ( Farmer, Trader, Transport, Process, ROLE_BRA, ROLE_ARG, ROLE_USA, ROLE_EU, @@ -122,6 +123,7 @@ def setup(self): self._setup_with_role(self.feed_manufacturers, self.scenario.n_feed_manufacturers, ROLE_FEED_MANUFACTURER) self._prev_shock_scales: dict[str, float] = {} + self._prev_active_events: set[str] = set() self._heartbeat_interval: int = 30 @@ -152,7 +154,7 @@ def _log_event(self, t: int, direction: str, param: str, snap: dict): """ layer 1: emit one line per shock state transition """ - value = getattr(self.scenario, param) + value = self.environment.get_effective_value(param) if direction == "ON": pct = (value - 1.0) * 100 sign = "+" if pct > 0 else "" @@ -196,6 +198,25 @@ def _do_step(self, t: int) -> None: """ self.environment.update_shock_scales(t) + # tracker event logging + tracker = self.environment._tracker + if tracker is not None: + current_events = tracker.get_active_event_ids() + activated = current_events - self._prev_active_events + expired = self._prev_active_events - current_events + + for eid in sorted(activated): + edef = tracker._events.get(eid) + reason = f"condition: {edef.condition}" if edef and edef.condition else "unconditional" + param_str = f" -> {edef.param}={edef.value:.2f}" if edef and edef.param else "" + dur_str = f", expires day {t + edef.duration}" if edef and edef.duration > 0 else ", permanent" + print(f" © DAY {t:03d} EVENT ON {eid:<35s} ({reason}{param_str}{dur_str})") + + for eid in sorted(expired): + print(f" ® DAY {t:03d} EVENT OFF {eid:<35s} (duration elapsed)") + + self._prev_active_events = current_events + # Production self.bra_farmers.method_foreach('step', ()) self.arg_farmers.method_foreach('step', ()) @@ -244,6 +265,21 @@ def _do_step(self, t: int) -> None: # Record snapshot self.data_collector.collect(t) + + def _init_event_tracker(self) -> None: + """ + Attach the EventTracker for PDL runs. No-op in static / non-PDL mode. + Shard by run() and run_stepwise(). + """ + if getattr(self.scenario, "id", 0) == 0: + return # baseline: no conditional events (no-shock) + registry = getattr(self.__class__, "_event_registry", None) + if registry is not None: + self.environment._tracker = EventTracker( + events=registry["events"], + timeline=registry["timeline"], + ) + def run(self): """ Main simulation loop. Melodie calls this after create() and setup(). @@ -251,7 +287,7 @@ def run(self): self.iterator(n): yields period 0..n-1, handles any visualiser updates per step agent_list.method_foreach(method_name, args): calls method_name on every agent in the list; args must be a tuple. """ - + self._init_event_tracker() for t in self.iterator(self.scenario.period_num): self._do_step(t) self._log_scenario_summary(self.scenario.id, self.scenario.period_num) @@ -273,6 +309,8 @@ def run_stepwise(self): id_scenario = getattr(self.scenario, "id", 0) tick_writer = TickWriter.from_config(PostgresDBConfig(), reset=(id_scenario == 0)) + self._init_event_tracker() + for t in range(self.scenario.period_num): self._do_step(t) tick_writer.write_tick(self, id_scenario=id_scenario, id_run=0, t=t) diff --git a/src/provider_simenv/pdl_loader.py b/src/provider_simenv/pdl_loader.py index ebc0fb7..ec6933b 100644 --- a/src/provider_simenv/pdl_loader.py +++ b/src/provider_simenv/pdl_loader.py @@ -251,5 +251,61 @@ def to_cascade_schedule(self, cascade_id: str | None = None) -> dict[str, dict[s for param, pairs in candidates.items() } + + def to_event_registry(self, cascade_id: str | None = None) -> dict: + """ + Export event definitions and cascade timeline for the EventTracker. + + All PDL events are included, events without a simenv param mapping get param=None / value=None + """ + events: list[dict] = [] + for event in (self._doc.get("events") or []): + eid = event.get("id", "") + trigger = event.get("trigger") or {} + target = trigger.get("target", "") + condition = trigger.get("condition", "") + impact = event.get("impact") or {} + + duration_raw = impact.get("duration") + duration = _parse_duration(duration_raw) if duration_raw else 0 + + # find the first mapped (target, field) + param = None + value = None + impact_field = "supply" + + for field in ("supply", "price"): + raw = impact.get(field) + if raw is None: + continue + mapped = _PDL_MAPPING.get((target, field)) + if mapped is not None: + pct = _parse_percent(str(raw)) + param = mapped + value = round(1.0 + pct / 100.0, 6) + impact_field = field + break # one param per event + + events.append({ + "id": eid, + "param": param, + "value": value, + "duration": duration, + "condition": condition, + "impact_field": impact_field, + }) + + # --- cascade timeline --- + cascade = self._get_cascade(cascade_id) + timeline: list[dict] = [] + for entry in (cascade.get("timeline") or []): + timeline.append({ + "at_day": _parse_duration(entry.get("at", "0d")), + "event_id": entry.get("event", ""), + }) + + return {"events": events, "timeline": timeline} + + def __repr__(self) -> str: return f"PDLLoader({self.path.name!r}, label={self.label!r})" diff --git a/src/provider_simenv/tick_writer.py b/src/provider_simenv/tick_writer.py index c7e08fe..f68d81c 100644 --- a/src/provider_simenv/tick_writer.py +++ b/src/provider_simenv/tick_writer.py @@ -48,7 +48,7 @@ class TickWriter: def __init__(self, engine) -> None: self.engine = engine - self.eabled = True + self.enabled = True self._reset_tables() @classmethod @@ -70,7 +70,7 @@ def from_config(cls, cfg) -> "TickWriter": print(f"[tick_writer] WARNING: could not connect to Postgres ({exc}).") writer = object.__new__(cls) writer.engine = None - writer.eabled = False + writer.enabled = False return writer diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..8420d2f --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,8 @@ +import sys +from pathlib import Path + +# event_tracker and pdl_loader use bare imports +# -> package directory itself must be importable +SRC = Path(__file__).resolve().parent.parent / "src" / "provider_simenv" +if str(SRC) not in sys.path: + sys.path.insert(0, str(SRC)) \ No newline at end of file diff --git a/tests/test_event_tracker.py b/tests/test_event_tracker.py new file mode 100644 index 0000000..9b1c802 --- /dev/null +++ b/tests/test_event_tracker.py @@ -0,0 +1,179 @@ +""" +Unit tests for EventTracker — the conditional-event engine (Issue #4). + +EventTracker must be stepped once per simulation day starting at 0; an event's +activated_at is fixed on the first step where it is eligible, so these tests +always step sequentially from day 0. +""" +from event_tracker import EventTracker + + +def ev(eid, *, param=None, value=None, duration=0, condition="", impact_field="supply"): + return { + "id": eid, "param": param, "value": value, + "duration": duration, "condition": condition, "impact_field": impact_field, + } + + +def tl(at_day, event_id): + return {"at_day": at_day, "event_id": event_id} + + +def step_through(tracker, last_day): + """Step the tracker day 0..last_day inclusive, as the model does.""" + for d in range(last_day + 1): + tracker.step(d) + + +# --- eligibility / unconditional activation --- + +def test_unconditional_event_activates_on_its_cascade_day(): + t = EventTracker([ev("a", param="farm_capacity_bra", value=0.6)], [tl(3, "a")]) + step_through(t, 2) + assert "a" not in t.get_active_event_ids() + t.step(3) + assert "a" in t.get_active_event_ids() + assert t.get_param_value("farm_capacity_bra") == 0.6 + assert t.get_shock_scale("farm_capacity_bra") == 1.0 + + +# --- conditional gating --- + +def test_conditional_activates_when_dependency_already_active(): + events = [ + ev("drought", param="farm_capacity_bra", value=0.6), + ev("export", param="port_capacity_santos", value=0.88, condition="drought.active"), + ] + t = EventTracker(events, [tl(0, "drought"), tl(5, "export")]) + step_through(t, 5) + assert "drought" in t.get_active_event_ids() + assert "export" in t.get_active_event_ids() # drought active since day 0 + assert t.get_param_value("port_capacity_santos") == 0.88 + + +def test_same_day_dependency_shifts_conditional_by_one_day(): + """Design decision (2026-06-02): a conditional whose dependency activates on + the SAME cascade day fires one day later, because `.active` is false on the + dependency's activation day.""" + events = [ + ev("a", param="farm_capacity_bra", value=0.6), + ev("b", param="port_capacity_santos", value=0.8, condition="a.active"), + ] + t = EventTracker(events, [tl(0, "a"), tl(0, "b")]) + t.step(0) + assert "a" in t.get_active_event_ids() + assert "b" not in t.get_active_event_ids() # NOT same day + assert t.is_event_active("a") == 0 # 0 days on activation day + t.step(1) + assert "b" in t.get_active_event_ids() # fires the next day + assert t.is_event_active("a") == 1 + assert t.is_event_active("b") == 0 + + +def test_duration_threshold_condition(): + events = [ + ev("a", param="farm_capacity_bra", value=0.5), + ev("b", param="port_capacity_santos", value=0.5, condition="a.duration > 2d"), + ] + t = EventTracker(events, [tl(0, "a"), tl(0, "b")]) + step_through(t, 2) + assert "b" not in t.get_active_event_ids() # is_active(a)=2, 2 > 2 is False + t.step(3) + assert "b" in t.get_active_event_ids() # 3 > 2 + + +def test_and_condition_requires_both(): + events = [ + ev("a"), ev("c"), # unmapped (param=None) — still usable as conditions + ev("b", param="farm_capacity_bra", value=0.5, + condition="a.active AND c.active"), + ] + t = EventTracker(events, [tl(0, "a"), tl(5, "c"), tl(0, "b")]) + step_through(t, 5) + assert {"a", "c"} <= t.get_active_event_ids() + assert "b" not in t.get_active_event_ids() # c only just activated at day 5 + t.step(6) + assert "b" in t.get_active_event_ids() + + +def test_or_condition_requires_either(): + events = [ + ev("a"), + ev("b", param="farm_capacity_bra", value=0.5, condition="a.active OR z.active"), + ] + t = EventTracker(events, [tl(0, "a"), tl(0, "b")]) + t.step(0) + assert "b" not in t.get_active_event_ids() # a just activated, z never defined + t.step(1) + assert "b" in t.get_active_event_ids() # a.active -> OR true + + +# --- duration / expiry --- + +def test_event_expires_after_duration(): + t = EventTracker([ev("a", param="farm_capacity_bra", value=0.5, duration=5)], [tl(0, "a")]) + step_through(t, 4) + assert "a" in t.get_active_event_ids() + t.step(5) # day >= activated_at + duration + assert "a" not in t.get_active_event_ids() + assert t.get_param_value("farm_capacity_bra") == 1.0 + + +def test_zero_duration_is_permanent(): + t = EventTracker([ev("a", param="farm_capacity_bra", value=0.5, duration=0)], [tl(0, "a")]) + step_through(t, 500) + assert "a" in t.get_active_event_ids() + + +def test_expired_event_does_not_reactivate(): + t = EventTracker([ev("a", param="farm_capacity_bra", value=0.5, duration=3)], [tl(0, "a")]) + for d in range(20): + t.step(d) + if d >= 3: + assert "a" not in t.get_active_event_ids() + + +# --- aggregation --- + +def test_capacity_params_aggregate_with_min(): + events = [ + ev("a", param="port_capacity_santos", value=0.8), + ev("b", param="port_capacity_santos", value=0.6), + ] + t = EventTracker(events, [tl(0, "a"), tl(0, "b")]) + t.step(0) + assert t.get_param_value("port_capacity_santos") == 0.6 + assert t.get_shock_scale("port_capacity_santos") == 1.0 + + +def test_price_params_aggregate_with_max(): + events = [ + ev("a", param="energy_price_factor", value=1.5), + ev("b", param="energy_price_factor", value=3.0), + ] + t = EventTracker(events, [tl(0, "a"), tl(0, "b")]) + t.step(0) + assert t.get_param_value("energy_price_factor") == 3.0 + + +# --- defaults / unmapped events --- + +def test_unknown_param_defaults_to_baseline(): + t = EventTracker([], []) + t.step(0) + assert t.get_param_value("anything") == 1.0 + assert t.get_shock_scale("anything") == 0.0 + + +def test_unmapped_event_tracked_for_conditions_only(): + events = [ + ev("trigger"), # param=None -> produces no shock value + ev("dependent", param="farm_capacity_bra", value=0.7, condition="trigger.active"), + ] + t = EventTracker(events, [tl(0, "trigger"), tl(0, "dependent")]) + t.step(0) + assert "trigger" in t.get_active_event_ids() + assert t.get_param_value("farm_capacity_bra") == 1.0 # trigger has no param + t.step(1) + assert "dependent" in t.get_active_event_ids() # +1-day shift, then active + assert t.get_param_value("farm_capacity_bra") == 0.7 \ No newline at end of file diff --git a/tests/test_pdl_event_registry.py b/tests/test_pdl_event_registry.py new file mode 100644 index 0000000..d2f6cf5 --- /dev/null +++ b/tests/test_pdl_event_registry.py @@ -0,0 +1,27 @@ +""" +Validates PDLLoader.to_event_registry() against the real s1-soja.pdl.yaml. +""" +from pathlib import Path + +from pdl_loader import PDLLoader + +PDL_PATH = ( + Path(__file__).resolve().parent.parent + / "src" / "provider_simenv" / "scenarios" / "s1-soja.pdl.yaml" +) + + +def test_event_registry_counts(): + reg = PDLLoader(PDL_PATH).to_event_registry() # default cascade = soy_crisis_cascade + events = reg["events"] + assert len(events) == 18 + assert sum(1 for e in events if e["param"] is not None) == 8 # mapped + assert sum(1 for e in events if e["condition"]) == 15 # conditional + assert len(reg["timeline"]) == 13 + + +def test_argentina_supply_increase_is_mapped(): + reg = PDLLoader(PDL_PATH).to_event_registry() + arg = next(e for e in reg["events"] if e["id"] == "argentina_supply_increase") + assert arg["param"] == "farm_capacity_arg" # NOT unmapped (HTML brief is wrong) + assert arg["value"] == 1.1 # +10% -> 1.10 \ No newline at end of file