diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml new file mode 100644 index 0000000..601fe76 --- /dev/null +++ b/.github/workflows/integration-tests.yml @@ -0,0 +1,80 @@ +name: Integration Tests + +on: + push: + branches: [main] + pull_request: + branches: [main] + workflow_dispatch: + +jobs: + # Parity check runs on every PR and push: confirms every scenario in + # www.hotdata.dev/api/test-scenarios.yaml has a matching test file in this + # repo. www.hotdata.dev is private, so we fetch via the GitHub App token — + # same pattern as regenerate.yml. + scenario-parity: + runs-on: ubuntu-latest + steps: + - name: Generate GitHub App token + id: app-token + uses: actions/create-github-app-token@f8d387b68d61c58ab83c6c016672934102569859 # v3 + with: + app-id: 3060111 + private-key: ${{ secrets.HOTDATA_AUTOMATION_PRIVATE_KEY }} + owner: hotdata-dev + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6 + - uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6 + with: + python-version: '3.12' + - name: Install PyYAML + run: pip install --quiet pyyaml + - name: Fetch scenarios manifest + env: + GH_TOKEN: ${{ steps.app-token.outputs.token }} + run: | + curl -sS -f -L \ + -H "Accept: application/vnd.github.v3.raw" \ + -H "Authorization: Bearer $GH_TOKEN" \ + https://api.github.com/repos/hotdata-dev/www.hotdata.dev/contents/api/test-scenarios.yaml \ + -o test-scenarios.yaml + - name: Check parity + run: | + python3 - <<'PY' + import sys, pathlib, yaml + scenarios = yaml.safe_load(pathlib.Path("test-scenarios.yaml").read_text())["scenarios"] + missing = [] + for s in scenarios: + if "python" in (s.get("optional_for") or []): + continue + expected = pathlib.Path("tests/integration") / f"test_{s['name']}.py" + if not expected.exists(): + missing.append(str(expected)) + if missing: + print(f"::error::sdk-python is missing tests for {len(missing)} scenarios:") + for m in missing: + print(f" - {m}") + 
sys.exit(1) + print(f"All {len(scenarios)} scenarios have corresponding test files.") + PY + + # Integration tests run against production. Skipped automatically by the + # conftest if HOTDATA_SDK_TEST_API_KEY / HOTDATA_SDK_TEST_WORKSPACE_ID aren't + # set (e.g. PRs from forks where secrets aren't injected). + integration: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6 + - uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6 + with: + python-version: '3.12' + - name: Install package and test deps + run: | + pip install --quiet -r requirements.txt -r test-requirements.txt + pip install --quiet -e . + - name: Run integration tests + env: + HOTDATA_SDK_TEST_API_URL: ${{ vars.HOTDATA_SDK_TEST_API_URL }} + HOTDATA_SDK_TEST_API_KEY: ${{ secrets.HOTDATA_SDK_TEST_API_KEY }} + HOTDATA_SDK_TEST_WORKSPACE_ID: ${{ vars.HOTDATA_SDK_TEST_WORKSPACE_ID }} + HOTDATA_SDK_TEST_CONNECTION_ID: ${{ vars.HOTDATA_SDK_TEST_CONNECTION_ID }} + run: pytest tests/integration -v diff --git a/.github/workflows/regenerate.yml b/.github/workflows/regenerate.yml index 157ad0a..c5643f8 100644 --- a/.github/workflows/regenerate.yml +++ b/.github/workflows/regenerate.yml @@ -110,6 +110,35 @@ jobs: # cd away from the source tree so the import resolves against the installed wheel. 
cd /tmp && python -c "import hotdata; print(hotdata.__version__)" + - name: Check integration test scenario parity + env: + GH_TOKEN: ${{ steps.app-token.outputs.token }} + run: | + curl -sS -f -L \ + -H "Accept: application/vnd.github.v3.raw" \ + -H "Authorization: Bearer $GH_TOKEN" \ + https://api.github.com/repos/hotdata-dev/www.hotdata.dev/contents/api/test-scenarios.yaml \ + -o test-scenarios.yaml + pip install --quiet pyyaml + python3 - <<'PY' + import sys, pathlib, yaml + scenarios = yaml.safe_load(pathlib.Path("test-scenarios.yaml").read_text())["scenarios"] + missing = [] + for s in scenarios: + if "python" in (s.get("optional_for") or []): + continue + expected = pathlib.Path("tests/integration") / f"test_{s['name']}.py" + if not expected.exists(): + missing.append(str(expected)) + if missing: + print(f"::warning::sdk-python is missing tests for {len(missing)} scenarios after regen:") + for m in missing: + print(f" - {m}") + else: + print(f"All {len(scenarios)} scenarios have corresponding test files.") + PY + rm -f test-scenarios.yaml + - name: Create PR uses: peter-evans/create-pull-request@c0f553fe549906ede9cf27b5156039d195d2ece0 # v8 with: diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py new file mode 100644 index 0000000..347003c --- /dev/null +++ b/tests/integration/conftest.py @@ -0,0 +1,124 @@ +"""Shared fixtures for SDK integration tests. + +Tests run against production. See www.hotdata.dev/api/README.md for the +contract — env vars, naming conventions, blast-radius rules. 
+""" + +from __future__ import annotations + +import os +import uuid +from dataclasses import dataclass +from typing import Iterator + +import pytest + +from hotdata import ApiClient, Configuration +from hotdata.api.connections_api import ConnectionsApi +from hotdata.api.datasets_api import DatasetsApi +from hotdata.api.indexes_api import IndexesApi +from hotdata.api.saved_queries_api import SavedQueriesApi +from hotdata.api.secrets_api import SecretsApi +from hotdata.api.workspaces_api import WorkspacesApi + + +REQUIRED_ENV = ("HOTDATA_SDK_TEST_API_KEY", "HOTDATA_SDK_TEST_WORKSPACE_ID") +DEFAULT_API_URL = "https://api.hotdata.dev" + + +@dataclass(frozen=True) +class TestEnv: + api_url: str + api_key: str + workspace_id: str + connection_id: str | None + + +def _load_env() -> TestEnv: + missing = [name for name in REQUIRED_ENV if not os.environ.get(name)] + if missing: + pytest.skip( + "SDK integration tests require env vars: " + ", ".join(missing) + ) + # GitHub Actions sets `env:` keys even when the underlying secret/var is + # unset, producing empty strings rather than absent keys. Use `or` to fall + # back to the default for url and to None for the optional connection id. 
+ return TestEnv( + api_url=os.environ.get("HOTDATA_SDK_TEST_API_URL") or DEFAULT_API_URL, + api_key=os.environ["HOTDATA_SDK_TEST_API_KEY"], + workspace_id=os.environ["HOTDATA_SDK_TEST_WORKSPACE_ID"], + connection_id=os.environ.get("HOTDATA_SDK_TEST_CONNECTION_ID") or None, + ) + + +@pytest.fixture(scope="session") +def env() -> TestEnv: + return _load_env() + + +@pytest.fixture(scope="session") +def api_client(env: TestEnv) -> Iterator[ApiClient]: + config = Configuration( + host=env.api_url, + api_key=env.api_key, + workspace_id=env.workspace_id, + ) + with ApiClient(config) as client: + yield client + + +@pytest.fixture(scope="session") +def workspace_id(env: TestEnv) -> str: + return env.workspace_id + + +@pytest.fixture(scope="session") +def connection_id(env: TestEnv) -> str: + if not env.connection_id: + pytest.skip("HOTDATA_SDK_TEST_CONNECTION_ID required for this scenario") + return env.connection_id + + +@pytest.fixture +def sdkci_name() -> "callable[[str], str]": + """Returns `sdkci-<scenario>-<8-char-hex>` so orphans are identifiable. + + See api/README.md — every test-created resource must use this prefix. + """ + + def _make(scenario: str) -> str: + return f"sdkci-{scenario}-{uuid.uuid4().hex[:8]}" + + return _make + + +# Per-API client fixtures keep tests one-liner short and avoid every test +# instantiating its own *Api(api_client).
+@pytest.fixture +def datasets_api(api_client: ApiClient) -> DatasetsApi: + return DatasetsApi(api_client) + + +@pytest.fixture +def workspaces_api(api_client: ApiClient) -> WorkspacesApi: + return WorkspacesApi(api_client) + + +@pytest.fixture +def connections_api(api_client: ApiClient) -> ConnectionsApi: + return ConnectionsApi(api_client) + + +@pytest.fixture +def indexes_api(api_client: ApiClient) -> IndexesApi: + return IndexesApi(api_client) + + +@pytest.fixture +def saved_queries_api(api_client: ApiClient) -> SavedQueriesApi: + return SavedQueriesApi(api_client) + + +@pytest.fixture +def secrets_api(api_client: ApiClient) -> SecretsApi: + return SecretsApi(api_client) diff --git a/tests/integration/test_auth_missing_token_401.py b/tests/integration/test_auth_missing_token_401.py new file mode 100644 index 0000000..79c87ea --- /dev/null +++ b/tests/integration/test_auth_missing_token_401.py @@ -0,0 +1,27 @@ +"""Scenario: auth_missing_token_401. + +Calls without a bearer token return 401 with the documented ApiErrorResponse +shape. Uses an unauthenticated client built locally — does not touch the +session-scoped api_client. 
+""" + +from __future__ import annotations + +import pytest + +from hotdata import ApiClient, Configuration +from hotdata.api.workspaces_api import WorkspacesApi +from hotdata.exceptions import ApiException + + +def test_auth_missing_token_401(env) -> None: + config = Configuration(host=env.api_url) # no api_key, no workspace_id + with ApiClient(config) as client: + api = WorkspacesApi(client) + with pytest.raises(ApiException) as excinfo: + api.list_workspaces() + assert excinfo.value.status == 401, ( + f"expected 401 without bearer token, got {excinfo.value.status}" + ) + body = excinfo.value.body or "" + assert body, "expected non-empty error body on 401" diff --git a/tests/integration/test_auth_unknown_workspace.py b/tests/integration/test_auth_unknown_workspace.py new file mode 100644 index 0000000..3f53f5e --- /dev/null +++ b/tests/integration/test_auth_unknown_workspace.py @@ -0,0 +1,32 @@ +"""Scenario: auth_unknown_workspace. + +A valid bearer token combined with a fabricated workspace id (random UUID) must +return a 4xx error and never leak data from another workspace. Server may +respond 403 (forbidden) or 404 (not found) — both are acceptable. 
+""" + +from __future__ import annotations + +import uuid + +import pytest + +from hotdata import ApiClient, Configuration +from hotdata.api.datasets_api import DatasetsApi +from hotdata.exceptions import ApiException + + +def test_auth_unknown_workspace(env) -> None: + fake_workspace = f"ws_{uuid.uuid4().hex}" + config = Configuration( + host=env.api_url, + api_key=env.api_key, + workspace_id=fake_workspace, + ) + with ApiClient(config) as client: + api = DatasetsApi(client) + with pytest.raises(ApiException) as excinfo: + api.list_datasets() + assert excinfo.value.status in (403, 404), ( + f"expected 403/404 for fabricated workspace, got {excinfo.value.status}" + ) diff --git a/tests/integration/test_connections_read.py b/tests/integration/test_connections_read.py new file mode 100644 index 0000000..abdb832 --- /dev/null +++ b/tests/integration/test_connections_read.py @@ -0,0 +1,29 @@ +"""Scenario: connections_read. + +Read-only lifecycle ops on the seeded connection — get, list, health check, +and cache purge. Does not create or delete connections in prod (would require +real datastore credentials in CI secrets). +""" + +from __future__ import annotations + +from hotdata.api.connections_api import ConnectionsApi + + +def test_connections_read(connections_api: ConnectionsApi, connection_id: str) -> None: + detail = connections_api.get_connection(connection_id) + assert detail.id == connection_id + assert detail.source_type + assert detail.name + + listing = connections_api.list_connections() + assert any(c.id == connection_id for c in listing.connections), ( + f"seeded connection {connection_id} not in list_connections" + ) + + health = connections_api.check_connection_health(connection_id) + assert health.connection_id == connection_id + assert health.healthy, f"seeded connection unhealthy: {health.error}" + + # purge_connection_cache returns None on success. 
+ connections_api.purge_connection_cache(connection_id) diff --git a/tests/integration/test_dataset_versioning.py b/tests/integration/test_dataset_versioning.py new file mode 100644 index 0000000..681970f --- /dev/null +++ b/tests/integration/test_dataset_versioning.py @@ -0,0 +1,56 @@ +"""Scenario: dataset_versioning. + +Create a dataset, exercise list_dataset_versions, pin to a specific version, +then unpin. Confirms the versioning surface is reachable and consistent. +""" + +from __future__ import annotations + +from hotdata.api.datasets_api import DatasetsApi +from hotdata.exceptions import ApiException +from hotdata.models.create_dataset_request import CreateDatasetRequest +from hotdata.models.dataset_source import DatasetSource +from hotdata.models.inline_data import InlineData +from hotdata.models.inline_dataset_source import InlineDatasetSource +from hotdata.models.update_dataset_request import UpdateDatasetRequest + + +def _inline_csv_source() -> DatasetSource: + return DatasetSource( + InlineDatasetSource( + inline=InlineData(content="a,b\n1,2\n3,4\n", format="csv") + ) + ) + + +def test_dataset_versioning(datasets_api: DatasetsApi, sdkci_name) -> None: + label = sdkci_name("dataset-versioning") + created_id: str | None = None + + try: + created = datasets_api.create_dataset( + CreateDatasetRequest(label=label, source=_inline_csv_source()) + ) + created_id = created.id + + versions = datasets_api.list_dataset_versions(created.id) + assert versions.dataset_id == created.id + assert versions.count >= 1 + assert any(v.version == 1 for v in versions.versions), ( + f"expected version 1 in {[v.version for v in versions.versions]}" + ) + + pinned = datasets_api.update_dataset( + created.id, UpdateDatasetRequest(pinned_version=1) + ) + assert pinned.pinned_version == 1 + assert pinned.latest_version >= 1 + + fetched = datasets_api.get_dataset(created.id) + assert fetched.pinned_version == 1 + finally: + if created_id is not None: + try: + 
datasets_api.delete_dataset(created_id) + except ApiException: + pass diff --git a/tests/integration/test_datasets_crud.py b/tests/integration/test_datasets_crud.py new file mode 100644 index 0000000..98cab62 --- /dev/null +++ b/tests/integration/test_datasets_crud.py @@ -0,0 +1,72 @@ +"""Scenario: datasets_crud. + +Defined in www.hotdata.dev/api/test-scenarios.yaml — create, read, list, +update, and delete a dataset; assert 404 after delete. +""" + +from __future__ import annotations + +import pytest + +from hotdata.api.datasets_api import DatasetsApi +from hotdata.exceptions import ApiException +from hotdata.models.create_dataset_request import CreateDatasetRequest +from hotdata.models.dataset_source import DatasetSource +from hotdata.models.inline_data import InlineData +from hotdata.models.inline_dataset_source import InlineDatasetSource +from hotdata.models.update_dataset_request import UpdateDatasetRequest + + +def _inline_csv_source() -> DatasetSource: + return DatasetSource( + InlineDatasetSource( + inline=InlineData( + content="a,b\n1,2\n3,4\n", + format="csv", + ) + ) + ) + + +def test_datasets_crud(datasets_api: DatasetsApi, sdkci_name) -> None: + label = sdkci_name("datasets-crud") + new_label = f"{label}-renamed" + created_id: str | None = None + + try: + created = datasets_api.create_dataset( + CreateDatasetRequest(label=label, source=_inline_csv_source()) + ) + created_id = created.id + assert created.label == label + assert created.id + + fetched = datasets_api.get_dataset(created.id) + assert fetched.id == created.id + assert fetched.label == label + assert fetched.columns, "expected inferred columns from inline CSV" + + listing = datasets_api.list_datasets() + assert any(d.id == created.id for d in listing.datasets), ( + f"newly created dataset {created.id} not present in list_datasets" + ) + + updated = datasets_api.update_dataset( + created.id, UpdateDatasetRequest(label=new_label) + ) + assert updated.label == new_label + + 
datasets_api.delete_dataset(created.id) + created_id = None # successful delete — skip teardown + + with pytest.raises(ApiException) as excinfo: + datasets_api.get_dataset(created.id) + assert excinfo.value.status == 404, ( + f"expected 404 after delete, got {excinfo.value.status}" + ) + finally: + if created_id is not None: + try: + datasets_api.delete_dataset(created_id) + except ApiException: + pass diff --git a/tests/integration/test_query_async_polling.py b/tests/integration/test_query_async_polling.py new file mode 100644 index 0000000..fbf46df --- /dev/null +++ b/tests/integration/test_query_async_polling.py @@ -0,0 +1,89 @@ +"""Scenario: query_async_polling. + +Submit a query asynchronously, poll get_query_run until terminal status, fetch +results, and verify list_query_runs / list_results surface the run. +""" + +from __future__ import annotations + +import time + +import pytest + +from hotdata.api.query_api import QueryApi +from hotdata.api.query_runs_api import QueryRunsApi +from hotdata.api.results_api import ResultsApi +from hotdata.models.query_request import QueryRequest + + +TERMINAL_STATUSES = {"succeeded", "failed", "cancelled"} +POLL_TIMEOUT_S = 60.0 +POLL_INTERVAL_S = 1.0 + + +@pytest.fixture +def query_api(api_client) -> QueryApi: + return QueryApi(api_client) + + +@pytest.fixture +def query_runs_api(api_client) -> QueryRunsApi: + return QueryRunsApi(api_client) + + +@pytest.fixture +def results_api(api_client) -> ResultsApi: + return ResultsApi(api_client) + + +def test_query_async_polling( + query_api: QueryApi, + query_runs_api: QueryRunsApi, + results_api: ResultsApi, +) -> None: + # async=True with a small async_after_ms forces the run to come back as + # AsyncQueryResponse rather than synchronous. The QueryResponse / async + # response variants are union-shaped on the client; we treat anything with + # query_run_id as the start of the polling loop. 
+ submitted = query_api.query( + QueryRequest(var_async=True, async_after_ms=1000, sql="SELECT 1 AS x") + ) + query_run_id = submitted.query_run_id + assert query_run_id + + deadline = time.monotonic() + POLL_TIMEOUT_S + run = None + while time.monotonic() < deadline: + run = query_runs_api.get_query_run(query_run_id) + if run.status in TERMINAL_STATUSES: + break + time.sleep(POLL_INTERVAL_S) + assert run is not None + assert run.status in TERMINAL_STATUSES, ( + f"query {query_run_id} did not reach terminal status within " + f"{POLL_TIMEOUT_S}s; last status was {run.status if run else 'None'}" + ) + assert run.status == "succeeded", ( + f"expected succeeded, got {run.status}: {run.error_message}" + ) + assert run.row_count == 1 + + runs_listing = query_runs_api.list_query_runs(limit=50) + assert any(r.id == query_run_id for r in runs_listing.query_runs), ( + f"query run {query_run_id} not surfaced by list_query_runs" + ) + + if run.result_id: + result = results_api.get_result(run.result_id) + assert result.result_id == run.result_id + assert result.status in {"ready", "processing"} + if result.status == "ready": + assert result.row_count == 1 + assert result.rows == [[1]] + + # ResultInfo (list_results) exposes the id as `id`, not `result_id` — + # only GetResultResponse uses `result_id`. + results_listing = results_api.list_results(limit=50) + assert any(r.id == run.result_id for r in results_listing.results), ( + f"result {run.result_id} not surfaced by list_results" + ) diff --git a/tests/integration/test_saved_query_versioning.py b/tests/integration/test_saved_query_versioning.py new file mode 100644 index 0000000..61a875a --- /dev/null +++ b/tests/integration/test_saved_query_versioning.py @@ -0,0 +1,61 @@ +"""Scenario: saved_query_versioning. + +Create a saved query, update its SQL twice, confirm list_saved_query_versions +reflects the edits, and confirm execute_saved_query runs the latest SQL. 
+""" + +from __future__ import annotations + +from hotdata.api.saved_queries_api import SavedQueriesApi +from hotdata.exceptions import ApiException +from hotdata.models.create_saved_query_request import CreateSavedQueryRequest +from hotdata.models.update_saved_query_request import UpdateSavedQueryRequest + + +def test_saved_query_versioning( + saved_queries_api: SavedQueriesApi, sdkci_name +) -> None: + name = sdkci_name("savedq-versioning") + created_id: str | None = None + + try: + created = saved_queries_api.create_saved_query( + CreateSavedQueryRequest( + name=name, + sql="SELECT 1 AS x", + description="sdkci versioning test", + ) + ) + created_id = created.id + assert created.latest_version == 1 + assert created.sql == "SELECT 1 AS x" + + v2 = saved_queries_api.update_saved_query( + created.id, UpdateSavedQueryRequest(sql="SELECT 2 AS x") + ) + assert v2.latest_version == 2 + assert v2.sql == "SELECT 2 AS x" + + v3 = saved_queries_api.update_saved_query( + created.id, UpdateSavedQueryRequest(sql="SELECT 3 AS x") + ) + assert v3.latest_version == 3 + assert v3.sql == "SELECT 3 AS x" + + versions = saved_queries_api.list_saved_query_versions(created.id) + assert versions.saved_query_id == created.id + assert versions.count >= 3 + version_numbers = {v.version for v in versions.versions} + assert {1, 2, 3}.issubset(version_numbers), ( + f"expected versions 1,2,3 in {sorted(version_numbers)}" + ) + + executed = saved_queries_api.execute_saved_query(created.id) + assert executed.row_count == 1 + assert executed.rows == [[3]] + finally: + if created_id is not None: + try: + saved_queries_api.delete_saved_query(created_id) + except ApiException: + pass diff --git a/tests/integration/test_secrets_crud.py b/tests/integration/test_secrets_crud.py new file mode 100644 index 0000000..de05d2e --- /dev/null +++ b/tests/integration/test_secrets_crud.py @@ -0,0 +1,69 @@ +"""Scenario: secrets_crud. + +Create, read, update, delete a secret. 
Critically: confirm that get_secret / +list_secrets never echo the plaintext value back. The SDK's typed response +model has no `value` field, but we also defensively scan the raw payload to +catch a server-side regression where the value leaks through. +""" + +from __future__ import annotations + +import json + +import pytest + +from hotdata.api.secrets_api import SecretsApi +from hotdata.exceptions import ApiException +from hotdata.models.create_secret_request import CreateSecretRequest +from hotdata.models.update_secret_request import UpdateSecretRequest + + +def test_secrets_crud(secrets_api: SecretsApi, sdkci_name) -> None: + # Secret names disallow hyphens — swap sdkci_name's hyphens for underscores. + secret_name = sdkci_name("secrets-crud").replace("-", "_") + initial_value = "INITIAL_PLAINTEXT_VALUE_DO_NOT_LEAK" + updated_value = "UPDATED_PLAINTEXT_VALUE_DO_NOT_LEAK" + created = False + + try: + create_resp = secrets_api.create_secret( + CreateSecretRequest(name=secret_name, value=initial_value) + ) + created = True + assert create_resp.name == secret_name + # Defensive: typed model shouldn't have a `value` field, but if a + # server-side regression added one, dump and check.
+ dumped = json.dumps(create_resp.to_dict(), default=str) + assert initial_value not in dumped, "create_secret response leaked plaintext value" + + got = secrets_api.get_secret(secret_name) + assert got.name == secret_name + assert initial_value not in json.dumps(got.to_dict(), default=str), ( + "get_secret response leaked plaintext value" + ) + + listing = secrets_api.list_secrets() + names = [s.name for s in listing.secrets] + assert secret_name in names + assert initial_value not in json.dumps(listing.to_dict(), default=str), ( + "list_secrets response leaked plaintext value" + ) + + secrets_api.update_secret(secret_name, UpdateSecretRequest(value=updated_value)) + + got2 = secrets_api.get_secret(secret_name) + assert updated_value not in json.dumps(got2.to_dict(), default=str) + assert initial_value not in json.dumps(got2.to_dict(), default=str) + + secrets_api.delete_secret(secret_name) + created = False + + with pytest.raises(ApiException) as excinfo: + secrets_api.get_secret(secret_name) + assert excinfo.value.status == 404 + finally: + if created: + try: + secrets_api.delete_secret(secret_name) + except ApiException: + pass diff --git a/tests/integration/test_workspaces_list.py b/tests/integration/test_workspaces_list.py new file mode 100644 index 0000000..b4745ec --- /dev/null +++ b/tests/integration/test_workspaces_list.py @@ -0,0 +1,19 @@ +"""Scenario: workspaces_list. + +List workspaces and confirm the seeded HOTDATA_SDK_TEST_WORKSPACE_ID is present. +Read-only — never creates or deletes workspaces against prod.
+""" + +from __future__ import annotations + +from hotdata.api.workspaces_api import WorkspacesApi + + +def test_workspaces_list(workspaces_api: WorkspacesApi, workspace_id: str) -> None: + response = workspaces_api.list_workspaces() + assert response.ok + found = [w for w in response.workspaces if w.public_id == workspace_id] + assert found, ( + f"expected seeded workspace {workspace_id} in list, got " + f"{[w.public_id for w in response.workspaces]}" + )