-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathengine.py
More file actions
99 lines (86 loc) · 4.31 KB
/
Copy pathengine.py
File metadata and controls
99 lines (86 loc) · 4.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
from pathfinder import SpaceTimeNode, ReservationTable
from models import SimulationLevel
from typing import Optional
class SimulationEngine:
    """Translate space-time routes into turn-by-turn command strings.

    Each drone route is a list of nodes exposing ``t`` (turn number),
    ``node_name`` and ``action_str``.  On construction the routes are
    flattened into ``timeline``, a mapping of turn number to the movement
    strings emitted on that turn, and ``max_turn`` records the latest
    turn reached by any route step.
    """

    # Zones whose ``max_capacity`` is at or above this value are left out of
    # the capacity stats.
    # NOTE(review): presumably a sentinel for "unlimited" capacity assigned
    # elsewhere in the project -- confirm before changing.
    _CAPACITY_DISPLAY_LIMIT = 99999

    def __init__(self,
                 fleet_routes: dict[int, list[SpaceTimeNode]],
                 level: Optional[SimulationLevel] = None,
                 res_table: Optional[ReservationTable] = None) -> None:
        """Store the inputs and build the per-turn timeline.

        Args:
            fleet_routes: Maps a drone id to that drone's ordered route.
            level: Optional level description; only read when capacity
                stats are requested from ``generate_output``.
            res_table: Optional reservation table; only read when capacity
                stats are requested from ``generate_output``.
        """
        self.fleet_routes: dict[int, list[SpaceTimeNode]] = fleet_routes
        # Maps turn_number -> list of movement strings for that turn.
        self.timeline: dict[int, list[str]] = {}
        self.max_turn: int = 0
        self.level: Optional[SimulationLevel] = level
        self.res_table: Optional[ReservationTable] = res_table
        self._build_timeline()

    def _build_timeline(self) -> None:
        """Interpolate route jumps into discrete per-turn events.

        A one-turn step emits ``D<id>-<node_name>`` on the arrival turn,
        unless the drone stayed on the same node (waits are not printed).
        A two-turn step emits the step's ``action_str`` on the intermediate
        turn followed by ``D<id>-<node_name>`` on the arrival turn.
        """
        for drone_id, path in self.fleet_routes.items():
            # Walk consecutive (previous, current) pairs; routes with fewer
            # than two nodes yield no pairs and therefore no events.
            for prev_node, curr_node in zip(path, path[1:]):
                dt = curr_node.t - prev_node.t
                arrival_event = f"D{drone_id}-{curr_node.node_name}"
                if dt == 1:
                    # Filter out "Wait in Place" turns per subject rules.
                    if curr_node.node_name != prev_node.node_name:
                        self._add_event(curr_node.t, arrival_event)
                elif dt == 2:
                    # Two-turn restricted transit: the in-flight turn shows
                    # the string carried by the arrival node's action_str
                    # (described upstream as "D<ID>-<edge_name>").
                    self._add_event(prev_node.t + 1, curr_node.action_str)
                    self._add_event(curr_node.t, arrival_event)
                # NOTE(review): any other time delta emits no event but
                # still advances max_turn -- confirm the planner can only
                # produce steps of 1 or 2 turns.
                self.max_turn = max(self.max_turn, curr_node.t)

    def _add_event(self, turn: int, event_str: str) -> None:
        """Append ``event_str`` to the events of ``turn``."""
        self.timeline.setdefault(turn, []).append(event_str)

    def _capacity_stats(self, turn: int) -> list[str]:
        """Return the zone/connection occupancy descriptions for ``turn``.

        Returns an empty list when no level or reservation table was
        supplied.  Only entries with non-zero occupancy are reported, and
        zones at or above ``_CAPACITY_DISPLAY_LIMIT`` are skipped.
        """
        # Compare against None explicitly so that a level or table object
        # that happens to be falsy (e.g. defines __len__) is still used.
        if self.level is None or self.res_table is None:
            return []
        stats: list[str] = []
        for zone_name, zone in self.level.nodes.items():
            used = self.res_table.get_node_occupancy(zone_name, turn)
            if used > 0 and zone.max_capacity < self._CAPACITY_DISPLAY_LIMIT:
                stats.append(f"Zone {zone_name}: "
                             f"{used}/{zone.max_capacity} drones")
        for edge_name, edge in self.level.edges.items():
            used = self.res_table.get_edge_occupancy(edge_name, turn)
            if used > 0:
                stats.append(f"Connection {edge_name}: {used}/"
                             f"{edge.max_capacity} capacity used")
        return stats

    def generate_output(self, show_capacity: bool = False) -> list[str]:
        """Return one formatted line per turn that has at least one event.

        Args:
            show_capacity: When true and both a level and a reservation
                table were supplied, append `` | `` and a comma-separated
                list of occupancy stats to each line that has any.

        Returns:
            The lines in ascending turn order; turns without events are
            omitted.
        """
        output_lines: list[str] = []
        for turn in range(1, self.max_turn + 1):
            turn_events = self.timeline.get(turn)
            if not turn_events:
                continue
            # Sort numerically by drone id (D1, D2, ..., D10) rather than
            # lexically, so D10 does not come before D2.
            ordered = sorted(
                turn_events,
                key=lambda event: int(event.split("-")[0][1:])
            )
            line = " ".join(ordered)
            if show_capacity:
                stats = self._capacity_stats(turn)
                if stats:
                    line += " | " + ", ".join(stats)
            output_lines.append(line)
        return output_lines

    def display_metrics(self) -> None:
        """Print the total turn count and the average turns per drone.

        The average is taken over the final turn of every drone that has a
        non-empty route; when no drone has one, 0.00 is printed instead of
        failing on a division by zero or an empty-route lookup.
        """
        print("\n--- Simulation Metrics ---")
        print(f"Total Simulation Turns : {self.max_turn}")
        final_turns = [
            path[-1].t for path in self.fleet_routes.values() if path
        ]
        avg_turns: float = (
            sum(final_turns) / len(final_turns) if final_turns else 0.0
        )
        print(f"Average Turns per Drone: {avg_turns:.2f}")