Skip to content

feat(run-lifecycle-mqtt): MQTT consumer + full run lifecycle pipeline#59

Open
dotjae wants to merge 6 commits into
mainfrom
feat/run-lifecycle-mqtt
Open

feat(run-lifecycle-mqtt): MQTT consumer + full run lifecycle pipeline#59
dotjae wants to merge 6 commits into
mainfrom
feat/run-lifecycle-mqtt

Conversation

@dotjae
Copy link
Copy Markdown
Collaborator

@dotjae dotjae commented May 23, 2026

Summary

  • MQTT consumer (realtime_engine/mqtt.py): subscribes to vehicle telemetry broker, drives run lifecycle FSM transitions in real time
  • Run lifecycle pipeline (runs/domain/): complete guards, actions, and transitions for all lifecycle states — including flatten/sync fixes for terminal transitions
  • Schedule engine cleanup (schedule_engine/tasks.py): ported publisher builders into the unified backend, removed legacy publisher/, scheduler/, and scripts/ top-level service directories
  • Celery beat (databus/celery.py): added build_vehicle_positions and build_trip_updates beat schedules (15 s cadence)
  • Fix (schedule_engine/fake_stop_times.py): handle dict-shaped run/progression entries to unblock dev simulation

dotjae and others added 6 commits May 18, 2026 18:54
UpdateRunViewSet
- Flatten serializer.validated_data["details"] into payload before
  process_event. Guards like is_*_authorized read actor_role at the
  top level of payload (consistent with the Celery path in
  realtime_engine/tasks.py), so without this every UI-driven
  cancel/interrupt/short-turn/run_rejected returned 422 with
  "actor_role '' is not authorized…".

Transitions
- INTERRUPT_RUN, SHORT_TURN_RUN, COMPLETE_RUN: add sync_lifecycle_state
  so Redis's run:{id} hash reflects the terminal state. Without this
  the simulator's RunBinder polled an "In Progress" value forever and
  the UI never recognized the terminal state.
- RUN_TRACKING_LOST: stop removing the run from runs:tracking — the
  set is scan_stale_runs's work queue, and removing on lost prevented
  the eventual RUN_TRACKING_EXPIRED transition.
- RUN_TRACKING_EXPIRED: add sync_lifecycle_state + remove_from_tracking_set
  so the run finally exits the queue when fully terminal.

Also includes RunHistoryView (read-only GET /api/runs/{id}/history/)
in views.py — drives the history panel in the simulator's Runs tab.

Note: committed against feat/run-lifecycle-mqtt which still has
unrelated WIP from another contributor (uncommitted) — only the two
files touched here are part of this fix.
Replaces the management-command mqtt_consumer with an in-worker
Celery bootstep at realtime_engine/mqtt.py. It subscribes to
transit/vehicle/+/{position,progression,occupancy}, updates Redis
on every ping, and emits lifecycle events (run_tracking_started,
run_started, complete_run, run_tracking_restored) based on the
current run state.

- backend/realtime_engine/mqtt.py: new MQTT bootstep
- backend/realtime_engine/management/commands/*: removed
  (mqtt_consumer + bootstrap_simulator_runs no longer needed)
- backend/databus/celery.py: register the bootstep on workers that
  set MQTT_CONSUMER_ENABLED=true
- backend/api/urls.py: add /api/runs/<uuid>/history/
- backend/Dockerfile, compose.dev.yml, compose.prod.yml: wire
  telemetry-broker + realtime-engine environment so the consumer
  attaches at startup
- backend/uv.lock: pick up paho-mqtt
- docs/content/processes/run-lifecycle.md, README, realtime_engine/README:
  document the new flow
…update related views and URLs

feat(run): add RunStateViewSet for retrieving current run lifecycle state
docs: add README for run lifecycle states
refactor(redis): change vehicle data key from 'data' to 'metadata' across scripts and tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants