diff --git a/PRPs/PRP-showcase-workspace-E1-persistence-backbone.md b/PRPs/PRP-showcase-workspace-E1-persistence-backbone.md new file mode 100644 index 00000000..729d6e62 --- /dev/null +++ b/PRPs/PRP-showcase-workspace-E1-persistence-backbone.md @@ -0,0 +1,666 @@ +name: "PRP — Showcase Workspace E1: Persistence Backbone (issue #390)" +description: | + +## Purpose + +Implement the Foundation epic of the showcase-workspace initiative (umbrella #389): +the demo slice gains its first persistence — a `showcase_workspace` table + Alembic +migration + additive Optional `preservation`/`workspace_name` fields on +`DemoRunRequest` + pipeline recording of every created object id into the workspace +row. Blocks epics #391 (presets), #392 (tags), #393 (restore/replay). + +## Core Principles + +1. **Context is King**: every reference below was verified against the live code on 2026-06-12. +2. **Validation Loops**: each level is executable as written. +3. **Information Dense**: patterns cite exact file:line. +4. **Progressive Success**: schema fields → model+migration → service → pipeline hook → tests. +5. **Global rules**: follow CLAUDE.md / AGENTS.md; all five CI gates must pass. + +--- + +## Goal + +A demo/showcase run started with `preservation="keep"` creates exactly one +`showcase_workspace` row that records the run configuration (seed, scenario, reset, +skip_seed, name) and — when the pipeline finishes — every object the run created +(winning/V2 registry run ids, alias, scenario plan ids, batch id, agent session id, +artifact paths, store/product grain, date window) plus a result summary. A run +without the new fields behaves **byte-identically to today** (no row, same events, +same responses). Legacy WS/HTTP clients keep working unchanged. + +**Deliverable** (all additive, backend-only — no frontend changes in E1): + +- `app/features/demo/models.py` — new `ShowcaseWorkspace` ORM model (first table owned by the demo slice). +- `alembic/versions/_create_showcase_workspace_table.py` — forward migration + clean downgrade. +- `alembic/env.py` — one added model-registration import. +- `app/features/demo/schemas.py` — `DemoRunRequest` gains `preservation` + `workspace_name`. +- `app/features/demo/workspace.py` — new module: create/finalize (+ get/list helpers for tests and E4). +- `app/features/demo/pipeline.py` — `DemoContext.workspace_id` field + create/finalize hooks in `run_pipeline`; `pipeline_complete.data` gains additive `workspace_id`. +- Tests: schema unit tests, model constraint/CRUD integration tests, workspace-service integration tests, pipeline unit tests, route passthrough tests. +- `docs/_base/API_CONTRACTS.md` — additive contract notes for the two request fields and the `workspace_id` summary key. + +**Success definition**: all Success Criteria below check off, the five CI gates are +green, and a manual `POST /demo/run` with `{"preservation": "keep", "workspace_name": +"e1-smoke"}` against a seeded local stack produces a `completed` workspace row whose +`created_objects` JSONB contains the run's real ids. + +## Why + +- The cleanup step deletes nothing (`app/features/demo/pipeline.py:2045` `step_cleanup` only closes the agent session and restores the `demo-production` alias), so showcase objects already persist — but unlabeled and unfindable. E1 gives that de-facto preservation explicit semantics and discoverability. +- Umbrella #389 decomposition: E1 is the Foundation; #391/#392/#393 all build on the table and the request fields added here. +- The only run memory today is a localStorage FIFO-5 in the frontend (`frontend/src/pages/showcase.tsx:166`) — server-side workspace rows are the prerequisite for restore/replay (E4). + +## What + +### User-visible behavior + +- `POST /demo/run` and the `WS /demo/stream` start frame accept two new **optional** fields: + - `preservation`: `"ephemeral"` (default — today's behavior, no row) or `"keep"` (create + finalize a workspace row). + - `workspace_name`: optional human label, `^[a-z0-9][a-z0-9\-_]*$`, ≤100 chars (same pattern as registry alias names, `app/features/registry/schemas.py:213`). Only allowed with `preservation="keep"` — supplying it with `"ephemeral"` is a 422. +- The final `pipeline_complete` event's `data` dict gains an additive `workspace_id` key (`null` on ephemeral runs). +- No new public endpoints in E1 (list/load is epic #393/E4). `workspace.py` ships `get_workspace`/`list_workspaces` helpers for tests and E4 reuse, unrouted. + +### Technical requirements + +- Workspace row is created (status `running`) before the first step executes and finalized (status `completed`/`failed` + collected ids) before `pipeline_complete` is yielded — including the mid-run-failure path, so a partial run still records what it created. +- Workspace DB writes are **warn-and-continue**: a DB failure must never break the demo pipeline (mirror the lifespan pattern at `app/main.py:62-71`). +- **No ForeignKeys** to `model_run` / `scenario_plan` / `batch_job` / `agent_session` — recorded ids are opaque soft references. A cross-slice FK would couple the demo slice's schema to four other slices and break independent deletion. This is a deliberate design decision; document it in the model docstring. +- The demo slice still never imports another feature slice (`app/features/demo/` imports only `app.core.*`, `app.shared.*`, and stdlib/3rd-party — verified: the pipeline drives everything through ASGITransport). + +### Success Criteria + +- [ ] `DemoRunRequest()` (no args) serializes identically to today's defaults plus `preservation="ephemeral"`, `workspace_name=None`; a start frame without the new keys validates (legacy compatibility). +- [ ] `preservation="keep"` run → exactly one `showcase_workspace` row: status `completed` on a green run, `failed` when a step fails; `created_objects` carries the ids the run produced; `result_summary` carries winner/wape/wall-clock. +- [ ] `preservation="ephemeral"` (or omitted) → zero rows written, zero workspace queries issued. +- [ ] `workspace_name` with `preservation="ephemeral"` → 422 `application/problem+json`. +- [ ] `pipeline_complete.data.workspace_id` present (string on keep runs, `null` otherwise). +- [ ] Migration applies AND downgrades cleanly on a fresh DB; `schema-validation.yml` autogenerate drift check sees the model (env.py import added). +- [ ] `uv run ruff check . && uv run ruff format --check . && uv run mypy app/ && uv run pyright app/ && uv run pytest -v -m "not integration"` all green; integration suite green against docker-compose Postgres. + +## All Needed Context + +### Documentation & References + +```yaml +# MUST READ — codebase patterns (all verified 2026-06-12, branch dev @ 2c71928) + +- file: app/features/demo/schemas.py + why: | + DemoRunRequest lives at lines 29-61. ConfigDict(strict=True) at line 38. + The `scenario` field (line 57) shows the Field(strict=False) override pattern + for enum-on-the-wire; the NEW fields are JSON-native (str/Literal) so they + need NO strict=False. Copy the comment style used for the PRP-38 scenario field. + +- file: app/features/demo/pipeline.py + why: | + DemoContext dataclass at line 212 (add `workspace_id: str | None = None` after + the PRP-41 fields at line 256). Orchestrator run_pipeline at line 2554: ctx is + built at 2582-2587; the _Client context opens at 2595; the fail-path alias + restore at 2661-2668; pipeline_complete is yielded at 2671-2691 — finalize the + workspace BEFORE this yield and add "workspace_id" to its data dict (line 2681). + The orchestrator MUST NEVER raise (contract in docstring, lines 2557-2558). + +- file: app/features/demo/service.py + why: | + Single-flight asyncio.Lock at line 19 — only one pipeline runs at a time, so + workspace-row writes have no concurrency races. run_pipeline_sync (line 46) + builds DemoRunResult from the pipeline_complete event — no change needed there + unless you surface workspace_id on DemoRunResult (optional, recommended: + additive `workspace_id: str | None = None` field mirroring `winning_run_id`). + +- file: app/features/demo/routes.py + why: | + POST /demo/run (line 38) and WS /demo/stream (line 57). The WS start frame is + validated via DemoRunRequest.model_validate(raw) at line 73 — pydantic default + (no extra="forbid") IGNORES unknown keys, so old/new clients interoperate. + Routes have NO DB dependency today and need none — the workspace module opens + its own sessions. + +- file: app/features/batch/models.py + why: | + THE precedent for "a slice owns its own table": Base + TimestampMixin imports + (lines 42-43), Mapped[]/mapped_column patterns, String(32) unique external id + (line 143), JSONB columns (lines 145-146, 159-160), CheckConstraint + + composite Index in __table_args__ (lines 166-180). Mirror this file's shape. + +- file: app/features/scenarios/models.py + why: | + Second precedent: JSONB with server_default text("'[]'::jsonb") (lines 74-76), + CHECK constraint naming convention ck__(lines 102-115). + GOTCHA in its docstring: SQLAlchemy reserves attribute name `metadata` — + never name a column/attribute that. + +- file: app/shared/models.py + why: TimestampMixin (created_at/updated_at, server_default=func.now()) — use it. + +- file: alembic/env.py + why: | + Lines 15-24: every slice with models registers via + `from app.features. import models as _models # noqa: F401`. + ADD `from app.features.demo import models as demo_models # noqa: F401` + in alphabetical position (after data_platform, before explainability). + +- file: alembic/versions/e4f5a6b7c8d9_add_model_selection_decision_promotion.py + why: | + CURRENT HEAD revision is e4f5a6b7c8d9 (verified `uv run alembic heads`). + Your new migration's down_revision = "e4f5a6b7c8d9". Copy the header/docstring + format, the typing (`revision: str`, `down_revision: str | None`), and the + upgrade()/downgrade() docstring style. + +- file: alembic/versions/43e35957a248_create_scenario_plan_table.py + why: | + create_table + named CheckConstraint + op.create_index (incl. GIN with + postgresql_using='gin', lines 62-70) — the create-table migration to mirror. + +- file: app/core/database.py + why: | + Base class + get_session_maker(). The workspace module opens sessions via + get_session_maker() (NOT a request dependency) because run_pipeline is not + request-scoped. Precedent: app/main.py:63-65 (lifespan) and the agents + websocket per-message sessions. + +- file: app/main.py + why: | + Lines 62-71 — the warn-and-continue pattern ("config must never block + startup"): try/except Exception + logger.warning with error & error_type. + Workspace writes use exactly this pattern ("workspace must never break the demo"). + +- file: app/features/scenarios/service.py + why: | + create_plan (line 354) — canonical async service write: build ORM object, + db.add, await db.commit() (line 423), await db.refresh (line 424). + Follow for create_workspace/finalize_workspace. + +- file: app/core/exceptions.py + why: | + ForecastLabError subclasses → RFC 7807 via registered handlers. The 422 on + workspace_name+ephemeral comes FREE from pydantic validation at the boundary + (FastAPI → 422 problem+json via the validation handler); no manual raise needed. + +- file: app/features/demo/tests/conftest.py + why: | + The demo test client fixture (ASGITransport over app.main.app); route tests + monkeypatch the demo service so the real pipeline never runs. + +- file: app/features/demo/tests/test_schemas.py + why: | + Existing DemoRunRequest tests INCLUDING the JSON-path convention + (Model.model_validate({json-shaped dict}) — mandated by + .claude/rules/security-patterns.md § strict mode). Extend this file. + +- file: app/features/scenarios/tests/conftest.py + why: | + Integration DB fixture precedent (async_sessionmaker over create_async_engine, + line 52-59) — copy for the workspace/model integration tests. + +- file: docs/_base/API_CONTRACTS.md + why: | + The POST /demo/run row and "WebSocket Events (/demo/stream)" section document + the start-frame fields — add the two new Optional fields + the additive + pipeline_complete data.workspace_id key, in the same additive-note style as + the PRP-38 scenario field. + +# Issue / initiative context +- url: https://github.com/w7-mgfcode/ForecastLabAI/issues/390 + why: The epic this PRP implements (Foundation; blocks #391 #392 #393). +- url: https://github.com/w7-mgfcode/ForecastLabAI/issues/389 + why: Umbrella — success criteria + out-of-scope list (no export, no per-phase config, no endpoints beyond recording in E1). +``` + +### Current Codebase tree (relevant subset) + +```bash +app/features/demo/ +├── __init__.py +├── pipeline.py # 2692 lines; DemoContext @212; run_pipeline @2554 +├── routes.py # POST /demo/run @38; WS /demo/stream @57 +├── schemas.py # DemoRunRequest @29; StepEvent @64; DemoRunResult @106 +├── service.py # asyncio.Lock single-flight @19 +└── tests/ + ├── conftest.py # ASGITransport client fixture + ├── test_pipeline.py + ├── test_routes.py + └── test_schemas.py +alembic/ +├── env.py # model imports @15-24 (NO demo import yet) +└── versions/ # head: e4f5a6b7c8d9 +``` + +### Desired Codebase tree (files added/modified) + +```bash +app/features/demo/ +├── models.py # NEW — ShowcaseWorkspace ORM (+ status constants) +├── workspace.py # NEW — create/finalize/get/list (session-maker based) +├── schemas.py # MOD — DemoRunRequest +preservation +workspace_name (+model_validator); +│ # DemoRunResult +workspace_id (additive Optional) +├── pipeline.py # MOD — DemoContext.workspace_id; create/finalize hooks in run_pipeline +├── service.py # MOD — surface workspace_id on DemoRunResult (1 line in the final build) +└── tests/ + ├── test_schemas.py # MOD — new-field defaults, JSON path, pattern, ephemeral+name=422, legacy frame + ├── test_models.py # NEW — constraint + CRUD (integration) + ├── test_workspace.py # NEW — create/finalize/get/list (integration) + ├── test_pipeline.py # MOD — keep-mode creates+finalizes (workspace fns monkeypatched); ephemeral writes nothing + └── test_routes.py # MOD — passthrough of new fields (service monkeypatched); WS legacy frame +alembic/ +├── env.py # MOD — +demo models import +└── versions/a1b2c3d4e5f6_create_showcase_workspace_table.py # NEW (id illustrative — generate your own 12-hex) +docs/_base/API_CONTRACTS.md # MOD — additive contract notes +``` + +### Known Gotchas & Library Quirks + +```python +# CRITICAL — strict mode: DemoRunRequest has ConfigDict(strict=True) (schemas.py:38). +# The new fields are JSON-native (Literal[str] / str|None) → NO Field(strict=False) +# needed. test_strict_mode_policy.py (AST walker) only fires on +# date/datetime/time/UUID/Decimal — neither new field triggers it. + +# CRITICAL — the orchestrator must NEVER raise (pipeline.py:2557 contract). +# Wrap every workspace DB call in try/except Exception + logger.warning +# (pattern: app/main.py:62-71). A dead Postgres must not kill the demo stream. + +# CRITICAL — pipeline_complete is ALWAYS emitted (even on step failure via the +# break at pipeline.py:2668). Finalize the workspace row BEFORE the final yield +# at 2671 so the failure path records partial created_objects too. + +# CRITICAL — NO ForeignKeys on showcase_workspace. ids are soft references. +# ctx.winning_run_id et al. are plain strings produced by HTTP responses; the +# referenced rows can be deleted independently (e.g. DELETE /registry/runs/{id}). + +# GOTCHA — SQLAlchemy reserves the declarative attr name `metadata` +# (scenarios/models.py:9-10). Use `created_objects` / `result_summary`. + +# GOTCHA — external-id convention is uuid.uuid4().hex (32 chars, python-side), +# String(32) unique+index — NOT server-side gen_random_uuid(). Matches +# batch_job.batch_id (batch/models.py:143) and scenario_plan.scenario_id. + +# GOTCHA — alembic/env.py MUST import the new models module (noqa: F401) or the +# schema-validation autogenerate drift check will not see the table and a later +# autogenerate would try to DROP it. + +# GOTCHA — alembic revision ids in this repo are hand-written 12-hex strings +# continuing the chain (head = e4f5a6b7c8d9). Either run +# `uv run alembic revision -m "create showcase_workspace table"` and keep the +# generated id, or hand-write one — but down_revision MUST be "e4f5a6b7c8d9". + +# GOTCHA — WS start frame: DemoRunRequest.model_validate(raw) at routes.py:73 with +# default model_config IGNORES unknown keys. Do NOT add extra="forbid" — that +# would break forward/backward compatibility deliberately relied upon. + +# GOTCHA — repo has mixed CRLF/LF line endings; check `git diff --stat` before +# committing to avoid whole-file noise diffs (Write/Edit emit LF — fine for NEW +# files; for schemas.py/pipeline.py edits, verify the diff is surgical). + +# GOTCHA — mypy --strict AND pyright --strict both gate merge. New modules need +# full annotations incl. return types on fixtures and `-> None` on tests. + +# CONVENTION — commits: `feat(api): ... (#390)`; branch off dev: +# feat/showcase-workspace-persistence-backbone (≤50 chars, kebab). +# NO AI co-author trailer (hook-enforced). + +# RUNTIME-VERIFICATION LOG (per prp-create step 3): +# - `uv run alembic heads` → e4f5a6b7c8d9 (verified 2026-06-12) +# - DemoRunRequest strict config + scenario strict=False → schemas.py:38,57 (read) +# - No FastAPI/SQLAlchemy/Pydantic API is cited here beyond patterns already +# working in-repo (JSONB, CheckConstraint, async_sessionmaker) — no external +# library claims requiring a one-off import probe. +``` + +## Implementation Blueprint + +### Data models and structure + +```python +# app/features/demo/models.py (NEW — mirror batch/models.py shape) +"""Showcase workspace ORM model. + +First table owned by the demo slice (precedent: app/features/batch/models.py). +A row = one preserved showcase run: its configuration and the ids of every +object the pipeline created. All recorded ids are OPAQUE SOFT REFERENCES — +deliberately no ForeignKey to model_run / scenario_plan / batch_job / +agent_session, so cross-slice schema coupling stays zero and referenced rows +remain independently deletable. +""" +from __future__ import annotations +import datetime as _dt +from typing import Any +from sqlalchemy import CheckConstraint, Date, Index, Integer, String, text +from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.orm import Mapped, mapped_column +from app.core.database import Base +from app.shared.models import TimestampMixin + +WORKSPACE_STATUS_RUNNING = "running" +WORKSPACE_STATUS_COMPLETED = "completed" +WORKSPACE_STATUS_FAILED = "failed" + +class ShowcaseWorkspace(TimestampMixin, Base): + __tablename__ = "showcase_workspace" + + id: Mapped[int] = mapped_column(Integer, primary_key=True) + workspace_id: Mapped[str] = mapped_column(String(32), unique=True, index=True) # uuid4().hex + name: Mapped[str | None] = mapped_column(String(100), nullable=True, index=True) + status: Mapped[str] = mapped_column( + String(20), default=WORKSPACE_STATUS_RUNNING, nullable=False, index=True + ) + # Run configuration — replay inputs (E4 reads these verbatim). + seed: Mapped[int] = mapped_column(Integer, nullable=False) + scenario: Mapped[str] = mapped_column(String(40), nullable=False) # ScenarioPreset.value + reset: Mapped[bool] = mapped_column(nullable=False, default=False) + skip_seed: Mapped[bool] = mapped_column(nullable=False, default=True) + # Grain + window discovered by the status/seed steps (nullable: unknown on early failure). + store_id: Mapped[int | None] = mapped_column(Integer, nullable=True) + product_id: Mapped[int | None] = mapped_column(Integer, nullable=True) + date_start: Mapped[_dt.date | None] = mapped_column(Date, nullable=True) + date_end: Mapped[_dt.date | None] = mapped_column(Date, nullable=True) + # Everything the run created — flexible JSONB (soft references, see module docstring). + created_objects: Mapped[dict[str, Any]] = mapped_column( + JSONB, nullable=False, default=dict, server_default=text("'{}'::jsonb") + ) + # winner_model_type / winner_wape / wall_clock_s / any_fail — display payload. + result_summary: Mapped[dict[str, Any] | None] = mapped_column(JSONB, nullable=True) + + __table_args__ = ( + CheckConstraint( + "status IN ('running', 'completed', 'failed')", + name="ck_showcase_workspace_status", + ), + Index("ix_showcase_workspace_status_created", "status", "created_at"), + ) +``` + +```python +# app/features/demo/schemas.py — DemoRunRequest additions (after `scenario`, line 61) + # E1 (#390): preservation policy. Default "ephemeral" keeps legacy behaviour + # byte-identical (no workspace row). Both fields are JSON-native, so the + # model-level strict=True needs no per-field override. + preservation: Literal["ephemeral", "keep"] = Field( + default="ephemeral", + description="'keep' records this run as a showcase_workspace row.", + ) + workspace_name: str | None = Field( + default=None, + max_length=100, + pattern=r"^[a-z0-9][a-z0-9\-_]*$", # same pattern as registry alias_name + description="Optional workspace label; requires preservation='keep'.", + ) + + @model_validator(mode="after") + def _workspace_name_requires_keep(self) -> DemoRunRequest: + if self.workspace_name is not None and self.preservation != "keep": + raise ValueError("workspace_name requires preservation='keep'") + return self +``` + +### List of tasks (dependency order) + +```yaml +Task 1 — branch & issue hygiene: + RUN: git switch dev && git pull && git switch -c feat/showcase-workspace-persistence-backbone + VERIFY: gh issue view 390 --json state # open + +Task 2 — CREATE app/features/demo/models.py: + - MIRROR shape: app/features/batch/models.py (Base+TimestampMixin, __table_args__) + - CONTENT: ShowcaseWorkspace + 3 status constants (see blueprint above) + - DOCSTRING: state the no-FK soft-reference decision explicitly + +Task 3 — MODIFY alembic/env.py: + - INSERT (alphabetical, after data_platform import at line 18): + from app.features.demo import models as demo_models # noqa: F401 + +Task 4 — CREATE migration alembic/versions/_create_showcase_workspace_table.py: + - down_revision = "e4f5a6b7c8d9" + - MIRROR: 43e35957a248_create_scenario_plan_table.py (create_table + named CHECK + + op.create_index incl. unique index on workspace_id + composite status/created_at) + - downgrade(): drop indexes then op.drop_table("showcase_workspace") + - VERIFY locally: uv run alembic upgrade head && uv run alembic downgrade -1 && uv run alembic upgrade head + +Task 5 — MODIFY app/features/demo/schemas.py: + - ADD model_validator import from pydantic + - ADD the two fields + validator to DemoRunRequest (blueprint above) + - ADD to DemoRunResult: workspace_id: str | None = Field(default=None, description=...) + - UPDATE DemoRunRequest docstring (the "every field is JSON-native" claim still holds — say so) + +Task 6 — CREATE app/features/demo/workspace.py: + - Module docstring: warn-and-continue contract; session-maker (not request-scoped) + - async def create_workspace(req: DemoRunRequest) -> str | None + # opens get_session_maker()() session; inserts row (uuid4().hex, status=running, + # config from req); commit; returns workspace_id. On ANY Exception: + # logger.warning("demo.workspace_create_failed", error=..., error_type=...); return None + - async def finalize_workspace(workspace_id: str, ctx: DemoContext, *, failed: bool) -> None + # loads row by workspace_id, sets status, store_id/product_id/date_start/date_end, + # created_objects (see pseudocode), result_summary; commit. Warn-and-continue. + # NOTE: import DemoContext under TYPE_CHECKING to avoid runtime import cycles + # (pipeline imports workspace; workspace needs only the ctx type). + - async def get_workspace(db: AsyncSession, workspace_id: str) -> ShowcaseWorkspace | None + - async def list_workspaces(db: AsyncSession, *, limit: int = 50, offset: int = 0) -> list[ShowcaseWorkspace] + # newest-first; unrouted in E1 — consumed by tests now, E4 routes later + +Task 7 — MODIFY app/features/demo/pipeline.py: + - DemoContext: ADD `workspace_id: str | None = None` after line 256 (PRP-41 block), + with an `# E1 (#390)` comment matching the per-PRP comment convention + - run_pipeline: AFTER ctx construction (line 2587): + if req.preservation == "keep": + ctx.workspace_id = await workspace.create_workspace(req) + - run_pipeline: BEFORE the pipeline_complete yield (line 2671): + if ctx.workspace_id is not None: + await workspace.finalize_workspace(ctx.workspace_id, ctx, failed=any_fail) + - pipeline_complete data dict: ADD "workspace_id": ctx.workspace_id + - import: from app.features.demo import workspace (module import, monkeypatch-friendly) + +Task 8 — MODIFY app/features/demo/service.py: + - run_pipeline_sync: thread workspace_id from final.data into DemoRunResult + (mirror the winning_run_id line at service.py:77) + +Task 9 — tests (see Validation Loop for the full matrix): + - MODIFY tests/test_schemas.py (unit) + - CREATE tests/test_models.py (@pytest.mark.integration) + - CREATE tests/test_workspace.py(@pytest.mark.integration) + - MODIFY tests/test_pipeline.py (unit — monkeypatch workspace.create_workspace/finalize_workspace) + - MODIFY tests/test_routes.py (unit — service monkeypatched) + +Task 10 — MODIFY docs/_base/API_CONTRACTS.md: + - POST /demo/run row: append "E1 (#390) — body accepts additive Optional + `preservation: 'ephemeral'|'keep'` (default 'ephemeral') and `workspace_name`; + `workspace_name` without `preservation='keep'` → 422." + - WS /demo/stream section: same note on the start frame + "`pipeline_complete.data` + gains additive `workspace_id` (string|null)." + +Task 11 — gates, commit, PR: + - RUN the five gates + integration suite (Validation Loop) + - git diff --stat # confirm surgical diffs (CRLF noise check) + - COMMITS (reference #390, no AI trailer), e.g.: + feat(api): add showcase_workspace model and migration (#390) + feat(api): record demo run objects into showcase workspace (#390) + docs(api): document preservation and workspace_name fields (#390) + - PR into dev; title `feat(api): showcase workspace persistence backbone (#390)` +``` + +### Per-task pseudocode — the finalize payload (Task 6) + +```python +def _collect_created_objects(ctx: DemoContext) -> dict[str, Any]: + """Map DemoContext accumulator fields -> created_objects JSONB. + + Every value is already a plain str/None on ctx (HTTP response payloads). + Drop None values so the JSONB stays sparse and greppable. + """ + raw: dict[str, Any] = { + "winning_run_id": ctx.winning_run_id, # pipeline.py:234 + "v2_run_id": ctx.v2_run_id, # :237 + "v2_model_path": ctx.v2_model_path, # :238 (artifact path) + "alias": "demo-production" if ctx.winning_run_id else None, # DEMO_ALIAS + "agent_session_id": ctx.session_id, # :235 + "batch_id": ctx.batch_id, # :245 + "scenario_plan_ids": [ + s for s in (ctx.price_cut_scenario_id, ctx.holiday_scenario_id) if s + ], # :250-251 + "scenario_artifact_key": ctx.scenario_artifact_key, # :249 + "train_model_types": sorted(ctx.train_results), # :230 (keys only) + "stale_alias_run_id": ctx.stale_alias_run_id, # :243 (PRP-39 controlled row) + } + return {k: v for k, v in raw.items() if v not in (None, [])} + +# finalize_workspace core (warn-and-continue wrapper around ALL of it): +async def finalize_workspace(workspace_id: str, ctx: "DemoContext", *, failed: bool) -> None: + try: + session_maker = get_session_maker() + async with session_maker() as db: + row = (await db.execute( + select(ShowcaseWorkspace).where(ShowcaseWorkspace.workspace_id == workspace_id) + )).scalar_one_or_none() + if row is None: # create failed earlier — nothing to finalize + return + row.status = WORKSPACE_STATUS_FAILED if failed else WORKSPACE_STATUS_COMPLETED + row.store_id, row.product_id = ctx.store_id, ctx.product_id + row.date_start, row.date_end = ctx.date_start, ctx.date_end + row.created_objects = _collect_created_objects(ctx) + row.result_summary = { + "winner_model_type": ctx.winner_model_type, + "winner_wape": ctx.winner_wape, + } + await db.commit() + except Exception as exc: # workspace must never break the demo (app/main.py:62 pattern) + logger.warning("demo.workspace_finalize_failed", + workspace_id=workspace_id, error=str(exc), error_type=type(exc).__name__) +``` + +### Integration Points + +```yaml +DATABASE: + - migration: create showcase_workspace (PK id, unique workspace_id, CHECK status, + composite ix status+created_at, JSONB created_objects/result_summary) + - registration: alembic/env.py demo models import (Task 3) + +CONFIG: none — no new settings, no env vars. + +ROUTES: none added in E1. Existing /demo/run + /demo/stream gain fields via schema only. + +FRONTEND: none in E1 (epic #393/E4 wires the UI; adding the optional fields to + frontend/src/types/api.ts DemoRunRequest interface is additive whenever needed). + +DOCS: docs/_base/API_CONTRACTS.md additive notes (Task 10). RUNBOOKS/DOMAIN_MODEL + sweeps belong to the E5 release gate — do not scope-creep them here. +``` + +## Validation Loop + +### Level 1: Syntax & Style + +```bash +uv run ruff check . && uv run ruff format --check . +uv run mypy app/ && uv run pyright app/ +# Expected: clean. Both type checkers are --strict and gate merge. +``` + +### Level 2: Unit Tests (no DB) + +```python +# tests/test_schemas.py — add: +def test_demo_run_request_new_field_defaults() -> None: ... + # DemoRunRequest() -> preservation == "ephemeral", workspace_name is None + +def test_demo_run_request_json_path_keep_with_name() -> None: ... + # DemoRunRequest.model_validate({"preservation": "keep", "workspace_name": "bf-demo"}) + # — the MANDATORY json-dict path per security-patterns.md + +def test_demo_run_request_legacy_frame_still_validates() -> None: ... + # model_validate({"seed": 7}) — no new keys — passes; defaults applied + +def test_demo_run_request_workspace_name_requires_keep() -> None: ... + # pytest.raises(ValidationError): model_validate({"workspace_name": "x"}) + +def test_demo_run_request_workspace_name_pattern_rejected() -> None: ... + # "Black Friday!" and "-leading-dash" both raise ValidationError + +# tests/test_pipeline.py — add (monkeypatch app.features.demo.pipeline.workspace): +async def test_run_pipeline_keep_creates_and_finalizes_workspace(...) -> None: ... + # stub create_workspace -> "ws123"; run with canned _Client responses; + # assert finalize called once with failed matching outcome; + # assert pipeline_complete data["workspace_id"] == "ws123" + +async def test_run_pipeline_ephemeral_touches_no_workspace(...) -> None: ... + # stubs assert_not_called + +async def test_run_pipeline_workspace_create_failure_does_not_break_run(...) -> None: ... + # create_workspace returns None (its warn path) -> pipeline still completes, + # data["workspace_id"] is None + +# tests/test_routes.py — add (service monkeypatched per existing pattern): +async def test_run_demo_accepts_preservation_fields(client) -> None: ... +async def test_run_demo_rejects_name_without_keep_422(client) -> None: ... + # response.status_code == 422; content-type application/problem+json +``` + +```bash +uv run pytest app/features/demo -v -m "not integration" +uv run pytest app/core/tests/test_strict_mode_policy.py -v # AST walker still green +``` + +### Level 3: Integration (real Postgres) + +```python +# tests/test_models.py + tests/test_workspace.py — @pytest.mark.integration, +# session fixture copied from app/features/scenarios/tests/conftest.py:52-59. +# Cases: insert/read roundtrip incl. JSONB; duplicate workspace_id -> IntegrityError; +# status CHECK violation -> IntegrityError; create_workspace persists config; +# finalize_workspace(failed=True/False) sets status + payloads; finalize on a +# missing id is a silent no-op; list_workspaces newest-first + limit/offset. +``` + +```bash +docker compose up -d +uv run alembic upgrade head +uv run alembic downgrade -1 && uv run alembic upgrade head # downgrade is clean +uv run pytest app/features/demo -v -m integration +``` + +### Level 4: Manual smoke (seeded local stack, uvicorn on :8123) + +```bash +curl -s -X POST http://localhost:8123/demo/run \ + -H 'Content-Type: application/json' \ + -d '{"skip_seed": true, "preservation": "keep", "workspace_name": "e1-smoke"}' | python3 -m json.tool | head -20 +# Expect overall_status pass + workspace_id non-null. Then: +docker exec forecastlab-postgres psql -U forecastlab -d forecastlab -c \ + "SELECT workspace_id, name, status, created_objects FROM showcase_workspace ORDER BY created_at DESC LIMIT 1;" +# Expect: status=completed, created_objects with winning_run_id etc. +curl -s -X POST http://localhost:8123/demo/run -H 'Content-Type: application/json' \ + -d '{"workspace_name": "bad"}' | python3 -m json.tool # 422 problem+json +``` + +## Final validation Checklist + +- [ ] All five gates green: `uv run ruff check . && uv run ruff format --check . && uv run mypy app/ && uv run pyright app/ && uv run pytest -v -m "not integration"` +- [ ] Integration suite green: `uv run pytest -v -m integration` (fresh docker-compose DB) +- [ ] Migration upgrade + downgrade clean on a fresh DB; env.py imports demo models +- [ ] Legacy start frame (`{"seed": 42}`) behaves byte-identically (no row, no workspace key absent — `workspace_id: null` present in pipeline_complete data is the ONLY delta, and it is additive) +- [ ] Manual smoke (Level 4) passes: keep→row recorded, ephemeral→no row, name-without-keep→422 +- [ ] `git diff --stat` shows surgical diffs (no CRLF whole-file noise) +- [ ] docs/_base/API_CONTRACTS.md updated additively +- [ ] Commits formatted `feat(api)/docs(api): ... (#390)`, no AI trailer; PR into dev + +--- + +## Anti-Patterns to Avoid + +- ❌ Don't add ForeignKeys from showcase_workspace to other slices' tables — soft references only. +- ❌ Don't let a workspace DB error propagate out of run_pipeline — warn-and-continue, always. +- ❌ Don't add `extra="forbid"` to DemoRunRequest — unknown-key tolerance is the WS compat contract. +- ❌ Don't add list/get HTTP routes — that's epic #393 (E4); E1 ships the helpers unrouted. +- ❌ Don't touch the localStorage history or any frontend file — E1 is backend-only. +- ❌ Don't edit existing migrations — new revision off head e4f5a6b7c8d9. +- ❌ Don't import another feature slice from app/features/demo/ — core/shared only. + +## Confidence Score + +**9/10** for one-pass implementation success. Every pattern has a verified in-repo +precedent (batch models, scenarios migration, lifespan warn-and-continue, demo test +monkeypatching); the two open judgment calls (exact `created_objects` key set and +whether `DemoRunResult.workspace_id` is surfaced) are both specified above and both +additive — a wrong guess costs a follow-up field, not a rework. The −1 is for the +pipeline-unit-test fixtures: canned `_Client` response sequences are fiddly and may +need iteration against the existing `test_pipeline.py` harness. diff --git a/alembic/env.py b/alembic/env.py index 2cadd971..4f40a1ee 100644 --- a/alembic/env.py +++ b/alembic/env.py @@ -16,6 +16,7 @@ from app.features.batch import models as batch_models # noqa: F401 from app.features.config import models as config_models # noqa: F401 from app.features.data_platform import models as data_platform_models # noqa: F401 +from app.features.demo import models as demo_models # noqa: F401 from app.features.explainability import models as explainability_models # noqa: F401 from app.features.jobs import models as jobs_models # noqa: F401 from app.features.model_selection import models as model_selection_models # noqa: F401 diff --git a/alembic/versions/324a2fa37fcc_create_showcase_workspace_table.py b/alembic/versions/324a2fa37fcc_create_showcase_workspace_table.py new file mode 100644 index 00000000..a3dd4bc2 --- /dev/null +++ b/alembic/versions/324a2fa37fcc_create_showcase_workspace_table.py @@ -0,0 +1,103 @@ +"""create showcase_workspace table + +Revision ID: 324a2fa37fcc +Revises: e4f5a6b7c8d9 +Create Date: 2026-06-12 10:00:00.000000 + +E1 of the showcase-workspace initiative (umbrella #389, epic #390). First +table owned by the demo slice: one row per preserved showcase run -- its +configuration (replay inputs) plus the soft-reference ids of every object the +pipeline created. Deliberately NO ForeignKey to ``model_run`` / +``scenario_plan`` / ``batch_job`` / ``agent_session`` -- recorded ids are +opaque soft references so cross-slice schema coupling stays zero and the +referenced rows remain independently deletable. +""" + +from collections.abc import Sequence + +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "324a2fa37fcc" +down_revision: str | None = "e4f5a6b7c8d9" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + """Apply migration -- create the showcase_workspace table.""" + op.create_table( + "showcase_workspace", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("workspace_id", sa.String(length=32), nullable=False), + sa.Column("name", sa.String(length=100), nullable=True), + sa.Column("status", sa.String(length=20), nullable=False), + sa.Column("seed", sa.Integer(), nullable=False), + sa.Column("scenario", sa.String(length=40), nullable=False), + sa.Column("reset", sa.Boolean(), nullable=False), + sa.Column("skip_seed", sa.Boolean(), nullable=False), + sa.Column("store_id", sa.Integer(), nullable=True), + sa.Column("product_id", sa.Integer(), nullable=True), + sa.Column("date_start", sa.Date(), nullable=True), + sa.Column("date_end", sa.Date(), nullable=True), + sa.Column( + "created_objects", + postgresql.JSONB(astext_type=sa.Text()), + server_default=sa.text("'{}'::jsonb"), + nullable=False, + ), + sa.Column("result_summary", postgresql.JSONB(astext_type=sa.Text()), nullable=True), + sa.Column( + "created_at", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + nullable=False, + ), + sa.Column( + "updated_at", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + nullable=False, + ), + sa.CheckConstraint( + "status IN ('running', 'completed', 'failed')", + name="ck_showcase_workspace_status", + ), + sa.PrimaryKeyConstraint("id"), + ) + op.create_index( + op.f("ix_showcase_workspace_workspace_id"), + "showcase_workspace", + ["workspace_id"], + unique=True, + ) + op.create_index( + op.f("ix_showcase_workspace_name"), + "showcase_workspace", + ["name"], + unique=False, + ) + op.create_index( + op.f("ix_showcase_workspace_status"), + "showcase_workspace", + ["status"], + unique=False, + ) + op.create_index( + "ix_showcase_workspace_status_created", + "showcase_workspace", + ["status", "created_at"], + unique=False, + ) + + +def downgrade() -> None: + """Revert migration -- drop the showcase_workspace table.""" + op.drop_index("ix_showcase_workspace_status_created", table_name="showcase_workspace") + op.drop_index(op.f("ix_showcase_workspace_status"), table_name="showcase_workspace") + op.drop_index(op.f("ix_showcase_workspace_name"), table_name="showcase_workspace") + op.drop_index(op.f("ix_showcase_workspace_workspace_id"), table_name="showcase_workspace") + op.drop_table("showcase_workspace") diff --git a/app/features/demo/models.py b/app/features/demo/models.py new file mode 100644 index 00000000..30ad586e --- /dev/null +++ b/app/features/demo/models.py @@ -0,0 +1,89 @@ +"""Showcase workspace ORM model. + +First table owned by the demo slice (precedent: ``app/features/batch/models.py``). +A row = one preserved showcase run: its configuration (replay inputs) plus the +ids of every object the pipeline created. All recorded ids are OPAQUE SOFT +REFERENCES -- deliberately NO ForeignKey to ``model_run`` / ``scenario_plan`` / +``batch_job`` / ``agent_session``: a cross-slice FK would couple the demo +slice's schema to four other slices and break independent deletion (e.g. +``DELETE /registry/runs/{id}`` must keep working while a workspace row still +references the run). E1 of the showcase-workspace initiative (umbrella #389, +epic #390). + +GOTCHA: SQLAlchemy reserves the declarative attribute name ``metadata``; the +JSONB columns are therefore named ``created_objects`` and ``result_summary``. +""" + +from __future__ import annotations + +import datetime as _dt +from typing import Any + +from sqlalchemy import CheckConstraint, Date, Index, Integer, String, text +from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.orm import Mapped, mapped_column + +from app.core.database import Base +from app.shared.models import TimestampMixin + +# Workspace lifecycle states -- guarded by a CHECK constraint. ``running`` is +# written at creation (before the first pipeline step executes); the finalize +# hook settles the row to ``completed`` or ``failed``. +WORKSPACE_STATUS_RUNNING = "running" +WORKSPACE_STATUS_COMPLETED = "completed" +WORKSPACE_STATUS_FAILED = "failed" + + +class ShowcaseWorkspace(TimestampMixin, Base): + """A preserved showcase run. + + Attributes: + id: Surrogate primary key. + workspace_id: Unique external identifier (UUID hex, 32 chars). + name: Optional human label from ``DemoRunRequest.workspace_name``. + status: Lifecycle state -- running / completed / failed (CHECK-constrained). + seed: Seeder seed the run was started with (replay input). + scenario: Seeder scenario preset value (replay input). + reset: Whether the run wiped the database before seeding (replay input). + skip_seed: Whether the run skipped the seed step (replay input). + store_id: Showcase grain store id; NULL when the run failed early. + product_id: Showcase grain product id; NULL when the run failed early. + date_start: Seeded data window start; NULL when unknown. + date_end: Seeded data window end; NULL when unknown. + created_objects: Soft-reference ids of everything the run created (JSONB). + result_summary: Winner / WAPE / wall-clock display payload (JSONB). + """ + + __tablename__ = "showcase_workspace" + + id: Mapped[int] = mapped_column(Integer, primary_key=True) + workspace_id: Mapped[str] = mapped_column(String(32), unique=True, index=True) + name: Mapped[str | None] = mapped_column(String(100), nullable=True, index=True) + status: Mapped[str] = mapped_column( + String(20), default=WORKSPACE_STATUS_RUNNING, nullable=False, index=True + ) + # Run configuration -- replay inputs (E4 restore/replay reads these verbatim). + seed: Mapped[int] = mapped_column(Integer, nullable=False) + scenario: Mapped[str] = mapped_column(String(40), nullable=False) + reset: Mapped[bool] = mapped_column(nullable=False, default=False) + skip_seed: Mapped[bool] = mapped_column(nullable=False, default=True) + # Grain + window discovered by the status/seed steps (NULL on early failure). + store_id: Mapped[int | None] = mapped_column(Integer, nullable=True) + product_id: Mapped[int | None] = mapped_column(Integer, nullable=True) + date_start: Mapped[_dt.date | None] = mapped_column(Date, nullable=True) + date_end: Mapped[_dt.date | None] = mapped_column(Date, nullable=True) + # Everything the run created -- flexible JSONB of soft references (see the + # module docstring for the deliberate no-FK decision). + created_objects: Mapped[dict[str, Any]] = mapped_column( + JSONB, nullable=False, default=dict, server_default=text("'{}'::jsonb") + ) + # winner_model_type / winner_wape / wall_clock_s -- display payload. + result_summary: Mapped[dict[str, Any] | None] = mapped_column(JSONB, nullable=True) + + __table_args__ = ( + CheckConstraint( + "status IN ('running', 'completed', 'failed')", + name="ck_showcase_workspace_status", + ), + Index("ix_showcase_workspace_status_created", "status", "created_at"), + ) diff --git a/app/features/demo/pipeline.py b/app/features/demo/pipeline.py index 041d5361..9af07a3f 100644 --- a/app/features/demo/pipeline.py +++ b/app/features/demo/pipeline.py @@ -40,6 +40,7 @@ from app.core.config import get_settings from app.core.logging import get_logger from app.core.problem_details import EMBEDDING_AUTH_CODE, ERROR_TYPES +from app.features.demo import workspace from app.features.demo.schemas import DemoRunRequest, StepEvent, StepStatus from app.shared.seeder.config import ScenarioPreset @@ -254,6 +255,9 @@ class DemoContext: # step_agent_hitl_flow on SHOWCASE_RICH. Remain None on every other path. approval_action_id: str | None = None agent_approval_decision: str | None = None # "executed"|"rejected"|"expired"|"timed_out" + # E1 (#390) -- workspace persistence. Set only on preservation="keep" runs + # (and only when the row insert succeeded); None on ephemeral runs. + workspace_id: str | None = None # ============================================================================= @@ -2585,6 +2589,11 @@ async def run_pipeline(app: FastAPI, req: DemoRunRequest) -> AsyncIterator[StepE reset=req.reset, scenario=req.scenario, ) + # E1 (#390) -- create the workspace row BEFORE the first step executes so + # even an early failure records the run config. create_workspace is + # warn-and-continue: a DB failure returns None and the run proceeds. + if req.preservation == "keep": + ctx.workspace_id = await workspace.create_workspace(req) wall_start = time.monotonic() any_fail = False # PRP-41 — buffer for intermediate events the HITL step emits via @@ -2668,6 +2677,13 @@ async def run_pipeline(app: FastAPI, req: DemoRunRequest) -> AsyncIterator[StepE break wall = time.monotonic() - wall_start + # E1 (#390) -- settle the workspace row BEFORE the final yield so the + # mid-run-failure path records partial created_objects too. + # finalize_workspace is warn-and-continue: it never raises. + if ctx.workspace_id is not None: + await workspace.finalize_workspace( + ctx.workspace_id, ctx, failed=any_fail, wall_clock_s=wall + ) yield StepEvent( event_type="pipeline_complete", step_name="summary", @@ -2687,5 +2703,8 @@ async def run_pipeline(app: FastAPI, req: DemoRunRequest) -> AsyncIterator[StepE # PRP-38 — expose the V2 run id when set so the Inspect deep # link can target /explorer/runs/{v2_run_id}. "v2_run_id": ctx.v2_run_id, + # E1 (#390) -- additive; a string on preservation='keep' runs, + # None otherwise (legacy clients ignore unknown keys). + "workspace_id": ctx.workspace_id, }, ) diff --git a/app/features/demo/schemas.py b/app/features/demo/schemas.py index e7f0aa4b..e02738af 100644 --- a/app/features/demo/schemas.py +++ b/app/features/demo/schemas.py @@ -11,7 +11,7 @@ from datetime import UTC, datetime from typing import Any, Literal -from pydantic import BaseModel, ConfigDict, Field +from pydantic import BaseModel, ConfigDict, Field, model_validator from app.shared.seeder.config import ScenarioPreset @@ -29,10 +29,12 @@ def _utc_now() -> datetime: class DemoRunRequest(BaseModel): """Request body for ``POST /demo/run`` and the ``WS /demo/stream`` start frame. - Every field is JSON-native (``int`` / ``bool``), so ``ConfigDict(strict=True)`` - is safe with no ``Field(strict=False)`` override -- there is no - ``date`` / ``datetime`` / ``UUID`` / ``Decimal`` field (see - ``.claude/rules/security-patterns.md`` and ``test_strict_mode_policy.py``). + Every field is JSON-native (``int`` / ``bool`` / ``str`` / ``Literal``), so + ``ConfigDict(strict=True)`` is safe with no ``Field(strict=False)`` + override -- there is no ``date`` / ``datetime`` / ``UUID`` / ``Decimal`` + field (see ``.claude/rules/security-patterns.md`` and + ``test_strict_mode_policy.py``). The sole exception is ``scenario``, whose + enum-on-the-wire form carries its own override (PRP-38). """ model_config = ConfigDict(strict=True) @@ -59,6 +61,28 @@ class DemoRunRequest(BaseModel): strict=False, description="Seeder scenario preset that drives the pipeline shape.", ) + # E1 (#390): preservation policy. Default "ephemeral" keeps legacy + # behaviour byte-identical (no workspace row). Both new fields are + # JSON-native (Literal[str] / str), so the model-level ``strict=True`` + # needs no per-field override. + preservation: Literal["ephemeral", "keep"] = Field( + default="ephemeral", + description="'keep' records this run as a showcase_workspace row.", + ) + workspace_name: str | None = Field( + default=None, + max_length=100, + # Same pattern as the registry alias_name (registry/schemas.py). + pattern=r"^[a-z0-9][a-z0-9\-_]*$", + description="Optional workspace label; requires preservation='keep'.", + ) + + @model_validator(mode="after") + def _workspace_name_requires_keep(self) -> DemoRunRequest: + """Reject a workspace_name on a run that does not keep a workspace.""" + if self.workspace_name is not None and self.preservation != "keep": + raise ValueError("workspace_name requires preservation='keep'") + return self class StepEvent(BaseModel): @@ -134,3 +158,9 @@ class DemoRunResult(BaseModel): default=0.0, description="Total pipeline wall-clock in seconds.", ) + # E1 (#390): additive Optional field mirroring ``winning_run_id`` -- + # ``None`` on ephemeral runs, the workspace_id on preservation='keep' runs. + workspace_id: str | None = Field( + default=None, + description="showcase_workspace id recorded for this run, if kept.", + ) diff --git a/app/features/demo/service.py b/app/features/demo/service.py index cc3dd8a6..514d3b3f 100644 --- a/app/features/demo/service.py +++ b/app/features/demo/service.py @@ -77,4 +77,6 @@ async def run_pipeline_sync(app: FastAPI, req: DemoRunRequest) -> DemoRunResult: winning_run_id=final.data.get("winning_run_id"), alias=final.data.get("alias"), wall_clock_s=float(wall_clock) if isinstance(wall_clock, (int, float)) else 0.0, + # E1 (#390) -- additive; mirrors the winning_run_id passthrough. + workspace_id=final.data.get("workspace_id"), ) diff --git a/app/features/demo/tests/conftest.py b/app/features/demo/tests/conftest.py index c4653ff7..607eb163 100644 --- a/app/features/demo/tests/conftest.py +++ b/app/features/demo/tests/conftest.py @@ -4,7 +4,11 @@ import pytest from httpx import ASGITransport, AsyncClient +from sqlalchemy import delete +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine +from app.core.config import get_settings +from app.features.demo.models import ShowcaseWorkspace from app.main import app @@ -20,3 +24,27 @@ async def client() -> AsyncGenerator[AsyncClient, None]: base_url="http://demo-test", ) as ac: yield ac + + +@pytest.fixture +async def db_session() -> AsyncGenerator[AsyncSession, None]: + """Yield an async session; wipe every showcase_workspace row on teardown. + + E1 (#390) integration fixture (pattern: + ``app/features/scenarios/tests/conftest.py``). ``showcase_workspace`` is a + slice-private table -- no seeder or other slice writes it -- so the + teardown safely wipes it whole rather than relying on a row marker. + """ + settings = get_settings() + engine = create_async_engine(settings.database_url, echo=False) + session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + + async with session_maker() as session: + try: + yield session + finally: + await session.rollback() + await session.execute(delete(ShowcaseWorkspace)) + await session.commit() + + await engine.dispose() diff --git a/app/features/demo/tests/test_models.py b/app/features/demo/tests/test_models.py new file mode 100644 index 00000000..91c9d0e5 --- /dev/null +++ b/app/features/demo/tests/test_models.py @@ -0,0 +1,111 @@ +"""Integration tests for the ShowcaseWorkspace ORM model (E1, #390). + +Run against the real docker-compose Postgres (``docker compose up -d`` + +``uv run alembic upgrade head`` required). Constraint tests assert the +DB-level guarantees the migration created (unique workspace_id, status CHECK). +""" + +from __future__ import annotations + +import uuid +from datetime import date + +import pytest +from sqlalchemy.exc import IntegrityError +from sqlalchemy.ext.asyncio import AsyncSession + +from app.features.demo.models import ( + WORKSPACE_STATUS_COMPLETED, + WORKSPACE_STATUS_RUNNING, + ShowcaseWorkspace, +) +from app.features.demo.workspace import get_workspace + +pytestmark = pytest.mark.integration + + +def _make_row(**overrides: object) -> ShowcaseWorkspace: + """Build a valid ShowcaseWorkspace row; keyword overrides win.""" + values: dict[str, object] = { + "workspace_id": uuid.uuid4().hex, + "name": "it-row", + "seed": 42, + "scenario": "demo_minimal", + "reset": False, + "skip_seed": True, + } + values.update(overrides) + return ShowcaseWorkspace(**values) + + +async def test_showcase_workspace_crud_roundtrip(db_session: AsyncSession) -> None: + """Insert a full row incl. JSONB payloads and read it back intact.""" + created = { + "winning_run_id": "run-abc123", + "scenario_plan_ids": ["scn-1", "scn-2"], + } + summary = {"winner_model_type": "seasonal_naive", "winner_wape": 0.15} + row = _make_row( + status=WORKSPACE_STATUS_COMPLETED, + store_id=7, + product_id=3, + date_start=date(2026, 1, 1), + date_end=date(2026, 3, 31), + created_objects=created, + result_summary=summary, + ) + db_session.add(row) + await db_session.commit() + + loaded = await get_workspace(db_session, row.workspace_id) + assert loaded is not None + assert loaded.status == WORKSPACE_STATUS_COMPLETED + assert loaded.name == "it-row" + assert loaded.seed == 42 + assert loaded.scenario == "demo_minimal" + assert loaded.store_id == 7 + assert loaded.product_id == 3 + assert loaded.date_start == date(2026, 1, 1) + assert loaded.date_end == date(2026, 3, 31) + assert loaded.created_objects == created + assert loaded.result_summary == summary + assert loaded.created_at is not None + assert loaded.updated_at is not None + + +async def test_showcase_workspace_defaults_applied(db_session: AsyncSession) -> None: + """A minimal insert gets running status + empty created_objects.""" + row = _make_row(name=None) + db_session.add(row) + await db_session.commit() + + loaded = await get_workspace(db_session, row.workspace_id) + assert loaded is not None + assert loaded.status == WORKSPACE_STATUS_RUNNING + assert loaded.name is None + assert loaded.created_objects == {} + assert loaded.result_summary is None + assert loaded.store_id is None + assert loaded.product_id is None + + +async def test_showcase_workspace_duplicate_workspace_id_rejected( + db_session: AsyncSession, +) -> None: + """The unique index on workspace_id rejects a duplicate insert.""" + workspace_id = uuid.uuid4().hex + db_session.add(_make_row(workspace_id=workspace_id)) + await db_session.commit() + + db_session.add(_make_row(workspace_id=workspace_id)) + with pytest.raises(IntegrityError): + await db_session.commit() + await db_session.rollback() + + +async def test_showcase_workspace_status_check_violation(db_session: AsyncSession) -> None: + """The status CHECK constraint rejects values outside the state set.""" + db_session.add(_make_row(status="archived")) + with pytest.raises(IntegrityError): + await db_session.commit() + await db_session.rollback() diff --git a/app/features/demo/tests/test_pipeline.py b/app/features/demo/tests/test_pipeline.py index 5f73a8c8..b9b37f07 100644 --- a/app/features/demo/tests/test_pipeline.py +++ b/app/features/demo/tests/test_pipeline.py @@ -2110,3 +2110,152 @@ async def request(self, step: str, method: str, path: str, **_kw: Any) -> dict[s "total_aliases": 0, "degrading_health_count": 0, } + + +# ============================================================================= +# E1 (#390) -- workspace persistence hooks +# ============================================================================= + + +class _WorkspaceSpy: + """Recording stand-in for the workspace module's create/finalize hooks.""" + + def __init__(self, create_returns: str | None = "ws-e1-test") -> None: + self.create_calls: list[Any] = [] + self.finalize_calls: list[dict[str, Any]] = [] + self._create_returns = create_returns + + async def create_workspace(self, req: Any) -> str | None: + self.create_calls.append(req) + return self._create_returns + + async def finalize_workspace( + self, + workspace_id: str, + ctx: Any, + *, + failed: bool, + wall_clock_s: float | None = None, + ) -> None: + self.finalize_calls.append( + {"workspace_id": workspace_id, "failed": failed, "wall_clock_s": wall_clock_s} + ) + + +async def test_run_pipeline_keep_creates_and_finalizes_workspace(monkeypatch, tmp_path): + """E1 (#390) -- keep run: create before steps, finalize before the yield.""" + artifact = tmp_path / "m.joblib" + artifact.write_bytes(b"x") + monkeypatch.setattr(pipeline, "get_settings", lambda: _fake_settings(str(tmp_path / "reg"))) + wapes = {"naive": 0.3, "seasonal_naive": 0.1, "moving_average": 0.2} + monkeypatch.setattr(pipeline, "_Client", _build_fake_client(str(artifact), wapes)) + spy = _WorkspaceSpy() + monkeypatch.setattr(pipeline, "workspace", spy) + + req = DemoRunRequest.model_validate({"preservation": "keep", "workspace_name": "e1-test"}) + events = [e async for e in pipeline.run_pipeline(app=_FAKE_APP, req=req)] + + assert len(spy.create_calls) == 1 + assert spy.create_calls[0] is req + assert len(spy.finalize_calls) == 1 + assert spy.finalize_calls[0]["workspace_id"] == "ws-e1-test" + assert spy.finalize_calls[0]["failed"] is False + assert spy.finalize_calls[0]["wall_clock_s"] is not None + + final = events[-1] + assert final.event_type == "pipeline_complete" + assert final.status == "pass" + assert final.data["workspace_id"] == "ws-e1-test" + + +async def test_run_pipeline_ephemeral_touches_no_workspace(monkeypatch, tmp_path): + """E1 (#390) -- default (ephemeral) run issues zero workspace calls.""" + artifact = tmp_path / "m.joblib" + artifact.write_bytes(b"x") + monkeypatch.setattr(pipeline, "get_settings", lambda: _fake_settings(str(tmp_path / "reg"))) + wapes = {"naive": 0.3, "seasonal_naive": 0.1, "moving_average": 0.2} + monkeypatch.setattr(pipeline, "_Client", _build_fake_client(str(artifact), wapes)) + spy = _WorkspaceSpy() + monkeypatch.setattr(pipeline, "workspace", spy) + + events = [e async for e in pipeline.run_pipeline(app=_FAKE_APP, req=DemoRunRequest())] + + assert spy.create_calls == [] + assert spy.finalize_calls == [] + final = events[-1] + assert final.event_type == "pipeline_complete" + # The key is additive and present, with a null value on ephemeral runs. + assert "workspace_id" in final.data + assert final.data["workspace_id"] is None + + +async def test_run_pipeline_workspace_create_failure_does_not_break_run(monkeypatch, tmp_path): + """E1 (#390) -- create_workspace's warn path (None) leaves the run green.""" + artifact = tmp_path / "m.joblib" + artifact.write_bytes(b"x") + monkeypatch.setattr(pipeline, "get_settings", lambda: _fake_settings(str(tmp_path / "reg"))) + wapes = {"naive": 0.3, "seasonal_naive": 0.1, "moving_average": 0.2} + monkeypatch.setattr(pipeline, "_Client", _build_fake_client(str(artifact), wapes)) + spy = _WorkspaceSpy(create_returns=None) + monkeypatch.setattr(pipeline, "workspace", spy) + + req = DemoRunRequest.model_validate({"preservation": "keep"}) + events = [e async for e in pipeline.run_pipeline(app=_FAKE_APP, req=req)] + + assert len(spy.create_calls) == 1 + # No row was created, so there is nothing to finalize. + assert spy.finalize_calls == [] + final = events[-1] + assert final.event_type == "pipeline_complete" + assert final.status == "pass" + assert final.data["workspace_id"] is None + + +async def test_run_pipeline_keep_finalizes_failed_on_step_failure(monkeypatch): + """E1 (#390) -- a mid-run step failure still finalizes, with failed=True.""" + + class _FailingClient: + def __init__(self, _app: Any, *, event_sink: list[Any] | None = None) -> None: + self._event_sink = event_sink + + async def __aenter__(self) -> _FailingClient: + return self + + async def __aexit__(self, *_exc: object) -> None: + return None + + def yield_event(self, event: Any) -> None: + if self._event_sink is None: + return + self._event_sink.append(event) + + async def request( + self, + step: str, + method: str, + path: str, + *, + json_body: dict[str, Any] | None = None, + ) -> dict[str, Any]: + if path == "/health": + return {"status": "ok"} + if path == "/seeder/status": + raise pipeline._StepError( + "status", 500, {"title": "Database Error", "detail": "db down"} + ) + raise AssertionError(f"unexpected request after failure: {path}") + + monkeypatch.setattr(pipeline, "_Client", _FailingClient) + spy = _WorkspaceSpy() + monkeypatch.setattr(pipeline, "workspace", spy) + + req = DemoRunRequest.model_validate({"preservation": "keep"}) + events = [e async for e in pipeline.run_pipeline(app=_FAKE_APP, req=req)] + + assert len(spy.finalize_calls) == 1 + assert spy.finalize_calls[0]["workspace_id"] == "ws-e1-test" + assert spy.finalize_calls[0]["failed"] is True + final = events[-1] + assert final.event_type == "pipeline_complete" + assert final.status == "fail" + assert final.data["workspace_id"] == "ws-e1-test" diff --git a/app/features/demo/tests/test_routes.py b/app/features/demo/tests/test_routes.py index caad2d64..5158d1ca 100644 --- a/app/features/demo/tests/test_routes.py +++ b/app/features/demo/tests/test_routes.py @@ -112,3 +112,90 @@ async def fake_stream(_app, _params: DemoRunRequest) -> AsyncIterator[StepEvent] event = ws.receive_json() assert event["event_type"] == "error" assert "in progress" in event["detail"] + + +# ============================================================================= +# E1 (#390) -- preservation / workspace_name passthrough +# ============================================================================= + + +async def test_run_demo_accepts_preservation_fields( + client, monkeypatch, canned_result: DemoRunResult +): + """E1 (#390) -- the new optional fields validate and reach the service.""" + seen: dict[str, DemoRunRequest] = {} + + async def fake_run_sync(_app, params: DemoRunRequest) -> DemoRunResult: + seen["params"] = params + return canned_result + + monkeypatch.setattr(service, "run_pipeline_sync", fake_run_sync) + + resp = await client.post( + "/demo/run", + json={"skip_seed": True, "preservation": "keep", "workspace_name": "e1-route"}, + ) + assert resp.status_code == 200 + assert seen["params"].preservation == "keep" + assert seen["params"].workspace_name == "e1-route" + # The additive DemoRunResult field rides on the response (None here -- + # the canned result doesn't set it). + assert resp.json()["workspace_id"] is None + + +async def test_run_demo_rejects_name_without_keep_422(client): + """E1 (#390) -- workspace_name without preservation='keep' is a 422.""" + resp = await client.post("/demo/run", json={"workspace_name": "bad"}) + assert resp.status_code == 422 + assert resp.headers["content-type"].startswith("application/problem+json") + + +def test_demo_stream_websocket_accepts_preservation_fields(monkeypatch): + """E1 (#390) -- the WS start frame accepts the new fields end-to-end.""" + seen: dict[str, DemoRunRequest] = {} + + async def fake_stream(_app, params: DemoRunRequest) -> AsyncIterator[StepEvent]: + seen["params"] = params + yield StepEvent( + event_type="pipeline_complete", + step_name="summary", + step_index=11, + total_steps=11, + status="pass", + data={"workspace_id": "ws-route-test"}, + ) + + monkeypatch.setattr(service, "stream_pipeline", fake_stream) + + with TestClient(app).websocket_connect("/demo/stream") as ws: + ws.send_json({"preservation": "keep", "workspace_name": "ws-frame"}) + event = ws.receive_json() + assert event["event_type"] == "pipeline_complete" + assert event["data"]["workspace_id"] == "ws-route-test" + assert seen["params"].preservation == "keep" + assert seen["params"].workspace_name == "ws-frame" + + +def test_demo_stream_websocket_legacy_frame_ignores_unknown_keys(monkeypatch): + """E1 (#390) -- unknown start-frame keys stay ignored (the WS forward/ + backward compatibility contract; no extra='forbid').""" + seen: dict[str, DemoRunRequest] = {} + + async def fake_stream(_app, params: DemoRunRequest) -> AsyncIterator[StepEvent]: + seen["params"] = params + yield StepEvent( + event_type="pipeline_complete", + step_name="summary", + step_index=11, + total_steps=11, + status="pass", + ) + + monkeypatch.setattr(service, "stream_pipeline", fake_stream) + + with TestClient(app).websocket_connect("/demo/stream") as ws: + ws.send_json({"seed": 7, "future_key_from_a_newer_client": True}) + event = ws.receive_json() + assert event["event_type"] == "pipeline_complete" + assert seen["params"].seed == 7 + assert seen["params"].preservation == "ephemeral" diff --git a/app/features/demo/tests/test_schemas.py b/app/features/demo/tests/test_schemas.py index ed390ee7..bdbfaac3 100644 --- a/app/features/demo/tests/test_schemas.py +++ b/app/features/demo/tests/test_schemas.py @@ -47,6 +47,51 @@ def test_demo_run_request_scenario_rejects_unknown(): DemoRunRequest.model_validate({"scenario": "not_a_preset"}) +def test_demo_run_request_new_field_defaults(): + """E1 (#390) -- defaults preserve legacy behaviour (ephemeral, unnamed).""" + req = DemoRunRequest() + assert req.preservation == "ephemeral" + assert req.workspace_name is None + + +def test_demo_run_request_json_path_keep_with_name(): + """E1 (#390) -- the JSON wire form (validate_python on a parsed dict, the + path FastAPI uses) accepts keep + a named workspace.""" + req = DemoRunRequest.model_validate({"preservation": "keep", "workspace_name": "bf-demo"}) + assert req.preservation == "keep" + assert req.workspace_name == "bf-demo" + + +def test_demo_run_request_legacy_frame_still_validates(): + """E1 (#390) -- a legacy start frame without the new keys still validates.""" + req = DemoRunRequest.model_validate({"seed": 7}) + assert req.seed == 7 + assert req.preservation == "ephemeral" + assert req.workspace_name is None + + +def test_demo_run_request_workspace_name_requires_keep(): + """E1 (#390) -- workspace_name without preservation='keep' is rejected.""" + with pytest.raises(ValidationError): + DemoRunRequest.model_validate({"workspace_name": "x"}) + with pytest.raises(ValidationError): + DemoRunRequest.model_validate({"preservation": "ephemeral", "workspace_name": "x"}) + + +def test_demo_run_request_workspace_name_pattern_rejected(): + """E1 (#390) -- names violating the alias-style pattern are rejected.""" + with pytest.raises(ValidationError): + DemoRunRequest.model_validate({"preservation": "keep", "workspace_name": "Black Friday!"}) + with pytest.raises(ValidationError): + DemoRunRequest.model_validate({"preservation": "keep", "workspace_name": "-leading-dash"}) + + +def test_demo_run_request_rejects_unknown_preservation(): + """E1 (#390) -- preservation is a closed Literal; unknown values 422.""" + with pytest.raises(ValidationError): + DemoRunRequest.model_validate({"preservation": "archive"}) + + def test_step_event_json_round_trip(): event = StepEvent( event_type="step_complete", @@ -134,3 +179,5 @@ def test_demo_run_result_defaults(): assert result.winner_model_type is None assert result.winner_wape is None assert result.wall_clock_s == 0.0 + # E1 (#390) -- additive Optional field defaults to None (ephemeral runs). + assert result.workspace_id is None diff --git a/app/features/demo/tests/test_workspace.py b/app/features/demo/tests/test_workspace.py new file mode 100644 index 00000000..110254c4 --- /dev/null +++ b/app/features/demo/tests/test_workspace.py @@ -0,0 +1,166 @@ +"""Integration tests for the workspace persistence helpers (E1, #390). + +``create_workspace`` / ``finalize_workspace`` open their OWN sessions via +``get_session_maker()`` -- these tests exercise that real write path against +the docker-compose Postgres; the ``db_session`` fixture is used only to read +back and to wipe rows on teardown. +""" + +from __future__ import annotations + +from datetime import date + +import pytest +from sqlalchemy.ext.asyncio import AsyncSession + +from app.features.demo import workspace +from app.features.demo.models import ( + WORKSPACE_STATUS_COMPLETED, + WORKSPACE_STATUS_FAILED, + WORKSPACE_STATUS_RUNNING, +) +from app.features.demo.pipeline import DemoContext +from app.features.demo.schemas import DemoRunRequest +from app.shared.seeder.config import ScenarioPreset + +pytestmark = pytest.mark.integration + + +def _keep_request(**overrides: object) -> DemoRunRequest: + """Build a preservation='keep' request; keyword overrides win.""" + payload: dict[str, object] = { + "seed": 7, + "reset": False, + "skip_seed": True, + "preservation": "keep", + "workspace_name": "it-keep", + } + payload.update(overrides) + return DemoRunRequest.model_validate(payload) + + +def _finished_ctx() -> DemoContext: + """Build a DemoContext as a green showcase run would leave it.""" + ctx = DemoContext( + seed=7, + skip_seed=True, + reset=False, + scenario=ScenarioPreset.DEMO_MINIMAL, + ) + ctx.store_id = 7 + ctx.product_id = 3 + ctx.date_start = date(2026, 1, 1) + ctx.date_end = date(2026, 3, 31) + ctx.winner_model_type = "seasonal_naive" + ctx.winner_wape = 0.15 + ctx.winning_run_id = "run-abc123def456" + ctx.train_results = {"naive": {}, "seasonal_naive": {}, "moving_average": {}} + ctx.session_id = "sess-0123abcd" + return ctx + + +async def test_create_workspace_persists_config(db_session: AsyncSession) -> None: + """create_workspace inserts a running row carrying the request config.""" + workspace_id = await workspace.create_workspace(_keep_request()) + assert workspace_id is not None + + row = await workspace.get_workspace(db_session, workspace_id) + assert row is not None + assert row.status == WORKSPACE_STATUS_RUNNING + assert row.name == "it-keep" + assert row.seed == 7 + assert row.scenario == "demo_minimal" + assert row.reset is False + assert row.skip_seed is True + assert row.created_objects == {} + assert row.result_summary is None + + +async def test_finalize_workspace_completed(db_session: AsyncSession) -> None: + """finalize(failed=False) settles to completed with collected ids.""" + workspace_id = await workspace.create_workspace(_keep_request()) + assert workspace_id is not None + + await workspace.finalize_workspace( + workspace_id, _finished_ctx(), failed=False, wall_clock_s=12.5 + ) + + row = await workspace.get_workspace(db_session, workspace_id) + assert row is not None + assert row.status == WORKSPACE_STATUS_COMPLETED + assert row.store_id == 7 + assert row.product_id == 3 + assert row.date_start == date(2026, 1, 1) + assert row.date_end == date(2026, 3, 31) + assert row.created_objects["winning_run_id"] == "run-abc123def456" + assert row.created_objects["alias"] == "demo-production" + assert row.created_objects["agent_session_id"] == "sess-0123abcd" + assert row.created_objects["train_model_types"] == [ + "moving_average", + "naive", + "seasonal_naive", + ] + # None-valued accumulators are dropped, not stored as nulls. + assert "v2_run_id" not in row.created_objects + assert "batch_id" not in row.created_objects + assert row.result_summary == { + "winner_model_type": "seasonal_naive", + "winner_wape": 0.15, + "wall_clock_s": 12.5, + } + + +async def test_finalize_workspace_failed(db_session: AsyncSession) -> None: + """finalize(failed=True) settles to failed, still recording partial ids.""" + workspace_id = await workspace.create_workspace(_keep_request(workspace_name="it-fail")) + assert workspace_id is not None + + ctx = _finished_ctx() + ctx.winning_run_id = None # run died before register + ctx.winner_model_type = None + ctx.winner_wape = None + await workspace.finalize_workspace(workspace_id, ctx, failed=True, wall_clock_s=3.0) + + row = await workspace.get_workspace(db_session, workspace_id) + assert row is not None + assert row.status == WORKSPACE_STATUS_FAILED + assert "winning_run_id" not in row.created_objects + assert "alias" not in row.created_objects + # Partial state still recorded -- the agent session + trained models. + assert row.created_objects["agent_session_id"] == "sess-0123abcd" + assert row.created_objects["train_model_types"] == [ + "moving_average", + "naive", + "seasonal_naive", + ] + + +async def test_finalize_workspace_missing_id_is_noop(db_session: AsyncSession) -> None: + """Finalizing an unknown workspace_id neither raises nor inserts.""" + await workspace.finalize_workspace( + "deadbeef" * 4, _finished_ctx(), failed=False, wall_clock_s=1.0 + ) + rows = await workspace.list_workspaces(db_session) + assert rows == [] + + +async def test_list_workspaces_newest_first_limit_offset(db_session: AsyncSession) -> None: + """list_workspaces orders newest first and honours limit/offset.""" + ids: list[str] = [] + for index in range(3): + workspace_id = await workspace.create_workspace( + _keep_request(workspace_name=f"it-list-{index}") + ) + assert workspace_id is not None + ids.append(workspace_id) + + rows = await workspace.list_workspaces(db_session) + assert [r.workspace_id for r in rows] == list(reversed(ids)) + + page = await workspace.list_workspaces(db_session, limit=1, offset=1) + assert [r.workspace_id for r in page] == [ids[1]] + + +async def test_get_workspace_missing_returns_none(db_session: AsyncSession) -> None: + """get_workspace returns None for an unknown id.""" + assert await workspace.get_workspace(db_session, "0" * 32) is None diff --git a/app/features/demo/workspace.py b/app/features/demo/workspace.py new file mode 100644 index 00000000..44e8b475 --- /dev/null +++ b/app/features/demo/workspace.py @@ -0,0 +1,195 @@ +"""Showcase workspace persistence helpers (E1, issue #390). + +Create/finalize the ``showcase_workspace`` row a ``preservation="keep"`` demo +run records itself into. The write helpers open their OWN sessions via +``app.core.database.get_session_maker()`` -- ``run_pipeline`` is not +request-scoped, so no FastAPI dependency is available (precedent: the lifespan +config-override load in ``app/main.py`` and the agents websocket per-message +sessions). + +CONTRACT -- warn-and-continue: a workspace DB failure must NEVER break the +demo pipeline. :func:`create_workspace` returns ``None`` on any error; +:func:`finalize_workspace` swallows any error. Both log a structured warning +(pattern: the ``app/main.py`` lifespan config-override load). + +:func:`get_workspace` / :func:`list_workspaces` are unrouted in E1 -- consumed +by the integration tests now and by the E4 restore/replay routes later +(epic #393). +""" + +from __future__ import annotations + +import uuid +from typing import TYPE_CHECKING, Any + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.core.database import get_session_maker +from app.core.logging import get_logger +from app.features.demo.models import ( + WORKSPACE_STATUS_COMPLETED, + WORKSPACE_STATUS_FAILED, + ShowcaseWorkspace, +) +from app.features.demo.schemas import DemoRunRequest + +if TYPE_CHECKING: + # NOTE: pipeline imports this module at runtime; importing DemoContext + # eagerly here would close an import cycle. The type-only import is safe. + from app.features.demo.pipeline import DemoContext + +logger = get_logger(__name__) + + +async def create_workspace(req: DemoRunRequest) -> str | None: + """Insert a ``running`` workspace row for a ``preservation="keep"`` run. + + Args: + req: The validated demo run request (config recorded verbatim). + + Returns: + The new row's ``workspace_id``, or ``None`` when the insert failed + (warn-and-continue -- the pipeline proceeds without a workspace). + """ + workspace_id = uuid.uuid4().hex + try: + session_maker = get_session_maker() + async with session_maker() as db: + db.add( + ShowcaseWorkspace( + workspace_id=workspace_id, + name=req.workspace_name, + seed=req.seed, + scenario=req.scenario.value, + reset=req.reset, + skip_seed=req.skip_seed, + ) + ) + await db.commit() + except Exception as exc: # workspace must never break the demo + logger.warning( + "demo.workspace_create_failed", + error=str(exc), + error_type=type(exc).__name__, + ) + return None + logger.info("demo.workspace_created", workspace_id=workspace_id, name=req.workspace_name) + return workspace_id + + +def _collect_created_objects(ctx: DemoContext) -> dict[str, Any]: + """Map ``DemoContext`` accumulator fields to the ``created_objects`` JSONB. + + Every value is already a plain ``str`` / ``None`` on ``ctx`` (HTTP response + payloads). ``None`` and empty values are dropped so the JSONB stays sparse + and greppable. + """ + raw: dict[str, Any] = { + "winning_run_id": ctx.winning_run_id, + "v2_run_id": ctx.v2_run_id, + "v2_model_path": ctx.v2_model_path, + # Literal mirrors ``pipeline.DEMO_ALIAS`` -- importing pipeline here + # would close an import cycle (pipeline imports this module). + "alias": "demo-production" if ctx.winning_run_id else None, + "agent_session_id": ctx.session_id, + "batch_id": ctx.batch_id, + "scenario_plan_ids": [s for s in (ctx.price_cut_scenario_id, ctx.holiday_scenario_id) if s], + "scenario_artifact_key": ctx.scenario_artifact_key, + "train_model_types": sorted(ctx.train_results), + "stale_alias_run_id": ctx.stale_alias_run_id, + } + return {key: value for key, value in raw.items() if value not in (None, [])} + + +async def finalize_workspace( + workspace_id: str, + ctx: DemoContext, + *, + failed: bool, + wall_clock_s: float | None = None, +) -> None: + """Settle a workspace row to ``completed`` / ``failed`` with collected ids. + + Called by ``run_pipeline`` BEFORE the final ``pipeline_complete`` yield -- + including the mid-run-failure path, so a partial run still records what it + created. Finalizing a missing ``workspace_id`` (its create failed earlier) + is a silent no-op. + + Args: + workspace_id: The row to finalize (from :func:`create_workspace`). + ctx: The pipeline's cross-step accumulator. + failed: Whether any step failed. + wall_clock_s: Total pipeline wall-clock, recorded in ``result_summary``. + """ + try: + session_maker = get_session_maker() + async with session_maker() as db: + result = await db.execute( + select(ShowcaseWorkspace).where(ShowcaseWorkspace.workspace_id == workspace_id) + ) + row = result.scalar_one_or_none() + if row is None: # create failed earlier -- nothing to finalize + return + row.status = WORKSPACE_STATUS_FAILED if failed else WORKSPACE_STATUS_COMPLETED + row.store_id = ctx.store_id + row.product_id = ctx.product_id + row.date_start = ctx.date_start + row.date_end = ctx.date_end + row.created_objects = _collect_created_objects(ctx) + row.result_summary = { + "winner_model_type": ctx.winner_model_type, + "winner_wape": ctx.winner_wape, + "wall_clock_s": wall_clock_s, + } + await db.commit() + except Exception as exc: # workspace must never break the demo + logger.warning( + "demo.workspace_finalize_failed", + workspace_id=workspace_id, + error=str(exc), + error_type=type(exc).__name__, + ) + return + logger.info("demo.workspace_finalized", workspace_id=workspace_id, failed=failed) + + +async def get_workspace(db: AsyncSession, workspace_id: str) -> ShowcaseWorkspace | None: + """Load a workspace row by its external id. + + Args: + db: An open async session (caller-owned). + workspace_id: The external id to look up. + + Returns: + The row, or ``None`` when missing. + """ + result = await db.execute( + select(ShowcaseWorkspace).where(ShowcaseWorkspace.workspace_id == workspace_id) + ) + return result.scalar_one_or_none() + + +async def list_workspaces( + db: AsyncSession, + *, + limit: int = 50, + offset: int = 0, +) -> list[ShowcaseWorkspace]: + """List workspace rows, newest first (tie-broken by id, descending). + + Args: + db: An open async session (caller-owned). + limit: Maximum rows to return. + offset: Rows to skip from the newest end. + + Returns: + The matching rows, newest first. + """ + result = await db.execute( + select(ShowcaseWorkspace) + .order_by(ShowcaseWorkspace.created_at.desc(), ShowcaseWorkspace.id.desc()) + .limit(limit) + .offset(offset) + ) + return list(result.scalars().all()) diff --git a/docs/_base/API_CONTRACTS.md b/docs/_base/API_CONTRACTS.md index 27d75ea1..abcebd1a 100644 --- a/docs/_base/API_CONTRACTS.md +++ b/docs/_base/API_CONTRACTS.md @@ -58,7 +58,7 @@ All endpoints serve JSON; error responses use `application/problem+json` (RFC 78 | agents | WS | `/agents/stream` | Token-by-token streaming + tool-call events | | seeder | (see `app/features/seeder/routes.py`) | `/seeder/*` | Trigger scenarios, status, customization | | seeder | POST | `/seeder/phase2-enrichment` | PRP-38 — run Phase 2 generators (lifecycle, replenishment, exogenous, returns) against the existing seeded data. `422 application/problem+json` on an empty database. | -| demo | POST | `/demo/run` | Run the end-to-end demo pipeline in-process; returns a `DemoRunResult`. `409 application/problem+json` if a run is already active. **PRP-38** — body accepts an Optional `scenario: 'demo_minimal' \| 'showcase_rich' \| 'sparse'` field; default `'demo_minimal'` (back-compat). | +| demo | POST | `/demo/run` | Run the end-to-end demo pipeline in-process; returns a `DemoRunResult`. `409 application/problem+json` if a run is already active. **PRP-38** — body accepts an Optional `scenario: 'demo_minimal' \| 'showcase_rich' \| 'sparse'` field; default `'demo_minimal'` (back-compat). **E1 (#390)** — body accepts additive Optional `preservation: 'ephemeral' \| 'keep'` (default `'ephemeral'`, today's no-row behavior) and `workspace_name: str \| null` (pattern `^[a-z0-9][a-z0-9\-_]*$`, ≤100 chars); `workspace_name` without `preservation='keep'` → `422 application/problem+json`. `preservation='keep'` records the run as a `showcase_workspace` row; `DemoRunResult` gains an additive Optional `workspace_id: str \| null`. | | demo | WS | `/demo/stream` | Stream one `StepEvent` per pipeline step for the live Showcase page | | config | GET | `/config/ai` | Effective AI-model config (agent LLM + RAG embeddings); API keys masked, never raw | | config | PATCH | `/config/ai` | Persist + apply AI-model changes live (no restart). `409` if an embedding-dimension change would orphan indexed RAG chunks (resend with `force=true`) | @@ -83,12 +83,12 @@ Verified against `app/features/agents/websocket.py` and `app/features/agents/sch Drives the end-to-end demo pipeline for the dashboard Showcase page. Verified against `app/features/demo/routes.py` and `app/features/demo/schemas.py` (`StepEvent`). -- **Client → server (one start frame):** `{"seed": int, "reset": bool, "skip_seed": bool, "scenario"?: "demo_minimal" | "showcase_rich" | "sparse"}` — all fields optional (`DemoRunRequest` supplies defaults `seed=42`, `reset=false`, `skip_seed=true`, `scenario="demo_minimal"`). The pipeline runs once, then the server closes. +- **Client → server (one start frame):** `{"seed": int, "reset": bool, "skip_seed": bool, "scenario"?: "demo_minimal" | "showcase_rich" | "sparse", "preservation"?: "ephemeral" | "keep", "workspace_name"?: str}` — all fields optional (`DemoRunRequest` supplies defaults `seed=42`, `reset=false`, `skip_seed=true`, `scenario="demo_minimal"`, `preservation="ephemeral"`, `workspace_name=null`). E1 (#390) — `workspace_name` requires `preservation="keep"` (else one `error` event from validation); unknown start-frame keys remain ignored (forward/backward compat). The pipeline runs once, then the server closes. - **Server → client (every frame):** Pydantic-serialized `StepEvent` — `{"event_type", "step_name", "step_index", "total_steps", "status", "detail", "duration_ms", "data", "timestamp", "phase_name"?, "phase_index"?, "phase_total"?}`. PRP-38 — the three `phase_*` fields are Optional + Nullable so legacy clients that don't render phases keep working. - **`event_type` values (Literal in `StepEvent`):** - `step_start` — a step began; `status` is `null`. - `step_complete` — a step finished; `status ∈ {pass, fail, skip, warn}`, `data` carries structured payload (backtest `per_model` WAPE + `winner` + `bucketed_aggregated_metrics` on PRP-36/38 feature-aware runs; register `run_id` + `alias`; PRP-38 `v2_train` → `v2_run_id` + `feature_frame_version` + `feature_columns_count` + `feature_groups` + `artifact_uri_full`). - - `pipeline_complete` — final event; `data` carries `winner_model_type`, `winner_wape`, `winning_run_id`, `alias`, `wall_clock_s`, `v2_run_id` (PRP-38; null when no V2 run was registered). + - `pipeline_complete` — final event; `data` carries `winner_model_type`, `winner_wape`, `winning_run_id`, `alias`, `wall_clock_s`, `v2_run_id` (PRP-38; null when no V2 run was registered), and `workspace_id` (E1 #390; additive — a string on `preservation="keep"` runs, null otherwise). - `error` — bad start frame or a concurrent run already in progress; one event, then the server closes. - Concurrency: a module-level `asyncio.Lock` allows one pipeline at a time. A second `POST /demo/run` returns `409`; a second `WS /demo/stream` receives one `error` event. - PRP-38 — `scenario="showcase_rich"` extends the data phase with `phase2_enrichment` + `historical_backfill` steps and the modeling phase with `v2_train` (one V2 `prophet_like` run). Phase ids are `data` / `modeling` / `decision` / `verify` / `agent` / `cleanup` (6 phases).